use std::collections::HashMap;
use std::sync::Mutex;
use std::time::{SystemTime, UNIX_EPOCH};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use crate::documents::Document;
use crate::error::Result;
/// Outcome of a [`DocumentIndex::upsert`] call, split into two string lists.
///
/// NOTE(review): the strings are presumably document ids — confirm against
/// the concrete `DocumentIndex` implementations.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UpsertResponse {
    /// Entries reported as successfully upserted.
    pub succeeded: Vec<String>,

    /// Entries reported as having failed to upsert.
    pub failed: Vec<String>,
}
/// Outcome of a [`DocumentIndex::delete`] call.
///
/// Every field is optional. A field that is `None` is left out of the
/// serialized form, and a field missing from the input deserializes to `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeleteResponse {
    /// Number of deleted entries, when the backend reports a count.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub num_deleted: Option<usize>,

    /// Entries reported as successfully deleted, when the backend lists them.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub succeeded: Option<Vec<String>>,

    /// Entries reported as having failed to delete, when the backend lists them.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub failed: Option<Vec<String>>,
}
/// Bookkeeping entry stored per key by a record manager.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Record {
    /// Optional group the key belongs to; `None` means the key has no group.
    pub group_id: Option<String>,

    /// Timestamp of the last update. `InMemoryRecordManager` writes fractional
    /// seconds since the Unix epoch here.
    pub updated_at: f64,
}
/// Asynchronous store that tracks, per key, an optional group and the time of
/// the last update.
///
/// Timestamps are `f64` values; the in-memory implementation in this file uses
/// fractional seconds since the Unix epoch.
#[async_trait]
pub trait RecordManager: Send + Sync {
    /// Prepares whatever backing storage the implementation needs.
    async fn create_schema(&self) -> Result<()>;

    /// Returns the manager's notion of the current time.
    async fn get_time(&self) -> Result<f64>;

    /// Records `keys` as updated.
    ///
    /// `group_ids` is positional: entry `i` is the group of `keys[i]`.
    /// `time_at_least` is an optional lower bound related to the stored
    /// timestamp; its exact handling is implementation-defined.
    async fn update(
        &self,
        keys: &[String],
        group_ids: &[Option<String>],
        time_at_least: Option<f64>,
    ) -> Result<()>;

    /// Reports, for each entry of `keys` in order, whether it is known.
    async fn exists(&self, keys: &[String]) -> Result<Vec<bool>>;

    /// Lists stored keys, optionally narrowed by update time (`before`,
    /// `after`), by group membership (`group_ids`) and by count (`limit`).
    ///
    /// Passing `None` for a filter disables it.
    async fn list_keys(
        &self,
        before: Option<f64>,
        after: Option<f64>,
        group_ids: Option<&[String]>,
        limit: Option<usize>,
    ) -> Result<Vec<String>>;

    /// Removes the given keys from the store.
    async fn delete_keys(&self, keys: &[String]) -> Result<()>;
}
/// Record manager that keeps all of its records in process memory.
pub struct InMemoryRecordManager {
    /// Label supplied at construction time.
    namespace: String,

    /// Key -> record map; the mutex allows mutation through `&self`.
    records: Mutex<HashMap<String, Record>>,
}
impl InMemoryRecordManager {
    /// Creates a manager with no records, labelled with `namespace`.
    pub fn new(namespace: impl Into<String>) -> Self {
        Self {
            namespace: namespace.into(),
            records: Mutex::new(HashMap::new()),
        }
    }

    /// Returns the namespace this manager was created with.
    pub fn namespace(&self) -> &str {
        &self.namespace
    }

    /// Returns the wall-clock time as fractional seconds since the Unix epoch.
    ///
    /// A system clock set earlier than the epoch yields a negative value
    /// instead of panicking.
    fn current_time() -> f64 {
        match SystemTime::now().duration_since(UNIX_EPOCH) {
            Ok(elapsed) => elapsed.as_secs_f64(),
            // The error carries how far before the epoch the clock is.
            Err(err) => -err.duration().as_secs_f64(),
        }
    }
}
/// [`RecordManager`] implementation backed by the mutex-guarded in-memory map.
///
/// Every method takes the lock without awaiting while it is held. A poisoned
/// lock is recovered rather than turned into a panic: each critical section
/// performs independent inserts, removals or reads, so the map is still usable
/// after another thread panicked.
#[async_trait]
impl RecordManager for InMemoryRecordManager {
    /// Nothing to prepare for the in-memory store; always succeeds.
    async fn create_schema(&self) -> Result<()> {
        Ok(())
    }

    /// Returns the current wall-clock time in fractional seconds since the
    /// Unix epoch.
    async fn get_time(&self) -> Result<f64> {
        Ok(Self::current_time())
    }

    /// Inserts or overwrites a record for every entry of `keys`.
    ///
    /// All records written by one call share a single timestamp: the current
    /// time, or `time_at_least` when that is later than the current time.
    ///
    /// `group_ids[i]` is the group of `keys[i]`. Keys that have no matching
    /// entry in `group_ids` are stored without a group instead of being
    /// silently skipped; surplus `group_ids` entries are ignored.
    async fn update(
        &self,
        keys: &[String],
        group_ids: &[Option<String>],
        time_at_least: Option<f64>,
    ) -> Result<()> {
        let mut records = self
            .records
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());

        // Read the clock while holding the lock so timestamps follow lock order.
        let now = Self::current_time();
        let timestamp = time_at_least.filter(|&t| t > now).unwrap_or(now);

        for (index, key) in keys.iter().enumerate() {
            let group_id = group_ids.get(index).cloned().flatten();
            records.insert(
                key.clone(),
                Record {
                    group_id,
                    updated_at: timestamp,
                },
            );
        }
        Ok(())
    }

    /// Returns one flag per entry of `keys`, in order: `true` when a record
    /// for that key is stored.
    async fn exists(&self, keys: &[String]) -> Result<Vec<bool>> {
        let records = self
            .records
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        Ok(keys.iter().map(|key| records.contains_key(key)).collect())
    }

    /// Returns the stored keys that pass every supplied filter, sorted
    /// ascending and cut to at most `limit` entries.
    ///
    /// * `before` – keep only records updated strictly earlier than this.
    /// * `after` – keep only records updated strictly later than this.
    /// * `group_ids` – keep only records whose group is in this list; records
    ///   without a group never match.
    /// * `limit` – applied after sorting.
    async fn list_keys(
        &self,
        before: Option<f64>,
        after: Option<f64>,
        group_ids: Option<&[String]>,
        limit: Option<usize>,
    ) -> Result<Vec<String>> {
        let records = self
            .records
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());

        let mut keys: Vec<String> = records
            .iter()
            .filter(|(_, record)| {
                // Phrased as rejections so comparison results stay exactly as
                // written (`>=` / `<=` against the bound).
                let too_new = matches!(before, Some(b) if record.updated_at >= b);
                let too_old = matches!(after, Some(a) if record.updated_at <= a);
                let group_ok = group_ids.map_or(true, |wanted| {
                    record
                        .group_id
                        .as_ref()
                        .map_or(false, |gid| wanted.contains(gid))
                });
                !too_new && !too_old && group_ok
            })
            .map(|(key, _)| key.clone())
            .collect();

        // The keys are owned copies; release the lock before sorting.
        drop(records);

        // Map keys are unique, so stability is irrelevant.
        keys.sort_unstable();
        if let Some(max) = limit {
            keys.truncate(max);
        }
        Ok(keys)
    }

    /// Removes the record of every listed key; unknown keys are ignored.
    async fn delete_keys(&self, keys: &[String]) -> Result<()> {
        let mut records = self
            .records
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        for key in keys {
            records.remove(key);
        }
        Ok(())
    }
}
/// Asynchronous index of [`Document`] values addressed by string ids.
#[async_trait]
pub trait DocumentIndex: Send + Sync {
    /// Upserts `docs` and reports the outcome as an [`UpsertResponse`].
    async fn upsert(&self, docs: Vec<Document>) -> Result<UpsertResponse>;

    /// Deletes the entries named by `ids` and reports the outcome as a
    /// [`DeleteResponse`].
    async fn delete(&self, ids: &[String]) -> Result<DeleteResponse>;

    /// Fetches documents for `ids`.
    ///
    /// NOTE(review): how unknown ids are handled (skipped vs. error) is not
    /// visible here — confirm against the implementations.
    async fn get(&self, ids: &[String]) -> Result<Vec<Document>>;
}