use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
#[cfg(any(feature = "sqlite", feature = "postgres"))]
use std::fmt::Write;
#[cfg(any(feature = "sqlite", feature = "postgres"))]
use sqlx::Row;
/// Errors produced by the [`Store`] implementations in this file.
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
/// Identifies an item, by namespace and key, that could not be found.
#[error("item not found: {namespace}/{key}")]
NotFound {
namespace: String,
key: String,
},
/// A namespace string was rejected; the payload is the offending value.
#[error("invalid namespace: {0}")]
InvalidNamespace(String),
/// JSON (de)serialization failed. `#[from]` lets `?` convert a
/// `serde_json::Error` into this variant automatically.
#[error("serialization error: {0}")]
Serialize(#[from] serde_json::Error),
/// Backend failure (connection, query, row decoding) described as text.
#[error("storage error: {0}")]
Storage(String),
/// Vector data could not be decoded or searched.
#[error("vector search error: {0}")]
VectorSearch(String),
/// Embedding failure described as text.
#[error("embedding error: {0}")]
Embedding(String),
}
/// Asynchronous key/value store addressed by `(namespace, key)` pairs whose
/// values are arbitrary JSON documents.
#[async_trait]
pub trait Store: Send + Sync + 'static {
/// Fetches the item stored under `namespace`/`key`.
///
/// The implementations in this file return `Ok(None)` when nothing is stored there.
async fn get(&self, namespace: &str, key: &str) -> Result<Option<Item>, StoreError>;
/// Creates or replaces the item under `namespace`/`key` with `value`.
///
/// `index` optionally names the fields of `value` whose text should be
/// embedded; the implementations in this file only embed when an
/// [`IndexConfig`] is present and the list is non-empty.
async fn put(
&self,
namespace: &str,
key: &str,
value: serde_json::Value,
index: Option<Vec<String>>,
) -> Result<(), StoreError>;
/// Removes the item under `namespace`/`key`.
async fn delete(&self, namespace: &str, key: &str) -> Result<(), StoreError>;
/// Returns one page of the items matching `query`, plus the total match count.
async fn search(&self, query: SearchQuery) -> Result<SearchResult, StoreError>;
/// Lists namespaces, optionally restricted by `prefix`/`suffix`, truncated
/// to `max_depth` `/`-separated segments, and paginated by `limit`/`offset`.
async fn list_namespaces(
&self,
prefix: Option<&str>,
suffix: Option<&str>,
max_depth: Option<usize>,
limit: Option<usize>,
offset: Option<usize>,
) -> Result<Vec<String>, StoreError>;
/// Executes `ops` and returns one [`StoreResult`] per operation, in order.
async fn batch(&self, ops: Vec<StoreOp>) -> Result<Vec<StoreResult>, StoreError>;
}
/// A stored JSON document together with its address and bookkeeping timestamps.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Item {
/// Namespace the item lives in.
pub namespace: String,
/// Key of the item within its namespace.
pub key: String,
/// The stored JSON payload.
pub value: serde_json::Value,
/// When the item was first written.
pub created_at: DateTime<Utc>,
/// When the item was last written (the in-memory store also bumps this on a TTL refresh).
pub updated_at: DateTime<Utc>,
/// Expiry deadline; `None` means the item never expires.
pub expires_at: Option<DateTime<Utc>>,
/// Embedding vector computed at write time, if any. Omitted from serialized output when absent.
#[serde(skip_serializing_if = "Option::is_none")]
pub embedding: Option<Vec<f32>>,
}
impl Item {
    /// Reports whether the expiry deadline of this item lies strictly in the past.
    ///
    /// Items without an `expires_at` never expire.
    #[must_use]
    pub fn is_expired(&self) -> bool {
        let Some(deadline) = self.expires_at else {
            return false;
        };
        deadline < Utc::now()
    }
}
/// An [`Item`] returned from a search, paired with its optional relevance score.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SearchItem {
/// The matched item; its fields are flattened into this struct when (de)serialized.
#[serde(flatten)]
pub item: Item,
/// Cosine similarity between the query and item embeddings; `None` when
/// either embedding is unavailable.
pub score: Option<f64>,
}
/// Parameters of a [`Store::search`] call.
///
/// Note that the derived `Default` sets `limit` to `0`, which the
/// implementations in this file interpret as an empty page.
#[derive(Debug, Clone, Default)]
pub struct SearchQuery {
/// Only namespaces starting with this string are searched.
pub namespace_prefix: String,
/// Optional structured filter applied to each item's JSON value.
pub filter: Option<FilterExpr>,
/// Optional free-text query; embedded for similarity ranking when the store has an index configured.
pub query: Option<String>,
/// Maximum number of items in the returned page.
pub limit: usize,
/// Number of matching items to skip before the returned page.
pub offset: usize,
}
/// One page of search results.
#[derive(Debug, Clone)]
pub struct SearchResult {
/// The items of the requested page.
pub items: Vec<SearchItem>,
/// Number of items that matched before `offset`/`limit` were applied.
pub total_count: usize,
}
/// Structured filter over an item's JSON value.
///
/// Serialized as an internally tagged object whose `"op"` member holds the
/// operator name (`"$eq"`, `"$and"`, ...). `field` is a `.`-separated path
/// into nested JSON objects.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "op")]
pub enum FilterExpr {
/// True when `field` exists and equals `value`.
#[serde(rename = "$eq")]
Eq {
field: String,
value: serde_json::Value,
},
/// True when `field` is missing or differs from `value`.
#[serde(rename = "$ne")]
Ne {
field: String,
value: serde_json::Value,
},
/// Numeric `field > value`; the in-memory evaluator yields false unless both sides are JSON numbers.
#[serde(rename = "$gt")]
Gt {
field: String,
value: serde_json::Value,
},
/// Numeric `field >= value`; same number-only rule as `$gt`.
#[serde(rename = "$gte")]
Gte {
field: String,
value: serde_json::Value,
},
/// Numeric `field < value`; same number-only rule as `$gt`.
#[serde(rename = "$lt")]
Lt {
field: String,
value: serde_json::Value,
},
/// Numeric `field <= value`; same number-only rule as `$gt`.
#[serde(rename = "$lte")]
Lte {
field: String,
value: serde_json::Value,
},
/// True when every sub-expression is true (vacuously true when empty).
#[serde(rename = "$and")]
And {
expressions: Vec<FilterExpr>,
},
/// True when at least one sub-expression is true (false when empty).
#[serde(rename = "$or")]
Or {
expressions: Vec<FilterExpr>,
},
/// Logical negation of the wrapped expression.
#[serde(rename = "$not")]
Not {
expr: Box<FilterExpr>,
},
}
impl FilterExpr {
/// Evaluates this filter against a JSON `value` in memory.
///
/// Delegates to `evaluate_filter`; returns `true` when `value` satisfies the filter.
#[must_use]
pub fn matches(&self, value: &serde_json::Value) -> bool {
evaluate_filter(self, value)
}
}
/// A single operation submitted through [`Store::batch`]; each variant
/// mirrors the `Store` method of the same name.
#[derive(Debug, Clone)]
pub enum StoreOp {
/// Fetch one item.
Get {
namespace: String,
key: String,
},
/// Create or replace one item; `index` names the fields to embed.
Put {
namespace: String,
key: String,
value: serde_json::Value,
index: Option<Vec<String>>,
},
/// Remove one item.
Delete {
namespace: String,
key: String,
},
/// Run a search.
Search(SearchQuery),
/// List namespaces. There is no `offset` field; the `batch`
/// implementations in this file always pass `None` for it.
ListNamespaces {
prefix: Option<String>,
suffix: Option<String>,
max_depth: Option<usize>,
limit: Option<usize>,
},
}
/// Result of one [`StoreOp`] executed by [`Store::batch`].
#[derive(Debug, Clone)]
pub enum StoreResult {
/// Outcome of `StoreOp::Get`.
Item(Option<Item>),
/// Outcome of `StoreOp::Search`.
Items(SearchResult),
/// Outcome of `StoreOp::ListNamespaces`.
Namespaces(Vec<String>),
/// Outcome of operations that return no data (`Put`, `Delete`).
None,
}
/// Time-to-live settings used by [`MemoryStore`].
#[derive(Clone, Debug)]
pub struct TTLConfig {
/// TTL applied to every item written by `MemoryStore::put`; `None` disables expiry.
pub default_ttl: Option<std::time::Duration>,
/// When true (and `default_ttl` is set), a successful `get` pushes the item's expiry forward.
pub refresh_on_read: bool,
/// Period between runs of the background sweep started by `start_sweep_task`.
pub sweep_interval: std::time::Duration,
/// Cap on the number of expired items removed by a single sweep.
pub sweep_max_items: usize,
}
impl Default for TTLConfig {
    /// Expiry disabled, no refresh on read, a sweep every 300 seconds
    /// removing at most 1000 items per run.
    fn default() -> Self {
        use std::time::Duration;

        let sweep_interval = Duration::from_secs(300);
        let sweep_max_items = 1_000;
        Self {
            sweep_max_items,
            sweep_interval,
            refresh_on_read: false,
            default_ttl: None,
        }
    }
}
/// In-process [`Store`] implementation backed by nested hash maps.
#[derive(Debug)]
pub struct MemoryStore {
// namespace -> key -> item, guarded by an async read/write lock.
data: Arc<tokio::sync::RwLock<HashMap<String, HashMap<String, Item>>>>,
// Embedding configuration; `None` disables vector search.
index_config: Option<IndexConfig>,
// Expiry and sweep settings.
ttl_config: TTLConfig,
}
/// Turns texts into embedding vectors for vector search.
#[async_trait::async_trait]
pub trait EmbeddingFunc: Send + Sync + 'static {
/// Embeds `texts`. Callers in this file pass a single text and take the
/// last vector of the result, so implementations are presumably expected
/// to return one vector per input, in order.
async fn embed(&self, texts: Vec<String>) -> Result<Vec<Vec<f32>>, StoreError>;
}
/// Vector-search configuration for a store.
pub struct IndexConfig {
/// Embedding dimensionality (not validated by the code in this file).
pub dims: usize,
/// Function used to embed item text and search queries.
pub embed: Box<dyn EmbeddingFunc>,
/// Optional list of fields to index (not read by the code visible in this file).
pub fields: Option<Vec<String>>,
}
impl std::fmt::Debug for IndexConfig {
    /// Formats the configuration, printing `"..."` in place of the
    /// embedding function, which has no `Debug` implementation.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            dims,
            embed: _,
            fields,
        } = self;
        let mut out = f.debug_struct("IndexConfig");
        out.field("dims", dims);
        out.field("embed", &"...");
        out.field("fields", fields);
        out.finish()
    }
}
impl Default for MemoryStore {
/// Equivalent to [`MemoryStore::new`]: an empty store with no index and the default TTL settings.
fn default() -> Self {
Self::new()
}
}
impl MemoryStore {
    /// Creates an empty store without vector search and with the default
    /// [`TTLConfig`] (no expiry).
    #[must_use]
    pub fn new() -> Self {
        Self {
            data: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
            index_config: None,
            ttl_config: TTLConfig::default(),
        }
    }

    /// Enables vector search using `config` for embedding items and queries.
    #[must_use]
    pub fn with_vector_search(mut self, config: IndexConfig) -> Self {
        self.index_config = Some(config);
        self
    }

    /// Replaces the TTL/sweep settings.
    #[must_use]
    pub const fn with_ttl_config(mut self, config: TTLConfig) -> Self {
        self.ttl_config = config;
        self
    }

    /// Removes expired items and returns how many were removed.
    ///
    /// At most `ttl_config.sweep_max_items` items are removed per call; a cap
    /// of zero removes nothing. Namespaces left without any item are dropped
    /// so they do not accumulate in the map.
    ///
    /// # Errors
    ///
    /// Currently infallible; the `Result` is kept for interface stability.
    #[allow(
        clippy::significant_drop_tightening,
        reason = "Write lock must be held during iteration and removal"
    )]
    pub async fn sweep_expired_items(&self) -> Result<usize, StoreError> {
        let budget = self.ttl_config.sweep_max_items;
        let now = Utc::now();
        let mut removed = 0_usize;
        let mut data = self.data.write().await;

        for items in data.values_mut() {
            if removed >= budget {
                break;
            }
            items.retain(|_, item| {
                // Stop evicting once the budget is used up, keeping the rest
                // for a later sweep.
                let evict = removed < budget && item.expires_at.is_some_and(|at| at < now);
                if evict {
                    removed += 1;
                }
                !evict
            });
        }
        data.retain(|_, items| !items.is_empty());

        Ok(removed)
    }

    /// Spawns a background task that calls [`Self::sweep_expired_items`]
    /// every `ttl_config.sweep_interval`.
    ///
    /// The task keeps the store alive until the returned handle is aborted.
    /// A zero interval disables sweeping (with a warning) rather than letting
    /// `tokio::time::interval` panic inside the task.
    #[must_use]
    pub fn start_sweep_task(self: Arc<Self>) -> tokio::task::JoinHandle<()> {
        tokio::spawn(async move {
            let period = self.ttl_config.sweep_interval;
            if period.is_zero() {
                tracing::warn!("Store sweep disabled: sweep_interval is zero");
                return;
            }
            let mut interval = tokio::time::interval(period);
            loop {
                interval.tick().await;
                if let Err(e) = self.sweep_expired_items().await {
                    tracing::warn!("Store sweep failed: {}", e);
                }
            }
        })
    }
}
#[async_trait]
impl Store for MemoryStore {
#[allow(
clippy::significant_drop_tightening,
reason = "Read lock is scoped tightly; write lock acquired after release"
)]
async fn get(&self, namespace: &str, key: &str) -> Result<Option<Item>, StoreError> {
let is_expired = {
let data = self.data.read().await;
let Some(ns) = data.get(namespace) else {
return Ok(None);
};
let Some(item) = ns.get(key) else {
return Ok(None);
};
item.is_expired()
};
if is_expired {
let mut data = self.data.write().await;
if let Some(ns_map) = data.get_mut(namespace) {
ns_map.remove(key);
}
drop(data);
return Ok(None);
}
if self.ttl_config.refresh_on_read && self.ttl_config.default_ttl.is_some() {
let ttl = self.ttl_config.default_ttl.expect("checked is_some above");
let now = Utc::now();
let new_expires =
now + chrono::Duration::from_std(ttl).unwrap_or(chrono::Duration::MAX);
let mut data = self.data.write().await;
if let Some(ns_map) = data.get_mut(namespace)
&& let Some(item) = ns_map.get_mut(key)
{
item.expires_at = Some(new_expires);
item.updated_at = now;
let cloned = item.clone();
drop(data);
return Ok(Some(cloned));
}
drop(data);
return Ok(None);
}
let data = self.data.read().await;
let item = data.get(namespace).and_then(|ns| ns.get(key).cloned());
Ok(item)
}
#[allow(
clippy::significant_drop_tightening,
reason = "Lock must be held for entire put operation after embedding"
)]
async fn put(
&self,
namespace: &str,
key: &str,
value: serde_json::Value,
index: Option<Vec<String>>,
) -> Result<(), StoreError> {
let embedding = if let Some(ref index_config) = self.index_config {
if let Some(index_fields) = &index {
if index_fields.is_empty() {
None
} else {
let text = extract_index_text(&value, index_fields);
if text.is_empty() {
None
} else {
let mut embeddings = index_config.embed.embed(vec![text]).await?;
embeddings.pop()
}
}
} else {
None
}
} else {
None
};
let now = Utc::now();
let expires_at = self
.ttl_config
.default_ttl
.map(|ttl| now + chrono::Duration::from_std(ttl).unwrap_or(chrono::Duration::MAX));
let mut data = self.data.write().await;
let namespace_map = data
.entry(namespace.to_string())
.or_insert_with(HashMap::new);
let existing = namespace_map.get(key);
let item = Item {
namespace: namespace.to_string(),
key: key.to_string(),
value,
created_at: existing.map_or(now, |i| i.created_at),
updated_at: now,
expires_at,
embedding,
};
namespace_map.insert(key.to_string(), item);
Ok(())
}
#[allow(
clippy::significant_drop_tightening,
reason = "Lock must be held for entire delete operation"
)]
async fn delete(&self, namespace: &str, key: &str) -> Result<(), StoreError> {
let mut data = self.data.write().await;
if let Some(namespace_map) = data.get_mut(namespace) {
namespace_map.remove(key);
}
Ok(())
}
#[allow(
clippy::significant_drop_tightening,
reason = "Lock must be held for entire search iteration"
)]
async fn search(&self, query: SearchQuery) -> Result<SearchResult, StoreError> {
let query_embedding: Option<Vec<f32>> = if let Some(ref index_config) = self.index_config {
if let Some(query_text) = &query.query {
if query_text.is_empty() {
None
} else {
let mut embeddings = index_config.embed.embed(vec![query_text.clone()]).await?;
embeddings.pop()
}
} else {
None
}
} else {
None
};
let mut items: Vec<SearchItem> = {
let data = self.data.read().await;
let mut results = Vec::new();
for (namespace, namespace_map) in data.iter() {
if namespace.starts_with(&query.namespace_prefix) {
for item in namespace_map.values() {
if item.is_expired() {
continue;
}
if query
.filter
.as_ref()
.is_some_and(|filter| !evaluate_filter(filter, &item.value))
{
continue;
}
let score = query_embedding.as_ref().and_then(|q_emb| {
item.embedding
.as_ref()
.map(|i_emb| f64::from(cosine_similarity(q_emb, i_emb)))
});
results.push(SearchItem {
item: item.clone(),
score,
});
}
}
}
results
};
let total = items.len();
if query_embedding.is_some() {
items.sort_by(|a, b| {
b.score
.partial_cmp(&a.score)
.unwrap_or(std::cmp::Ordering::Equal)
});
}
let start = query.offset.min(items.len());
let end = (start + query.limit).min(items.len());
let page = items.drain(start..end).collect();
Ok(SearchResult {
items: page,
total_count: total,
})
}
async fn list_namespaces(
&self,
prefix: Option<&str>,
suffix: Option<&str>,
max_depth: Option<usize>,
limit: Option<usize>,
offset: Option<usize>,
) -> Result<Vec<String>, StoreError> {
let mut namespaces: Vec<String> = {
let data = self.data.read().await;
data.keys().cloned().collect()
};
if let Some(prefix_filter) = prefix {
namespaces.retain(|ns| ns.starts_with(prefix_filter));
}
if let Some(suffix_filter) = suffix {
namespaces.retain(|ns| ns.ends_with(suffix_filter));
}
if let Some(depth) = max_depth {
namespaces = namespaces
.into_iter()
.map(|ns| {
let parts: Vec<&str> = ns.split('/').take(depth).collect();
parts.join("/")
})
.collect();
namespaces.sort();
namespaces.dedup();
}
if let Some(offset_value) = offset {
let skip = offset_value.min(namespaces.len());
namespaces.drain(..skip);
}
if let Some(limit_value) = limit {
namespaces.truncate(limit_value);
}
Ok(namespaces)
}
async fn batch(&self, ops: Vec<StoreOp>) -> Result<Vec<StoreResult>, StoreError> {
let mut results = Vec::with_capacity(ops.len());
for op in ops {
let result = match op {
StoreOp::Get { namespace, key } => {
let item = self.get(&namespace, &key).await?;
StoreResult::Item(item)
}
StoreOp::Put {
namespace,
key,
value,
index,
} => {
self.put(&namespace, &key, value, index).await?;
StoreResult::None
}
StoreOp::Delete { namespace, key } => {
self.delete(&namespace, &key).await?;
StoreResult::None
}
StoreOp::Search(query) => {
let result = self.search(query).await?;
StoreResult::Items(result)
}
StoreOp::ListNamespaces {
prefix,
suffix,
max_depth,
limit,
} => {
let namespaces = self
.list_namespaces(
prefix.as_deref(),
suffix.as_deref(),
max_depth,
limit,
None,
)
.await?;
StoreResult::Namespaces(namespaces)
}
};
results.push(result);
}
Ok(results)
}
}
/// Evaluates `filter` against the JSON document `value`.
///
/// Equality operators compare the JSON value found at `field`; a missing
/// field never satisfies `$eq` and always satisfies `$ne`. Ordering
/// operators hold only when both sides are JSON numbers.
fn evaluate_filter(filter: &FilterExpr, value: &serde_json::Value) -> bool {
    match filter {
        FilterExpr::Eq {
            field,
            value: wanted,
        } => get_field(value, field).as_ref() == Some(wanted),
        FilterExpr::Ne {
            field,
            value: wanted,
        } => get_field(value, field).as_ref() != Some(wanted),
        FilterExpr::Gt {
            field,
            value: bound,
        } => compare_numbers(value, field, bound, |lhs, rhs| lhs > rhs),
        FilterExpr::Gte {
            field,
            value: bound,
        } => compare_numbers(value, field, bound, |lhs, rhs| lhs >= rhs),
        FilterExpr::Lt {
            field,
            value: bound,
        } => compare_numbers(value, field, bound, |lhs, rhs| lhs < rhs),
        FilterExpr::Lte {
            field,
            value: bound,
        } => compare_numbers(value, field, bound, |lhs, rhs| lhs <= rhs),
        FilterExpr::And { expressions } => {
            for sub in expressions {
                if !evaluate_filter(sub, value) {
                    return false;
                }
            }
            true
        }
        FilterExpr::Or { expressions } => {
            for sub in expressions {
                if evaluate_filter(sub, value) {
                    return true;
                }
            }
            false
        }
        FilterExpr::Not { expr } => {
            let inner = evaluate_filter(expr, value);
            !inner
        }
    }
}
/// Looks up the value at the `.`-separated `path` inside `value`.
///
/// Every segment must be a key of a JSON object; any missing key or
/// non-object intermediate yields `None`. The result is a clone.
fn get_field(value: &serde_json::Value, path: &str) -> Option<serde_json::Value> {
    let located = path.split('.').try_fold(value, |node, segment| match node {
        serde_json::Value::Object(members) => members.get(segment),
        _ => None,
    })?;
    Some(located.clone())
}
/// Applies `comparator` to the number at `field` in `value` and to `expected`.
///
/// Returns `false` unless both are JSON numbers representable as `f64`.
fn compare_numbers(
    value: &serde_json::Value,
    field: &str,
    expected: &serde_json::Value,
    comparator: impl Fn(f64, f64) -> bool,
) -> bool {
    let Some(serde_json::Value::Number(actual)) = get_field(value, field) else {
        return false;
    };
    let serde_json::Value::Number(bound) = expected else {
        return false;
    };
    actual
        .as_f64()
        .zip(bound.as_f64())
        .is_some_and(|(lhs, rhs)| comparator(lhs, rhs))
}
/// Computes the cosine similarity of `a` and `b`, in `[-1.0, 1.0]`.
///
/// When the slices differ in length only the common prefix is compared.
/// Returns `0.0` when that prefix is empty or either vector has zero norm.
///
/// Sums are accumulated in `f64`: squaring large `f32` components would
/// overflow to infinity (yielding NaN) and squaring tiny ones would
/// underflow to a false zero norm.
#[must_use]
#[allow(
    clippy::cast_possible_truncation,
    clippy::as_conversions,
    reason = "the result is clamped to [-1, 1], which f32 represents without overflow"
)]
pub fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
    let mut dot = 0.0_f64;
    let mut norm_a_sq = 0.0_f64;
    let mut norm_b_sq = 0.0_f64;
    // `zip` stops at the shorter slice, i.e. compares the common prefix.
    for (&x, &y) in a.iter().zip(b) {
        let (x, y) = (f64::from(x), f64::from(y));
        dot += x * y;
        norm_a_sq += x * x;
        norm_b_sq += y * y;
    }
    if norm_a_sq == 0.0 || norm_b_sq == 0.0 {
        return 0.0;
    }
    let cosine = dot / (norm_a_sq.sqrt() * norm_b_sq.sqrt());
    // Rounding can push the quotient marginally outside the valid range.
    cosine.clamp(-1.0, 1.0) as f32
}
/// Builds the text to embed for `value` by joining, with single spaces, the
/// contents of each listed field that exists.
///
/// String fields contribute their raw text; any other JSON value
/// contributes its JSON serialization. Missing fields are skipped.
fn extract_index_text(value: &serde_json::Value, fields: &[String]) -> String {
    let mut pieces: Vec<String> = Vec::with_capacity(fields.len());
    for field in fields {
        let Some(found) = get_field(value, field) else {
            continue;
        };
        let piece = match found {
            serde_json::Value::String(text) => text,
            other => other.to_string(),
        };
        pieces.push(piece);
    }
    pieces.join(" ")
}
/// [`Store`] implementation persisted in SQLite through `sqlx`.
#[cfg(feature = "sqlite")]
#[derive(Debug)]
pub struct SqliteStore {
// Connection pool; the constructors in this file always set it to `Some`.
pool: Option<sqlx::SqlitePool>,
// Embedding configuration; `None` disables vector search.
index_config: Option<IndexConfig>,
}
#[cfg(feature = "sqlite")]
impl SqliteStore {
    /// Connects to `database_url` and creates the `store_items` and
    /// `store_vectors` tables when they do not exist yet.
    ///
    /// # Errors
    ///
    /// Returns [`StoreError::Storage`] when the connection or either
    /// `CREATE TABLE` statement fails.
    pub async fn new(database_url: &str) -> Result<Self, StoreError> {
        const ITEMS_DDL: &str = r"
CREATE TABLE IF NOT EXISTS store_items (
namespace TEXT NOT NULL,
key TEXT NOT NULL,
value TEXT NOT NULL,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
PRIMARY KEY (namespace, key)
)
";
        const VECTORS_DDL: &str = r"
CREATE TABLE IF NOT EXISTS store_vectors (
namespace TEXT NOT NULL,
key TEXT NOT NULL,
field TEXT NOT NULL,
vector BLOB NOT NULL,
PRIMARY KEY (namespace, key, field),
FOREIGN KEY (namespace, key) REFERENCES store_items(namespace, key) ON DELETE CASCADE
)
";

        let pool = match sqlx::SqlitePool::connect(database_url).await {
            Ok(pool) => pool,
            Err(e) => {
                return Err(StoreError::Storage(format!(
                    "Failed to connect to database: {e}"
                )));
            }
        };

        // Items first: the vectors table references it.
        let schema = [
            (ITEMS_DDL, "Failed to create table"),
            (VECTORS_DDL, "Failed to create vectors table"),
        ];
        for (ddl, failure) in schema {
            sqlx::query(ddl)
                .execute(&pool)
                .await
                .map_err(|e| StoreError::Storage(format!("{failure}: {e}")))?;
        }

        Ok(Self {
            pool: Some(pool),
            index_config: None,
        })
    }

    /// Like [`Self::new`], additionally enabling vector search with `config`.
    ///
    /// # Errors
    ///
    /// Same as [`Self::new`].
    pub async fn with_vector_search(
        database_url: &str,
        config: IndexConfig,
    ) -> Result<Self, StoreError> {
        let base = Self::new(database_url).await?;
        Ok(Self {
            index_config: Some(config),
            ..base
        })
    }
}
/// Translates `filter` into a SQLite boolean expression over the JSON `value`
/// column, returning the SQL text and its positional parameters in
/// placeholder order.
///
/// Field names are never spliced into the SQL text: each leaf binds its JSON
/// path (`$.field`) as a parameter immediately before its comparison value,
/// so an untrusted field name cannot alter the statement.
///
/// `$eq`/`$ne` use the NULL-safe `IS`/`IS NOT`, so a missing field fails
/// `$eq` and satisfies `$ne` (also under `$not`), matching the in-memory
/// evaluator. Empty `$and`/`$or` render as constant true/false instead of an
/// empty, syntactically invalid clause.
#[cfg(feature = "sqlite")]
fn filter_to_sql_sqlite(filter: &FilterExpr) -> (String, Vec<serde_json::Value>) {
    /// Parameters for one leaf: the JSON path, then the comparison value.
    fn leaf_params(field: &str, value: &serde_json::Value) -> Vec<serde_json::Value> {
        vec![
            serde_json::Value::String(format!("$.{field}")),
            value.clone(),
        ]
    }

    /// Leaf comparing the extracted value as-is.
    fn plain(
        op: &str,
        field: &str,
        value: &serde_json::Value,
    ) -> (String, Vec<serde_json::Value>) {
        (
            format!("json_extract(value, ?) {op} ?"),
            leaf_params(field, value),
        )
    }

    /// Leaf comparing both sides as REAL numbers.
    fn numeric(
        op: &str,
        field: &str,
        value: &serde_json::Value,
    ) -> (String, Vec<serde_json::Value>) {
        (
            format!("CAST(json_extract(value, ?) AS REAL) {op} CAST(? AS REAL)"),
            leaf_params(field, value),
        )
    }

    /// Joins sub-expressions with `joiner`; `when_empty` is the neutral element.
    fn group(
        expressions: &[FilterExpr],
        joiner: &str,
        when_empty: &str,
    ) -> (String, Vec<serde_json::Value>) {
        if expressions.is_empty() {
            return (when_empty.to_string(), Vec::new());
        }
        let mut clauses = Vec::with_capacity(expressions.len());
        let mut all_params = Vec::new();
        for expr in expressions {
            let (clause, params) = filter_to_sql_sqlite(expr);
            clauses.push(format!("({clause})"));
            all_params.extend(params);
        }
        (clauses.join(joiner), all_params)
    }

    match filter {
        FilterExpr::Eq { field, value } => plain("IS", field, value),
        FilterExpr::Ne { field, value } => plain("IS NOT", field, value),
        FilterExpr::Gt { field, value } => numeric(">", field, value),
        FilterExpr::Gte { field, value } => numeric(">=", field, value),
        FilterExpr::Lt { field, value } => numeric("<", field, value),
        FilterExpr::Lte { field, value } => numeric("<=", field, value),
        FilterExpr::And { expressions } => group(expressions, " AND ", "1=1"),
        FilterExpr::Or { expressions } => group(expressions, " OR ", "1=0"),
        FilterExpr::Not { expr } => {
            let (clause, params) = filter_to_sql_sqlite(expr);
            (format!("NOT ({clause})"), params)
        }
    }
}
/// Renders a JSON value as the text bound to a SQLite parameter.
///
/// Booleans become `"1"`/`"0"`, strings are passed through unquoted, and
/// every other value is rendered as its JSON serialization.
#[cfg(feature = "sqlite")]
fn sqlite_param_from_value(value: &serde_json::Value) -> String {
    if let Some(flag) = value.as_bool() {
        let digit = if flag { "1" } else { "0" };
        return digit.to_string();
    }
    if let Some(text) = value.as_str() {
        return text.to_owned();
    }
    value.to_string()
}
/// Parses an RFC 3339 timestamp stored as TEXT into a UTC `DateTime`.
#[cfg(feature = "sqlite")]
fn sqlite_parse_timestamp(raw: &str) -> Result<DateTime<Utc>, StoreError> {
    DateTime::parse_from_rfc3339(raw)
        .map(|parsed| parsed.with_timezone(&Utc))
        .map_err(|e| StoreError::Storage(format!("invalid timestamp: {e}")))
}

/// Loads the stored embedding of `namespace`/`key`, if one exists.
#[cfg(feature = "sqlite")]
async fn sqlite_fetch_embedding(
    pool: &sqlx::SqlitePool,
    namespace: &str,
    key: &str,
) -> Result<Option<Vec<f32>>, StoreError> {
    let vector_row =
        sqlx::query("SELECT vector FROM store_vectors WHERE namespace = ? AND key = ? LIMIT 1")
            .bind(namespace)
            .bind(key)
            .fetch_optional(pool)
            .await
            .map_err(|e| StoreError::Storage(format!("Failed to load embedding: {e}")))?;
    let Some(vrow) = vector_row else {
        return Ok(None);
    };
    let bytes: Vec<u8> = vrow
        .try_get("vector")
        .map_err(|e| StoreError::Storage(e.to_string()))?;
    blob_to_vector(&bytes).map(Some)
}

#[cfg(feature = "sqlite")]
#[async_trait]
impl Store for SqliteStore {
    /// Returns the item under `namespace`/`key`, or `None` when absent.
    ///
    /// The embedding is loaded only when the store has an index configured.
    /// SQLite items never expire, so `expires_at` is always `None`.
    ///
    /// # Errors
    ///
    /// [`StoreError::Storage`] on query or decoding failures,
    /// [`StoreError::Serialize`] when the stored JSON is invalid.
    async fn get(&self, namespace: &str, key: &str) -> Result<Option<Item>, StoreError> {
        let pool = self
            .pool
            .as_ref()
            .ok_or_else(|| StoreError::Storage("Store not initialized".to_string()))?;
        let result = sqlx::query(
            "SELECT value, created_at, updated_at FROM store_items WHERE namespace = ? AND key = ?",
        )
        .bind(namespace)
        .bind(key)
        .fetch_optional(pool)
        .await
        .map_err(|e| StoreError::Storage(format!("Failed to get item: {e}")))?;
        let Some(row) = result else {
            return Ok(None);
        };

        let value_str: String = row
            .try_get("value")
            .map_err(|e| StoreError::Storage(e.to_string()))?;
        let value = serde_json::from_str(&value_str).map_err(StoreError::Serialize)?;
        let created_at: String = row
            .try_get("created_at")
            .map_err(|e| StoreError::Storage(e.to_string()))?;
        let updated_at: String = row
            .try_get("updated_at")
            .map_err(|e| StoreError::Storage(e.to_string()))?;
        let embedding = if self.index_config.is_some() {
            sqlite_fetch_embedding(pool, namespace, key).await?
        } else {
            None
        };

        Ok(Some(Item {
            namespace: namespace.to_string(),
            key: key.to_string(),
            value,
            created_at: sqlite_parse_timestamp(&created_at)?,
            updated_at: sqlite_parse_timestamp(&updated_at)?,
            expires_at: None,
            embedding,
        }))
    }

    /// Creates or replaces the item under `namespace`/`key`, preserving
    /// `created_at` on replacement.
    ///
    /// An embedding is computed only when the store has an index configured,
    /// `index` names at least one field, and those fields yield non-empty
    /// text. When no embedding is produced, any previously stored vector is
    /// removed so that searches never score the item against stale content.
    ///
    /// # Errors
    ///
    /// Propagates embedding errors; [`StoreError::Storage`] on SQL failures.
    async fn put(
        &self,
        namespace: &str,
        key: &str,
        value: serde_json::Value,
        index: Option<Vec<String>>,
    ) -> Result<(), StoreError> {
        let pool = self
            .pool
            .as_ref()
            .ok_or_else(|| StoreError::Storage("Store not initialized".to_string()))?;
        let now = Utc::now();
        let embedding = match (&self.index_config, index.as_deref()) {
            (Some(index_config), Some(fields)) if !fields.is_empty() => {
                let text = extract_index_text(&value, fields);
                if text.is_empty() {
                    None
                } else {
                    let mut embeddings = index_config.embed.embed(vec![text]).await?;
                    embeddings.pop()
                }
            }
            _ => None,
        };
        let value_str = serde_json::to_string(&value).map_err(StoreError::Serialize)?;
        let now_str = now.to_rfc3339();
        sqlx::query(
            r"
INSERT INTO store_items (namespace, key, value, created_at, updated_at)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT (namespace, key) DO UPDATE SET
value = excluded.value,
updated_at = excluded.updated_at
",
        )
        .bind(namespace)
        .bind(key)
        .bind(&value_str)
        .bind(&now_str)
        .bind(&now_str)
        .execute(pool)
        .await
        .map_err(|e| StoreError::Storage(format!("Failed to put item: {e}")))?;

        if let Some(vec) = embedding {
            let bytes = vector_to_blob(&vec);
            sqlx::query(
                r"
INSERT INTO store_vectors (namespace, key, field, vector)
VALUES (?, ?, 'default', ?)
ON CONFLICT (namespace, key, field) DO UPDATE SET
vector = excluded.vector
",
            )
            .bind(namespace)
            .bind(key)
            .bind(&bytes)
            .execute(pool)
            .await
            .map_err(|e| StoreError::Storage(format!("Failed to store embedding: {e}")))?;
        } else {
            // The new value carries no embedding; drop the old one.
            sqlx::query("DELETE FROM store_vectors WHERE namespace = ? AND key = ?")
                .bind(namespace)
                .bind(key)
                .execute(pool)
                .await
                .map_err(|e| StoreError::Storage(format!("Failed to store embedding: {e}")))?;
        }
        Ok(())
    }

    /// Removes the item under `namespace`/`key`; a missing item is not an error.
    ///
    /// # Errors
    ///
    /// [`StoreError::Storage`] when the statement fails.
    async fn delete(&self, namespace: &str, key: &str) -> Result<(), StoreError> {
        let pool = self
            .pool
            .as_ref()
            .ok_or_else(|| StoreError::Storage("Store not initialized".to_string()))?;
        sqlx::query("DELETE FROM store_items WHERE namespace = ? AND key = ?")
            .bind(namespace)
            .bind(key)
            .execute(pool)
            .await
            .map_err(|e| StoreError::Storage(format!("Failed to delete item: {e}")))?;
        Ok(())
    }

    /// Returns the requested page of items whose namespace starts with
    /// `query.namespace_prefix` and whose value passes `query.filter`.
    ///
    /// The prefix is matched with `substr` rather than `LIKE`, so `%`/`_` in
    /// the prefix are literal and matching is case-sensitive. Filter
    /// parameters are bound with their native SQLite type (integer, real,
    /// text) because a TEXT parameter never compares equal to the numeric
    /// value `json_extract` returns.
    ///
    /// With an index configured and a non-empty `query.query`, items are
    /// ranked by descending cosine similarity (items without an embedding
    /// last); otherwise they are ordered by namespace then key.
    ///
    /// # Errors
    ///
    /// Propagates embedding errors; [`StoreError::Storage`] on SQL or
    /// decoding failures; [`StoreError::Serialize`] on invalid stored JSON.
    #[allow(
        clippy::too_many_lines,
        reason = "query building, row decoding and ranking are kept together for readability"
    )]
    async fn search(&self, query: SearchQuery) -> Result<SearchResult, StoreError> {
        use std::cmp::Ordering;

        let pool = self
            .pool
            .as_ref()
            .ok_or_else(|| StoreError::Storage("Store not initialized".to_string()))?;
        let query_embedding: Option<Vec<f32>> =
            match (&self.index_config, query.query.as_deref()) {
                (Some(index_config), Some(text)) if !text.is_empty() => {
                    let mut embeddings = index_config.embed.embed(vec![text.to_owned()]).await?;
                    embeddings.pop()
                }
                _ => None,
            };

        let mut conditions = vec!["substr(namespace, 1, length(?)) = ?".to_string()];
        let prefix_param = serde_json::Value::String(query.namespace_prefix.clone());
        let mut params: Vec<serde_json::Value> = vec![prefix_param.clone(), prefix_param];
        if let Some(ref filter) = query.filter {
            let (clause, filter_params) = filter_to_sql_sqlite(filter);
            conditions.push(format!("({clause})"));
            params.extend(filter_params);
        }
        let where_clause = conditions.join(" AND ");
        let data_sql = format!(
            "SELECT namespace, key, value, created_at, updated_at \
             FROM store_items WHERE {where_clause} \
             ORDER BY namespace, key"
        );

        let mut data_query = sqlx::query(&data_sql);
        for param in params {
            data_query = match param {
                serde_json::Value::Null => data_query.bind(None::<String>),
                serde_json::Value::Bool(flag) => data_query.bind(i64::from(flag)),
                serde_json::Value::Number(number) => match (number.as_i64(), number.as_f64()) {
                    (Some(int), _) => data_query.bind(int),
                    (None, Some(float)) => data_query.bind(float),
                    (None, None) => data_query.bind(number.to_string()),
                },
                other => data_query.bind(sqlite_param_from_value(&other)),
            };
        }
        let rows = data_query
            .fetch_all(pool)
            .await
            .map_err(|e| StoreError::Storage(format!("Search query failed: {e}")))?;

        let mut items: Vec<SearchItem> = Vec::with_capacity(rows.len());
        for row in rows {
            let namespace: String = row
                .try_get("namespace")
                .map_err(|e| StoreError::Storage(e.to_string()))?;
            let key: String = row
                .try_get("key")
                .map_err(|e| StoreError::Storage(e.to_string()))?;
            let value_str: String = row
                .try_get("value")
                .map_err(|e| StoreError::Storage(e.to_string()))?;
            let value = serde_json::from_str(&value_str).map_err(StoreError::Serialize)?;
            let created_at_str: String = row
                .try_get("created_at")
                .map_err(|e| StoreError::Storage(e.to_string()))?;
            let updated_at_str: String = row
                .try_get("updated_at")
                .map_err(|e| StoreError::Storage(e.to_string()))?;
            // Embeddings are only needed for scoring.
            let embedding = if query_embedding.is_some() {
                sqlite_fetch_embedding(pool, &namespace, &key).await?
            } else {
                None
            };
            let score = query_embedding
                .as_deref()
                .zip(embedding.as_deref())
                .map(|(q_emb, i_emb)| f64::from(cosine_similarity(q_emb, i_emb)));
            items.push(SearchItem {
                item: Item {
                    namespace,
                    key,
                    value,
                    created_at: sqlite_parse_timestamp(&created_at_str)?,
                    updated_at: sqlite_parse_timestamp(&updated_at_str)?,
                    expires_at: None,
                    embedding,
                },
                score,
            });
        }

        let total_count = items.len();
        if query_embedding.is_some() {
            // Stable sort: ties keep the namespace/key order from SQL.
            // `total_cmp` keeps the comparator a total order even for NaN.
            items.sort_by(|a, b| match (a.score, b.score) {
                (Some(x), Some(y)) => y.total_cmp(&x),
                (Some(_), None) => Ordering::Less,
                (None, Some(_)) => Ordering::Greater,
                (None, None) => Ordering::Equal,
            });
        }
        let start = query.offset.min(items.len());
        // Saturate so a "no limit" value such as `usize::MAX` cannot overflow.
        let end = start.saturating_add(query.limit).min(items.len());
        let page = items.drain(start..end).collect();
        Ok(SearchResult {
            items: page,
            total_count,
        })
    }

    /// Lists the distinct namespaces that hold items, sorted.
    ///
    /// `prefix`/`suffix` are matched literally and case-sensitively (via
    /// `substr`, not `LIKE`). `max_depth` truncates each namespace to that
    /// many `/`-separated segments and collapses duplicates; pagination is
    /// applied after truncation so pages refer to the final list. Without
    /// `max_depth`, pagination is pushed into SQL.
    ///
    /// # Errors
    ///
    /// [`StoreError::Storage`] on query or decoding failures.
    async fn list_namespaces(
        &self,
        prefix: Option<&str>,
        suffix: Option<&str>,
        max_depth: Option<usize>,
        limit: Option<usize>,
        offset: Option<usize>,
    ) -> Result<Vec<String>, StoreError> {
        let pool = self
            .pool
            .as_ref()
            .ok_or_else(|| StoreError::Storage("Store not initialized".to_string()))?;
        let mut query_str = "SELECT DISTINCT namespace FROM store_items WHERE 1=1".to_string();
        let mut params: Vec<&str> = Vec::new();
        if let Some(prefix_filter) = prefix {
            query_str.push_str(" AND substr(namespace, 1, length(?)) = ?");
            params.extend([prefix_filter, prefix_filter]);
        }
        if let Some(suffix_filter) = suffix {
            // A suffix longer than the namespace yields a shorter substring,
            // which can never equal the suffix.
            query_str.push_str(" AND substr(namespace, length(namespace) - length(?) + 1) = ?");
            params.extend([suffix_filter, suffix_filter]);
        }
        query_str.push_str(" ORDER BY namespace");

        let paginate_in_sql = max_depth.is_none();
        if paginate_in_sql && (limit.is_some() || offset.is_some()) {
            // SQLite rejects OFFSET without LIMIT; a negative LIMIT means "no limit".
            let sql_limit = limit.map_or(-1, |value| i64::try_from(value).unwrap_or(i64::MAX));
            let sql_offset = i64::try_from(offset.unwrap_or(0)).unwrap_or(i64::MAX);
            let _ = write!(query_str, " LIMIT {sql_limit} OFFSET {sql_offset}");
        }

        let mut query = sqlx::query(&query_str);
        for param in params {
            query = query.bind(param);
        }
        let rows = query
            .fetch_all(pool)
            .await
            .map_err(|e| StoreError::Storage(format!("Failed to list namespaces: {e}")))?;
        let mut namespaces = Vec::with_capacity(rows.len());
        for row in rows {
            let ns: String = row
                .try_get("namespace")
                .map_err(|e| StoreError::Storage(e.to_string()))?;
            namespaces.push(ns);
        }

        if let Some(depth) = max_depth {
            for ns in &mut namespaces {
                let truncated = ns.split('/').take(depth).collect::<Vec<_>>().join("/");
                *ns = truncated;
            }
            namespaces.sort_unstable();
            namespaces.dedup();
            if let Some(offset_value) = offset {
                let skip = offset_value.min(namespaces.len());
                namespaces.drain(..skip);
            }
            if let Some(limit_value) = limit {
                namespaces.truncate(limit_value);
            }
        }
        Ok(namespaces)
    }

    /// Executes `ops` sequentially and returns one result per operation.
    ///
    /// The batch is not transactional: the first failing operation aborts it
    /// and earlier operations stay applied.
    async fn batch(&self, ops: Vec<StoreOp>) -> Result<Vec<StoreResult>, StoreError> {
        let mut results = Vec::with_capacity(ops.len());
        for op in ops {
            let result = match op {
                StoreOp::Get { namespace, key } => {
                    StoreResult::Item(self.get(&namespace, &key).await?)
                }
                StoreOp::Put {
                    namespace,
                    key,
                    value,
                    index,
                } => {
                    self.put(&namespace, &key, value, index).await?;
                    StoreResult::None
                }
                StoreOp::Delete { namespace, key } => {
                    self.delete(&namespace, &key).await?;
                    StoreResult::None
                }
                StoreOp::Search(query) => StoreResult::Items(self.search(query).await?),
                StoreOp::ListNamespaces {
                    prefix,
                    suffix,
                    max_depth,
                    limit,
                } => {
                    let namespaces = self
                        .list_namespaces(
                            prefix.as_deref(),
                            suffix.as_deref(),
                            max_depth,
                            limit,
                            None,
                        )
                        .await?;
                    StoreResult::Namespaces(namespaces)
                }
            };
            results.push(result);
        }
        Ok(results)
    }
}
/// [`Store`] implementation persisted in PostgreSQL through `sqlx`.
#[cfg(feature = "postgres")]
#[derive(Debug)]
pub struct PostgresStore {
// Connection pool; the constructors in this file always set it to `Some`.
pool: Option<sqlx::PgPool>,
// Embedding configuration; `None` disables vector search.
index_config: Option<IndexConfig>,
}
#[cfg(feature = "postgres")]
impl PostgresStore {
    /// Connects to `database_url` and creates the `store_items` and
    /// `store_vectors` tables when they do not exist yet.
    ///
    /// # Errors
    ///
    /// Returns [`StoreError::Storage`] when the connection or either
    /// `CREATE TABLE` statement fails.
    pub async fn new(database_url: &str) -> Result<Self, StoreError> {
        const ITEMS_DDL: &str = r"
CREATE TABLE IF NOT EXISTS store_items (
namespace TEXT NOT NULL,
key TEXT NOT NULL,
value JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL,
updated_at TIMESTAMPTZ NOT NULL,
PRIMARY KEY (namespace, key)
)
";
        const VECTORS_DDL: &str = r"
CREATE TABLE IF NOT EXISTS store_vectors (
namespace TEXT NOT NULL,
key TEXT NOT NULL,
field TEXT NOT NULL,
vector BYTEA NOT NULL,
PRIMARY KEY (namespace, key, field),
FOREIGN KEY (namespace, key) REFERENCES store_items(namespace, key) ON DELETE CASCADE
)
";

        let pool = match sqlx::PgPool::connect(database_url).await {
            Ok(pool) => pool,
            Err(e) => {
                return Err(StoreError::Storage(format!(
                    "Failed to connect to database: {e}"
                )));
            }
        };

        // Items first: the vectors table references it.
        let schema = [
            (ITEMS_DDL, "Failed to create table"),
            (VECTORS_DDL, "Failed to create vectors table"),
        ];
        for (ddl, failure) in schema {
            sqlx::query(ddl)
                .execute(&pool)
                .await
                .map_err(|e| StoreError::Storage(format!("{failure}: {e}")))?;
        }

        Ok(Self {
            pool: Some(pool),
            index_config: None,
        })
    }

    /// Like [`Self::new`], additionally enabling vector search with `config`.
    ///
    /// # Errors
    ///
    /// Same as [`Self::new`].
    pub async fn with_vector_search(
        database_url: &str,
        config: IndexConfig,
    ) -> Result<Self, StoreError> {
        let base = Self::new(database_url).await?;
        Ok(Self {
            index_config: Some(config),
            ..base
        })
    }
}
#[cfg(feature = "postgres")]
/// Serializes `vec` as the concatenation of each component's little-endian
/// IEEE-754 bytes (four bytes per `f32`), for storage in a BYTEA column.
fn vector_to_bytea(vec: &[f32]) -> Vec<u8> {
    vec.iter()
        .flat_map(|component| component.to_le_bytes())
        .collect()
}
/// Decodes a BYTEA payload of little-endian `f32` values back into a vector.
///
/// # Errors
///
/// Returns [`StoreError::VectorSearch`] when the byte length is not a
/// multiple of four.
#[cfg(feature = "postgres")]
fn bytea_to_vector(bytes: &[u8]) -> Result<Vec<f32>, StoreError> {
    const WIDTH: usize = std::mem::size_of::<f32>();

    if bytes.len() % WIDTH != 0 {
        return Err(StoreError::VectorSearch(
            "Invalid BYTEA length for vector data".to_string(),
        ));
    }
    let mut components = Vec::with_capacity(bytes.len() / WIDTH);
    for chunk in bytes.chunks_exact(WIDTH) {
        let mut raw = [0_u8; WIDTH];
        raw.copy_from_slice(chunk);
        components.push(f32::from_le_bytes(raw));
    }
    Ok(components)
}
#[cfg(feature = "sqlite")]
/// Serializes `vec` as the concatenation of each component's little-endian
/// IEEE-754 bytes (four bytes per `f32`), for storage in a BLOB column.
fn vector_to_blob(vec: &[f32]) -> Vec<u8> {
    vec.iter()
        .flat_map(|component| component.to_le_bytes())
        .collect()
}
/// Decodes a BLOB payload of little-endian `f32` values back into a vector.
///
/// # Errors
///
/// Returns [`StoreError::VectorSearch`] when the byte length is not a
/// multiple of four.
#[cfg(feature = "sqlite")]
fn blob_to_vector(bytes: &[u8]) -> Result<Vec<f32>, StoreError> {
    const WIDTH: usize = std::mem::size_of::<f32>();

    if bytes.len() % WIDTH != 0 {
        return Err(StoreError::VectorSearch(
            "Invalid BLOB length for vector data".to_string(),
        ));
    }
    let mut components = Vec::with_capacity(bytes.len() / WIDTH);
    for chunk in bytes.chunks_exact(WIDTH) {
        let mut raw = [0_u8; WIDTH];
        raw.copy_from_slice(chunk);
        components.push(f32::from_le_bytes(raw));
    }
    Ok(components)
}
/// Translates `filter` into a PostgreSQL boolean expression over the JSONB
/// `value` column, returning the SQL text (with `?` placeholders) and the
/// comparison values in placeholder order.
///
/// Field paths have to appear in the SQL text as a `text[]` literal, so every
/// path element is double-quoted and escaped; an untrusted field name can
/// therefore not terminate the literal and inject SQL. Empty `$and`/`$or`
/// render as `TRUE`/`FALSE` instead of an empty, syntactically invalid clause.
#[cfg(feature = "postgres")]
fn filter_to_sql_postgres(filter: &FilterExpr) -> (String, Vec<serde_json::Value>) {
    /// Renders a `.`-separated field as a quoted array literal, e.g. `'{"a","b"}'`.
    fn path_literal(field: &str) -> String {
        let elements: Vec<String> = field
            .split('.')
            .map(|part| {
                let escaped = part
                    // Array-literal escapes for quoted elements.
                    .replace('\\', "\\\\")
                    .replace('"', "\\\"")
                    // SQL string-literal escape.
                    .replace('\'', "''");
                format!("\"{escaped}\"")
            })
            .collect();
        format!("'{{{}}}'", elements.join(","))
    }

    /// Leaf comparing the extracted value as text.
    fn text_cmp(
        op: &str,
        field: &str,
        value: &serde_json::Value,
    ) -> (String, Vec<serde_json::Value>) {
        (
            format!("value #>> {} {op} ?", path_literal(field)),
            vec![value.clone()],
        )
    }

    /// Leaf comparing both sides as `numeric`.
    fn numeric_cmp(
        op: &str,
        field: &str,
        value: &serde_json::Value,
    ) -> (String, Vec<serde_json::Value>) {
        (
            format!(
                "(value #> {})::numeric {op} CAST(? AS numeric)",
                path_literal(field)
            ),
            vec![value.clone()],
        )
    }

    /// Joins sub-expressions with `joiner`; `when_empty` is the neutral element.
    fn group(
        expressions: &[FilterExpr],
        joiner: &str,
        when_empty: &str,
    ) -> (String, Vec<serde_json::Value>) {
        if expressions.is_empty() {
            return (when_empty.to_string(), Vec::new());
        }
        let mut clauses = Vec::with_capacity(expressions.len());
        let mut all_params = Vec::new();
        for expr in expressions {
            let (clause, params) = filter_to_sql_postgres(expr);
            clauses.push(format!("({clause})"));
            all_params.extend(params);
        }
        (clauses.join(joiner), all_params)
    }

    match filter {
        FilterExpr::Eq { field, value } => text_cmp("=", field, value),
        FilterExpr::Ne { field, value } => text_cmp("!=", field, value),
        FilterExpr::Gt { field, value } => numeric_cmp(">", field, value),
        FilterExpr::Gte { field, value } => numeric_cmp(">=", field, value),
        FilterExpr::Lt { field, value } => numeric_cmp("<", field, value),
        FilterExpr::Lte { field, value } => numeric_cmp("<=", field, value),
        FilterExpr::And { expressions } => group(expressions, " AND ", "TRUE"),
        FilterExpr::Or { expressions } => group(expressions, " OR ", "FALSE"),
        FilterExpr::Not { expr } => {
            let (clause, params) = filter_to_sql_postgres(expr);
            (format!("NOT ({clause})"), params)
        }
    }
}
#[cfg(feature = "postgres")]
#[async_trait]
impl Store for PostgresStore {
async fn get(&self, namespace: &str, key: &str) -> Result<Option<Item>, StoreError> {
let pool = self
.pool
.as_ref()
.ok_or_else(|| StoreError::Storage("Store not initialized".to_string()))?;
let result = sqlx::query(
"SELECT value, created_at, updated_at FROM store_items WHERE namespace = $1 AND key = $2"
)
.bind(namespace)
.bind(key)
.fetch_optional(pool)
.await
.map_err(|e| StoreError::Storage(format!("Failed to get item: {e}")))?;
if let Some(row) = result {
let value: serde_json::Value = row
.try_get("value")
.map_err(|e| StoreError::Storage(e.to_string()))?;
let created_at: chrono::DateTime<chrono::Utc> = row
.try_get("created_at")
.map_err(|e| StoreError::Storage(e.to_string()))?;
let updated_at: chrono::DateTime<chrono::Utc> = row
.try_get("updated_at")
.map_err(|e| StoreError::Storage(e.to_string()))?;
let embedding = if self.index_config.is_some() {
let vector_row = sqlx::query(
"SELECT vector FROM store_vectors WHERE namespace = $1 AND key = $2 LIMIT 1",
)
.bind(namespace)
.bind(key)
.fetch_optional(pool)
.await
.map_err(|e| StoreError::Storage(format!("Failed to load embedding: {e}")))?;
if let Some(vrow) = vector_row {
let bytes: Vec<u8> = vrow
.try_get("vector")
.map_err(|e| StoreError::Storage(e.to_string()))?;
Some(bytea_to_vector(&bytes)?)
} else {
None
}
} else {
None
};
Ok(Some(Item {
namespace: namespace.to_string(),
key: key.to_string(),
value,
created_at,
updated_at,
expires_at: None,
embedding,
}))
} else {
Ok(None)
}
}
/// Inserts or overwrites the item at `namespace`/`key`.
///
/// On conflict only `value` and `updated_at` are replaced; `created_at` keeps its
/// original value.
///
/// When an index configuration is present and `index` names at least one field whose
/// extracted text is non-empty, that text is embedded and the vector is upserted into
/// `store_vectors` under the field name `'default'`. When indexing is configured but no
/// embedding is produced for this write, any vector previously stored for the key is
/// removed, so later reads never see an embedding computed from an older value.
///
/// The item row and the vector row are written by separate statements, not in one
/// transaction.
///
/// # Errors
///
/// Returns [`StoreError::Storage`] if the pool is not initialized or a statement fails,
/// and propagates any error returned by the embedding function.
async fn put(
    &self,
    namespace: &str,
    key: &str,
    value: serde_json::Value,
    index: Option<Vec<String>>,
) -> Result<(), StoreError> {
    let pool = self
        .pool
        .as_ref()
        .ok_or_else(|| StoreError::Storage("Store not initialized".to_string()))?;
    let now = Utc::now();

    // The embedding is computed before touching the database so an embedding failure
    // leaves the stored item untouched.
    let embedding = match (&self.index_config, index.as_ref()) {
        (Some(index_config), Some(index_fields)) if !index_fields.is_empty() => {
            let text = extract_index_text(&value, index_fields);
            if text.is_empty() {
                None
            } else {
                let mut embeddings = index_config.embed.embed(vec![text]).await?;
                embeddings.pop()
            }
        }
        _ => None,
    };

    sqlx::query(
        r"
INSERT INTO store_items (namespace, key, value, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (namespace, key) DO UPDATE SET
value = EXCLUDED.value,
updated_at = EXCLUDED.updated_at
",
    )
    .bind(namespace)
    .bind(key)
    .bind(&value)
    .bind(now)
    .bind(now)
    .execute(pool)
    .await
    .map_err(|e| StoreError::Storage(format!("Failed to put item: {e}")))?;

    match embedding {
        Some(vec) => {
            let bytes = vector_to_bytea(&vec);
            sqlx::query(
                r"
INSERT INTO store_vectors (namespace, key, field, vector)
VALUES ($1, $2, 'default', $3)
ON CONFLICT (namespace, key, field) DO UPDATE SET
vector = EXCLUDED.vector
",
            )
            .bind(namespace)
            .bind(key)
            .bind(&bytes)
            .execute(pool)
            .await
            .map_err(|e| StoreError::Storage(format!("Failed to store embedding: {e}")))?;
        }
        // The value changed but no new embedding exists: drop the stale vector.
        None if self.index_config.is_some() => {
            sqlx::query("DELETE FROM store_vectors WHERE namespace = $1 AND key = $2")
                .bind(namespace)
                .bind(key)
                .execute(pool)
                .await
                .map_err(|e| {
                    StoreError::Storage(format!("Failed to remove stale embedding: {e}"))
                })?;
        }
        None => {}
    }
    Ok(())
}
/// Removes the item stored at `namespace`/`key` from `store_items`.
///
/// Succeeds even when no such row exists; the number of affected rows is not checked.
///
/// NOTE(review): only `store_items` is touched here. Whether the matching
/// `store_vectors` row disappears as well depends on the table definition (e.g. a
/// cascading foreign key), which is not visible from this method; confirm.
///
/// # Errors
///
/// Returns [`StoreError::Storage`] if the pool is not initialized or the statement
/// fails.
async fn delete(&self, namespace: &str, key: &str) -> Result<(), StoreError> {
    let Some(pool) = self.pool.as_ref() else {
        return Err(StoreError::Storage("Store not initialized".to_string()));
    };

    let statement = sqlx::query("DELETE FROM store_items WHERE namespace = $1 AND key = $2")
        .bind(namespace)
        .bind(key);

    match statement.execute(pool).await {
        Ok(_) => Ok(()),
        Err(e) => Err(StoreError::Storage(format!("Failed to delete item: {e}"))),
    }
}
#[allow(
clippy::cast_possible_truncation,
clippy::cast_possible_wrap,
clippy::cast_sign_loss,
clippy::as_conversions,
clippy::similar_names,
clippy::too_many_lines,
reason = "SQL binding requires i64 for LIMIT/OFFSET; COUNT returns i64; names 'nlike' and 'nprefix' are adequately descriptive; vector search logic cannot be further decomposed without extracting trivial helpers"
)]
async fn search(&self, query: SearchQuery) -> Result<SearchResult, StoreError> {
let pool = self
.pool
.as_ref()
.ok_or_else(|| StoreError::Storage("Store not initialized".to_string()))?;
let query_embedding: Option<Vec<f32>> = if let Some(ref index_config) = self.index_config {
if let Some(query_text) = &query.query {
if query_text.is_empty() {
None
} else {
let mut embeddings = index_config.embed.embed(vec![query_text.clone()]).await?;
embeddings.pop()
}
} else {
None
}
} else {
None
};
let namespace_pattern = format!("{}%", query.namespace_prefix);
let mut conditions = vec!["namespace LIKE $1".to_string()];
let mut bind_params: Vec<String> = vec![namespace_pattern];
let mut param_idx = 2;
if let Some(ref filter) = query.filter {
let (clause, filter_params) = filter_to_sql_postgres(filter);
let mut numbered_clause = String::with_capacity(clause.len());
for c in clause.chars() {
if c == '?' {
let _ = write!(numbered_clause, "${param_idx}");
param_idx += 1;
} else {
numbered_clause.push(c);
}
}
conditions.push(format!("({numbered_clause})"));
for p in &filter_params {
bind_params.push(p.to_string());
}
}
let where_clause = conditions.join(" AND ");
let data_sql = format!(
"SELECT namespace, key, value, created_at, updated_at \
FROM store_items WHERE {where_clause} \
ORDER BY namespace, key"
);
let mut data_query = sqlx::query(&data_sql);
for p in &bind_params {
data_query = data_query.bind(p.as_str());
}
let rows = data_query
.fetch_all(pool)
.await
.map_err(|e| StoreError::Storage(format!("Search query failed: {e}")))?;
let mut items: Vec<SearchItem> = Vec::with_capacity(rows.len());
for row in rows {
let namespace: String = row
.try_get("namespace")
.map_err(|e| StoreError::Storage(e.to_string()))?;
let key: String = row
.try_get("key")
.map_err(|e| StoreError::Storage(e.to_string()))?;
let value: serde_json::Value = row
.try_get("value")
.map_err(|e| StoreError::Storage(e.to_string()))?;
let created_at: chrono::DateTime<chrono::Utc> = row
.try_get("created_at")
.map_err(|e| StoreError::Storage(e.to_string()))?;
let updated_at: chrono::DateTime<chrono::Utc> = row
.try_get("updated_at")
.map_err(|e| StoreError::Storage(e.to_string()))?;
let embedding = if query_embedding.is_some() {
let vector_row = sqlx::query(
"SELECT vector FROM store_vectors WHERE namespace = $1 AND key = $2 LIMIT 1",
)
.bind(&namespace)
.bind(&key)
.fetch_optional(pool)
.await
.map_err(|e| StoreError::Storage(format!("Failed to load embedding: {e}")))?;
if let Some(vrow) = vector_row {
let bytes: Vec<u8> = vrow
.try_get("vector")
.map_err(|e| StoreError::Storage(e.to_string()))?;
Some(bytea_to_vector(&bytes)?)
} else {
None
}
} else {
None
};
let score = query_embedding.as_ref().and_then(|q_emb| {
embedding
.as_ref()
.map(|i_emb| f64::from(cosine_similarity(q_emb, i_emb)))
});
items.push(SearchItem {
item: Item {
namespace,
key,
value,
created_at,
updated_at,
expires_at: None,
embedding,
},
score,
});
}
let total_count = items.len();
if query_embedding.is_some() {
items.sort_by(|a, b| {
b.score
.partial_cmp(&a.score)
.unwrap_or(std::cmp::Ordering::Equal)
});
}
let start = query.offset.min(items.len());
let end = (start + query.limit).min(items.len());
let page = items.drain(start..end).collect();
Ok(SearchResult {
items: page,
total_count,
})
}
/// Lists the distinct namespaces present in `store_items`, in ascending order.
///
/// * `prefix` / `suffix` keep only namespaces that start / end with the given text,
///   matched literally.
/// * `max_depth` truncates each namespace to its first `max_depth` `/`-separated
///   segments and removes the duplicates this creates.
/// * `limit` / `offset` page through the final list. Without `max_depth` the paging is
///   done by the database; with `max_depth` it is applied after truncation and
///   de-duplication so that it counts the namespaces actually returned.
///
/// # Errors
///
/// Returns [`StoreError::Storage`] if the pool is not initialized, the query fails, or
/// the `namespace` column cannot be decoded.
async fn list_namespaces(
    &self,
    prefix: Option<&str>,
    suffix: Option<&str>,
    max_depth: Option<usize>,
    limit: Option<usize>,
    offset: Option<usize>,
) -> Result<Vec<String>, StoreError> {
    let pool = self
        .pool
        .as_ref()
        .ok_or_else(|| StoreError::Storage("Store not initialized".to_string()))?;

    // Escapes LIKE metacharacters; backslash is the default LIKE escape character in
    // PostgreSQL.
    let escape_like = |raw: &str| -> String {
        let mut escaped = String::with_capacity(raw.len());
        for c in raw.chars() {
            if matches!(c, '\\' | '%' | '_') {
                escaped.push('\\');
            }
            escaped.push(c);
        }
        escaped
    };

    let mut query_str = "SELECT DISTINCT namespace FROM store_items WHERE 1=1".to_string();
    let mut params: Vec<String> = Vec::new();
    // Placeholders are numbered from the bind list itself, so `$n` always refers to
    // the n-th bound value (starting at `$1`).
    if let Some(prefix_filter) = prefix {
        params.push(format!("{}%", escape_like(prefix_filter)));
        let _ = write!(query_str, " AND namespace LIKE ${}", params.len());
    }
    if let Some(suffix_filter) = suffix {
        params.push(format!("%{}", escape_like(suffix_filter)));
        let _ = write!(query_str, " AND namespace LIKE ${}", params.len());
    }
    // A fixed order makes LIMIT/OFFSET paging deterministic.
    query_str.push_str(" ORDER BY namespace");
    if max_depth.is_none() {
        if let Some(limit_value) = limit {
            let _ = write!(query_str, " LIMIT {limit_value}");
        }
        if let Some(offset_value) = offset {
            let _ = write!(query_str, " OFFSET {offset_value}");
        }
    }

    let mut query = sqlx::query(&query_str);
    for param in params {
        query = query.bind(param);
    }
    let rows = query
        .fetch_all(pool)
        .await
        .map_err(|e| StoreError::Storage(format!("Failed to list namespaces: {e}")))?;

    let mut namespaces = Vec::with_capacity(rows.len());
    for row in rows {
        let ns: String = row
            .try_get("namespace")
            .map_err(|e| StoreError::Storage(e.to_string()))?;
        namespaces.push(ns);
    }

    if let Some(depth) = max_depth {
        namespaces = namespaces
            .into_iter()
            .map(|ns| ns.split('/').take(depth).collect::<Vec<_>>().join("/"))
            .collect();
        namespaces.sort();
        namespaces.dedup();
        // Page over the truncated, de-duplicated list.
        namespaces = namespaces
            .into_iter()
            .skip(offset.unwrap_or(0))
            .take(limit.unwrap_or(usize::MAX))
            .collect();
    }
    Ok(namespaces)
}
/// Runs a list of store operations one after another and collects one result per
/// operation, in input order.
///
/// Each operation is dispatched to the corresponding method of this store.
/// `ListNamespaces` is always executed without an offset. Processing stops at the
/// first failing operation and its error is returned; operations that already ran are
/// not undone.
///
/// # Errors
///
/// Returns the first [`StoreError`] produced by any of the dispatched operations.
async fn batch(&self, ops: Vec<StoreOp>) -> Result<Vec<StoreResult>, StoreError> {
    let mut outcomes = Vec::with_capacity(ops.len());
    for operation in ops {
        match operation {
            StoreOp::Get { namespace, key } => {
                let found = self.get(&namespace, &key).await?;
                outcomes.push(StoreResult::Item(found));
            }
            StoreOp::Put {
                namespace,
                key,
                value,
                index,
            } => {
                self.put(&namespace, &key, value, index).await?;
                outcomes.push(StoreResult::None);
            }
            StoreOp::Delete { namespace, key } => {
                self.delete(&namespace, &key).await?;
                outcomes.push(StoreResult::None);
            }
            StoreOp::Search(query) => {
                let hits = self.search(query).await?;
                outcomes.push(StoreResult::Items(hits));
            }
            StoreOp::ListNamespaces {
                prefix,
                suffix,
                max_depth,
                limit,
            } => {
                let listed = self
                    .list_namespaces(
                        prefix.as_deref(),
                        suffix.as_deref(),
                        max_depth,
                        limit,
                        None,
                    )
                    .await?;
                outcomes.push(StoreResult::Namespaces(listed));
            }
        }
    }
    Ok(outcomes)
}
}
#[cfg(test)]
mod tests {
    //! Unit tests for filter evaluation, TTL handling, vector search, namespace
    //! listing, SQL filter translation, and the expiry sweep.

    use super::*;
    use serde_json::json;

    /// JSON document whose `status` is `"active"`.
    fn active_value() -> serde_json::Value {
        json!({ "status": "active" })
    }

    /// JSON document whose `status` is `"inactive"`.
    fn inactive_value() -> serde_json::Value {
        json!({ "status": "inactive" })
    }

    /// `Not(Eq)` matches a document for which the inner `Eq` is false.
    #[test]
    fn test_filter_not_negates_match() {
        let filter = FilterExpr::Not {
            expr: Box::new(FilterExpr::Eq {
                field: "status".to_string(),
                value: json!("active"),
            }),
        };
        assert!(evaluate_filter(&filter, &inactive_value()));
    }

    /// `Not(Eq)` rejects a document for which the inner `Eq` is true.
    #[test]
    fn test_filter_not_inverts_true_to_false() {
        let filter = FilterExpr::Not {
            expr: Box::new(FilterExpr::Eq {
                field: "status".to_string(),
                value: json!("active"),
            }),
        };
        assert!(!evaluate_filter(&filter, &active_value()));
    }

    /// `Not` composes with `And`: both the age bound and the negated status must hold.
    #[test]
    fn test_filter_not_combined_with_and() {
        let value = json!({ "age": 25, "status": "active" });
        let filter = FilterExpr::And {
            expressions: vec![
                FilterExpr::Gte {
                    field: "age".to_string(),
                    value: json!(18),
                },
                FilterExpr::Not {
                    expr: Box::new(FilterExpr::Eq {
                        field: "status".to_string(),
                        value: json!("banned"),
                    }),
                },
            ],
        };
        assert!(evaluate_filter(&filter, &value));
        let banned_value = json!({ "age": 25, "status": "banned" });
        assert!(!evaluate_filter(&filter, &banned_value));
        let young_value = json!({ "age": 17, "status": "active" });
        assert!(!evaluate_filter(&filter, &young_value));
    }

    /// A `Not` filter serializes with a `$not` tag and evaluates the same after a
    /// JSON round trip.
    #[test]
    fn test_filter_not_serialization_roundtrip() {
        let filter = FilterExpr::Not {
            expr: Box::new(FilterExpr::Eq {
                field: "status".to_string(),
                value: json!("active"),
            }),
        };
        let serialized = serde_json::to_string(&filter).expect("serialization failed");
        assert!(
            serialized.contains("\"$not\""),
            "serialized form must contain $not tag"
        );
        let deserialized: FilterExpr =
            serde_json::from_str(&serialized).expect("deserialization failed");
        let value = active_value();
        assert_eq!(
            evaluate_filter(&filter, &value),
            evaluate_filter(&deserialized, &value),
            "roundtrip filter must produce the same result"
        );
    }

    /// Double negation behaves like the inner expression.
    #[test]
    fn test_filter_nested_not() {
        let filter = FilterExpr::Not {
            expr: Box::new(FilterExpr::Not {
                expr: Box::new(FilterExpr::Eq {
                    field: "status".to_string(),
                    value: json!("active"),
                }),
            }),
        };
        assert!(evaluate_filter(&filter, &active_value()));
        assert!(!evaluate_filter(&filter, &inactive_value()));
    }

    /// An item is readable before its TTL elapses and gone afterwards.
    #[tokio::test]
    async fn test_ttl_expiration_on_get() {
        let store = MemoryStore::new().with_ttl_config(TTLConfig {
            default_ttl: Some(std::time::Duration::from_millis(50)),
            refresh_on_read: false,
            ..Default::default()
        });
        store
            .put("ns", "key1", json!({"v": 1}), None)
            .await
            .expect("put failed");
        let item = store
            .get("ns", "key1")
            .await
            .expect("get failed")
            .expect("item should exist");
        assert_eq!(item.key, "key1");
        tokio::time::sleep(std::time::Duration::from_millis(80)).await;
        let result = store.get("ns", "key1").await.expect("get failed");
        assert!(result.is_none(), "item should have expired");
    }

    /// With `refresh_on_read`, repeated reads keep the item alive past its original
    /// TTL; once reads stop, it expires.
    #[tokio::test]
    async fn test_ttl_refresh_on_read() {
        let store = MemoryStore::new().with_ttl_config(TTLConfig {
            default_ttl: Some(std::time::Duration::from_millis(100)),
            refresh_on_read: true,
            ..Default::default()
        });
        store
            .put("ns", "key1", json!({"v": 1}), None)
            .await
            .expect("put failed");
        for _ in 0..3 {
            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
            let item = store
                .get("ns", "key1")
                .await
                .expect("get failed")
                .expect("item should still exist after refresh");
            assert_eq!(item.key, "key1");
        }
        let result = store.get("ns", "key1").await.expect("get failed");
        assert!(
            result.is_some(),
            "item should still exist after TTL refreshes"
        );
        tokio::time::sleep(std::time::Duration::from_millis(120)).await;
        let result = store.get("ns", "key1").await.expect("get failed");
        assert!(result.is_none(), "item should have expired after no reads");
    }

    /// Search stops returning items once their TTL has elapsed.
    #[tokio::test]
    async fn test_ttl_search_filters_expired() {
        let store = MemoryStore::new().with_ttl_config(TTLConfig {
            default_ttl: Some(std::time::Duration::from_millis(50)),
            refresh_on_read: false,
            ..Default::default()
        });
        store
            .put("ns", "key1", json!({"v": 1}), None)
            .await
            .expect("put failed");
        store
            .put("ns", "key2", json!({"v": 2}), None)
            .await
            .expect("put failed");
        let query = SearchQuery {
            namespace_prefix: "ns".to_string(),
            filter: None,
            query: None,
            limit: 10,
            offset: 0,
        };
        let result = store.search(query).await.expect("search failed");
        assert_eq!(result.total_count, 2);
        tokio::time::sleep(std::time::Duration::from_millis(80)).await;
        let query = SearchQuery {
            namespace_prefix: "ns".to_string(),
            filter: None,
            query: None,
            limit: 10,
            offset: 0,
        };
        let result = store.search(query).await.expect("search failed");
        assert_eq!(
            result.total_count, 0,
            "expired items should be filtered from search"
        );
    }

    /// Without a TTL configuration items get no `expires_at` and stay readable.
    #[tokio::test]
    async fn test_no_ttl_items_never_expire() {
        let store = MemoryStore::new();
        store
            .put("ns", "key1", json!({"v": 1}), None)
            .await
            .expect("put failed");
        let has_no_expiry = {
            let data = store.data.read().await;
            data.get("ns")
                .and_then(|ns| ns.get("key1"))
                .is_some_and(|item| item.expires_at.is_none())
        };
        assert!(has_no_expiry, "item should have no expiration set");
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        let result = store.get("ns", "key1").await.expect("get failed");
        assert!(result.is_some(), "item without TTL should never expire");
    }

    /// Reading an expired item removes it from the underlying map.
    #[tokio::test]
    async fn test_ttl_lazy_cleanup_removes_from_underlying_storage() {
        let store = MemoryStore::new().with_ttl_config(TTLConfig {
            default_ttl: Some(std::time::Duration::from_millis(30)),
            refresh_on_read: false,
            ..Default::default()
        });
        store
            .put("ns", "key1", json!({"v": 1}), None)
            .await
            .expect("put failed");
        let exists_before = {
            let data = store.data.read().await;
            data.get("ns").is_some_and(|ns| ns.contains_key("key1"))
        };
        assert!(exists_before, "item should exist in storage initially");
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        let _ = store.get("ns", "key1").await;
        let exists_after = {
            let data = store.data.read().await;
            data.get("ns").is_some_and(|ns| ns.contains_key("key1"))
        };
        assert!(!exists_after, "expired item should be removed from storage");
    }

    /// With `refresh_on_read`, a read moves `expires_at` forward.
    #[tokio::test]
    async fn test_ttl_refresh_updates_expires_at() {
        let store = MemoryStore::new().with_ttl_config(TTLConfig {
            default_ttl: Some(std::time::Duration::from_millis(200)),
            refresh_on_read: true,
            ..Default::default()
        });
        store
            .put("ns", "key1", json!({"v": 1}), None)
            .await
            .expect("put failed");
        let original_expires = {
            let data = store.data.read().await;
            data.get("ns")
                .and_then(|ns| ns.get("key1"))
                .expect("item")
                .expires_at
                .expect("should have expires_at")
        };
        tokio::time::sleep(std::time::Duration::from_millis(20)).await;
        let _ = store.get("ns", "key1").await;
        let refreshed_expires = {
            let data = store.data.read().await;
            data.get("ns")
                .and_then(|ns| ns.get("key1"))
                .expect("item")
                .expires_at
                .expect("should have expires_at")
        };
        assert!(
            refreshed_expires > original_expires,
            "refresh_on_read should advance the expiration time: {refreshed_expires} should be > {original_expires}"
        );
    }

    /// Deterministic embedding used by the vector-search tests: an FNV-1a style hash
    /// of the text is spread over 8 components and L2-normalized, so identical texts
    /// yield identical unit vectors.
    struct TestEmbeddingFunc;

    #[async_trait::async_trait]
    impl EmbeddingFunc for TestEmbeddingFunc {
        async fn embed(&self, texts: Vec<String>) -> Result<Vec<Vec<f32>>, StoreError> {
            Ok(texts
                .iter()
                .map(|text| {
                    let hash: u64 = text.bytes().fold(0xcbf2_9ce4_8422_2325u64, |h, b| {
                        (h ^ u64::from(b)).wrapping_mul(0x0100_0000_01b3)
                    });
                    // One component per byte of the hash, scaled into [0, 1].
                    let mut vec: Vec<f32> = (0..8)
                        .map(|i| f32::from(((hash >> (i * 8)) & 0xFF) as u8) / 255.0)
                        .collect();
                    let norm = vec.iter().map(|x| x * x).sum::<f32>().sqrt();
                    if norm > 0.0 {
                        for v in &mut vec {
                            *v /= norm;
                        }
                    }
                    vec
                })
                .collect())
        }
    }

    /// A vector compared with itself has similarity 1.
    #[test]
    fn test_cosine_similarity_identical_vectors() {
        let v = vec![1.0, 0.0, 0.0];
        let sim = cosine_similarity(&v, &v);
        let expected = 1.0;
        assert!(
            (sim - expected).abs() < f32::EPSILON,
            "identical vectors should have similarity 1.0, got {sim}"
        );
    }

    /// Orthogonal vectors have similarity 0.
    #[test]
    fn test_cosine_similarity_orthogonal_vectors() {
        let a = vec![1.0, 0.0, 0.0];
        let b = vec![0.0, 1.0, 0.0];
        let sim = cosine_similarity(&a, &b);
        let expected = 0.0;
        assert!(
            (sim - expected).abs() < f32::EPSILON,
            "orthogonal vectors should have similarity 0.0, got {sim}"
        );
    }

    /// Opposite vectors have similarity -1.
    #[test]
    fn test_cosine_similarity_opposite_vectors() {
        let a = vec![1.0, 0.0];
        let b = vec![-1.0, 0.0];
        let sim = cosine_similarity(&a, &b);
        let expected = -1.0;
        assert!(
            (sim - expected).abs() < f32::EPSILON,
            "opposite vectors should have similarity -1.0, got {sim}"
        );
    }

    /// A zero-norm operand yields similarity 0.
    #[test]
    fn test_cosine_similarity_zero_norm() {
        let a = vec![0.0, 0.0, 0.0];
        let b = vec![1.0, 0.0, 0.0];
        let sim = cosine_similarity(&a, &b);
        let expected = 0.0;
        assert!(
            (sim - expected).abs() < f32::EPSILON,
            "zero-norm vector should give similarity 0.0, got {sim}"
        );
    }

    /// With vector search enabled, every indexed item is scored and the exact-text
    /// match scores above 0.9.
    #[tokio::test]
    async fn test_search_with_embeddings_returns_scored_results() {
        let index_config = IndexConfig {
            dims: 8,
            embed: Box::new(TestEmbeddingFunc),
            fields: Some(vec!["text".to_string()]),
        };
        let store = MemoryStore::new().with_vector_search(index_config);
        store
            .put(
                "docs",
                "item1",
                json!({"text": "hello world"}),
                Some(vec!["text".to_string()]),
            )
            .await
            .expect("put failed");
        store
            .put(
                "docs",
                "item2",
                json!({"text": "quantum physics"}),
                Some(vec!["text".to_string()]),
            )
            .await
            .expect("put failed");
        let query = SearchQuery {
            namespace_prefix: "docs".to_string(),
            filter: None,
            query: Some("hello world".to_string()),
            limit: 10,
            offset: 0,
        };
        let result = store.search(query).await.expect("search failed");
        assert!(
            !result.items.is_empty(),
            "search should return matching items"
        );
        for item in &result.items {
            assert!(
                item.score.is_some(),
                "items with embeddings should have similarity scores"
            );
        }
        if let Some(score) = result.items.first().and_then(|i| i.score) {
            assert!(
                score > 0.9,
                "top result should have high similarity score, got {score}"
            );
        }
    }

    /// Scored results come back in descending score order with the exact match first.
    #[tokio::test]
    async fn test_search_ordering_respects_similarity() {
        let index_config = IndexConfig {
            dims: 8,
            embed: Box::new(TestEmbeddingFunc),
            fields: Some(vec!["text".to_string()]),
        };
        let store = MemoryStore::new().with_vector_search(index_config);
        store
            .put(
                "docs",
                "hello-world",
                json!({"text": "hello world"}),
                Some(vec!["text".to_string()]),
            )
            .await
            .expect("put failed");
        store
            .put(
                "docs",
                "hello-there",
                json!({"text": "hello there"}),
                Some(vec!["text".to_string()]),
            )
            .await
            .expect("put failed");
        store
            .put(
                "docs",
                "quantum-physics",
                json!({"text": "quantum physics"}),
                Some(vec!["text".to_string()]),
            )
            .await
            .expect("put failed");
        let query = SearchQuery {
            namespace_prefix: "docs".to_string(),
            filter: None,
            query: Some("hello world".to_string()),
            limit: 10,
            offset: 0,
        };
        let result = store.search(query).await.expect("search failed");
        assert_eq!(
            result.items.len(),
            3,
            "should return all 3 items in the namespace"
        );
        let first = result
            .items
            .first()
            .expect("should have at least one result");
        assert_eq!(
            first.item.key, "hello-world",
            "the most similar item should be ranked first"
        );
        for pair in result.items.windows(2) {
            if let (Some(a), Some(b)) = (pair[0].score, pair[1].score) {
                assert!(
                    a >= b,
                    "scores should be in descending order: {a} should be >= {b}"
                );
            }
        }
    }

    /// Without an index configuration a text query returns all items, unscored.
    #[tokio::test]
    async fn test_search_without_index_returns_no_scores() {
        let store = MemoryStore::new();
        store
            .put("docs", "item1", json!({"text": "hello"}), None)
            .await
            .expect("put failed");
        store
            .put("docs", "item2", json!({"text": "world"}), None)
            .await
            .expect("put failed");
        let query = SearchQuery {
            namespace_prefix: "docs".to_string(),
            filter: None,
            query: Some("hello".to_string()),
            limit: 10,
            offset: 0,
        };
        let result = store.search(query).await.expect("search failed");
        assert_eq!(result.items.len(), 2, "should return all items");
        for item in &result.items {
            assert!(
                item.score.is_none(),
                "items without index should have no score"
            );
        }
    }

    /// `offset` skips that many namespaces.
    #[tokio::test]
    async fn test_list_namespaces_offset_skips_first_n() {
        let store = MemoryStore::new();
        for i in 0..5 {
            store
                .put(&format!("ns-{i}"), "key", json!({"v": i}), None)
                .await
                .expect("put failed");
        }
        let all_ns = store
            .list_namespaces(None, None, None, None, None)
            .await
            .expect("list_namespaces failed");
        assert_eq!(all_ns.len(), 5, "expected all 5 namespaces");
        let offset_ns = store
            .list_namespaces(None, None, None, None, Some(2))
            .await
            .expect("list_namespaces with offset failed");
        assert_eq!(
            offset_ns.len(),
            3,
            "offset=2 should skip 2 namespaces, leaving 3"
        );
    }

    /// `offset` and `limit` combine into a page of `limit` entries.
    #[tokio::test]
    async fn test_list_namespaces_offset_and_limit_together() {
        let store = MemoryStore::new();
        for i in 0..10 {
            store
                .put(&format!("ns-{i:02}"), "key", json!({"v": i}), None)
                .await
                .expect("put failed");
        }
        let page = store
            .list_namespaces(None, None, None, Some(4), Some(3))
            .await
            .expect("list_namespaces failed");
        assert_eq!(page.len(), 4, "offset=3 + limit=4 should yield 4 results");
    }

    /// An offset beyond the result set yields an empty list rather than an error.
    #[tokio::test]
    async fn test_list_namespaces_offset_larger_than_results() {
        let store = MemoryStore::new();
        store
            .put("only-ns", "key", json!({"v": 1}), None)
            .await
            .expect("put failed");
        let result = store
            .list_namespaces(None, None, None, None, Some(100))
            .await
            .expect("list_namespaces failed");
        assert!(
            result.is_empty(),
            "offset larger than result set should return empty"
        );
    }

    /// `offset` is applied to the prefix-filtered list.
    #[tokio::test]
    async fn test_list_namespaces_offset_with_prefix_filter() {
        let store = MemoryStore::new();
        for i in 0..6 {
            let ns = if i < 3 {
                format!("alpha-{i}")
            } else {
                format!("beta-{i}")
            };
            store
                .put(&ns, "key", json!({"v": i}), None)
                .await
                .expect("put failed");
        }
        let result = store
            .list_namespaces(Some("alpha-"), None, None, None, Some(1))
            .await
            .expect("list_namespaces failed");
        assert_eq!(
            result.len(),
            2,
            "prefix filter + offset=1 should leave 2 namespaces"
        );
        assert!(
            result.iter().all(|ns| ns.starts_with("alpha-")),
            "all results must match prefix filter"
        );
    }

    /// SQLite: `Eq` becomes a `json_extract` equality with one string parameter.
    #[cfg(feature = "sqlite")]
    #[test]
    fn test_filter_to_sql_eq() {
        let filter = FilterExpr::Eq {
            field: "status".to_string(),
            value: json!("active"),
        };
        let (sql, params) = filter_to_sql_sqlite(&filter);
        assert_eq!(sql, "json_extract(value, '$.status') = ?");
        assert_eq!(params.len(), 1);
        assert_eq!(sqlite_param_from_value(&params[0]), "active");
    }

    /// SQLite: `Gt` casts both sides to `REAL`.
    #[cfg(feature = "sqlite")]
    #[test]
    fn test_filter_to_sql_gt() {
        let filter = FilterExpr::Gt {
            field: "age".to_string(),
            value: json!(18),
        };
        let (sql, params) = filter_to_sql_sqlite(&filter);
        assert_eq!(
            sql,
            "CAST(json_extract(value, '$.age') AS REAL) > CAST(? AS REAL)"
        );
        assert_eq!(params.len(), 1);
        assert_eq!(sqlite_param_from_value(&params[0]), "18");
    }

    /// SQLite: nested `And` / `Or` are parenthesised and parameters keep their order.
    #[cfg(feature = "sqlite")]
    #[test]
    fn test_filter_to_sql_and_or_combination() {
        let filter = FilterExpr::And {
            expressions: vec![
                FilterExpr::Eq {
                    field: "status".to_string(),
                    value: json!("active"),
                },
                FilterExpr::Or {
                    expressions: vec![
                        FilterExpr::Gte {
                            field: "age".to_string(),
                            value: json!(18),
                        },
                        FilterExpr::Eq {
                            field: "role".to_string(),
                            value: json!("admin"),
                        },
                    ],
                },
            ],
        };
        let (sql, params) = filter_to_sql_sqlite(&filter);
        assert_eq!(
            sql,
            "(json_extract(value, '$.status') = ?) AND \
((CAST(json_extract(value, '$.age') AS REAL) >= CAST(? AS REAL)) OR \
(json_extract(value, '$.role') = ?))"
        );
        assert_eq!(params.len(), 3);
    }

    /// SQLite: `Not` wraps the inner clause in `NOT (...)`.
    #[cfg(feature = "sqlite")]
    #[test]
    fn test_filter_to_sql_not() {
        let filter = FilterExpr::Not {
            expr: Box::new(FilterExpr::Eq {
                field: "status".to_string(),
                value: json!("banned"),
            }),
        };
        let (sql, params) = filter_to_sql_sqlite(&filter);
        assert!(sql.starts_with("NOT ("));
        assert!(sql.contains("json_extract(value, '$.status') = ?"));
        assert_eq!(params.len(), 1);
    }

    /// SQLite: `true` is bound as `"1"`.
    #[cfg(feature = "sqlite")]
    #[test]
    fn test_sqlite_param_bool_true() {
        assert_eq!(sqlite_param_from_value(&json!(true)), "1");
    }

    /// SQLite: `false` is bound as `"0"`.
    #[cfg(feature = "sqlite")]
    #[test]
    fn test_sqlite_param_bool_false() {
        assert_eq!(sqlite_param_from_value(&json!(false)), "0");
    }

    /// SQLite: strings are bound without surrounding quotes.
    #[cfg(feature = "sqlite")]
    #[test]
    fn test_sqlite_param_string() {
        assert_eq!(sqlite_param_from_value(&json!("hello")), "hello");
    }

    /// SQLite: numbers are bound in their decimal text form.
    #[cfg(feature = "sqlite")]
    #[test]
    fn test_sqlite_param_number() {
        assert_eq!(sqlite_param_from_value(&json!(42)), "42");
        assert_eq!(sqlite_param_from_value(&json!(42.5)), "42.5");
    }

    /// Postgres: `Eq` becomes a `#>>` text comparison with one parameter.
    #[cfg(feature = "postgres")]
    #[test]
    fn test_filter_to_sql_postgres_eq() {
        let filter = FilterExpr::Eq {
            field: "status".to_string(),
            value: json!("active"),
        };
        let (sql, params) = filter_to_sql_postgres(&filter);
        assert_eq!(sql, "value #>> '{status}' = ?");
        assert_eq!(params.len(), 1);
    }

    /// Postgres: a dotted field becomes a comma-separated path.
    #[cfg(feature = "postgres")]
    #[test]
    fn test_filter_to_sql_postgres_nested_field() {
        let filter = FilterExpr::Eq {
            field: "user.address.city".to_string(),
            value: json!("NYC"),
        };
        let (sql, _params) = filter_to_sql_postgres(&filter);
        assert_eq!(sql, "value #>> '{user,address,city}' = ?");
    }

    /// Postgres: ordering comparisons cast both sides to `numeric`.
    #[cfg(feature = "postgres")]
    #[test]
    fn test_filter_to_sql_postgres_numeric_compare() {
        let filter = FilterExpr::Lt {
            field: "price".to_string(),
            value: json!(100.0),
        };
        let (sql, _params) = filter_to_sql_postgres(&filter);
        assert_eq!(sql, "(value #> '{price}')::numeric < CAST(? AS numeric)");
    }

    /// Postgres: nested `And` / `Not` / `Or` keep all parameters and connectives.
    #[cfg(feature = "postgres")]
    #[test]
    fn test_filter_to_sql_postgres_and_or_not() {
        let filter = FilterExpr::And {
            expressions: vec![
                FilterExpr::Eq {
                    field: "a".to_string(),
                    value: json!(1),
                },
                FilterExpr::Not {
                    expr: Box::new(FilterExpr::Or {
                        expressions: vec![
                            FilterExpr::Eq {
                                field: "b".to_string(),
                                value: json!(2),
                            },
                            FilterExpr::Eq {
                                field: "c".to_string(),
                                value: json!(3),
                            },
                        ],
                    }),
                },
            ],
        };
        let (sql, params) = filter_to_sql_postgres(&filter);
        assert_eq!(params.len(), 3);
        assert!(sql.contains("AND"));
        assert!(sql.contains("NOT ("));
        assert!(sql.contains("OR"));
    }

    /// A sweep removes every expired item and reports how many it removed.
    #[tokio::test]
    async fn test_sweep_expired_items_removes_expired() {
        let store = MemoryStore::new().with_ttl_config(TTLConfig {
            default_ttl: Some(std::time::Duration::from_millis(50)),
            refresh_on_read: false,
            ..Default::default()
        });
        store
            .put("ns", "key1", json!({"v": 1}), None)
            .await
            .expect("put failed");
        store
            .put("ns", "key2", json!({"v": 2}), None)
            .await
            .expect("put failed");
        tokio::time::sleep(std::time::Duration::from_millis(80)).await;
        let count = store.sweep_expired_items().await.expect("sweep failed");
        assert_eq!(count, 2, "sweep should remove 2 expired items");
        let exists = {
            let data = store.data.read().await;
            data.get("ns").is_some_and(|ns| ns.contains_key("key1"))
        };
        assert!(!exists, "expired item should be removed from storage");
        let exists = {
            let data = store.data.read().await;
            data.get("ns").is_some_and(|ns| ns.contains_key("key2"))
        };
        assert!(!exists, "expired item should be removed from storage");
    }

    /// Each sweep removes at most `sweep_max_items` items.
    #[tokio::test]
    async fn test_sweep_expired_items_respects_max_items_limit() {
        let store = MemoryStore::new().with_ttl_config(TTLConfig {
            default_ttl: Some(std::time::Duration::from_millis(50)),
            refresh_on_read: false,
            sweep_max_items: 2,
            ..Default::default()
        });
        for i in 1..=5 {
            store
                .put("ns", &format!("key{i}"), json!({"v": i}), None)
                .await
                .expect("put failed");
        }
        tokio::time::sleep(std::time::Duration::from_millis(80)).await;
        let count1 = store.sweep_expired_items().await.expect("sweep failed");
        assert_eq!(
            count1, 2,
            "first sweep should respect sweep_max_items limit"
        );
        let count2 = store.sweep_expired_items().await.expect("sweep failed");
        assert_eq!(count2, 2, "second sweep should remove 2 more items");
        let count3 = store.sweep_expired_items().await.expect("sweep failed");
        assert_eq!(count3, 1, "third sweep should remove last item");
        let count4 = store.sweep_expired_items().await.expect("sweep failed");
        assert_eq!(count4, 0, "fourth sweep should find no expired items");
    }

    /// A sweep covers expired items in every namespace.
    #[tokio::test]
    async fn test_sweep_expired_items_across_multiple_namespaces() {
        let store = MemoryStore::new().with_ttl_config(TTLConfig {
            default_ttl: Some(std::time::Duration::from_millis(50)),
            refresh_on_read: false,
            ..Default::default()
        });
        store
            .put("ns1", "key1", json!({"v": 1}), None)
            .await
            .expect("put failed");
        store
            .put("ns1", "key2", json!({"v": 2}), None)
            .await
            .expect("put failed");
        store
            .put("ns2", "key1", json!({"v": 3}), None)
            .await
            .expect("put failed");
        store
            .put("ns2", "key2", json!({"v": 4}), None)
            .await
            .expect("put failed");
        tokio::time::sleep(std::time::Duration::from_millis(80)).await;
        let count = store.sweep_expired_items().await.expect("sweep failed");
        assert_eq!(count, 4, "sweep should remove all 4 expired items");
        let total_items = {
            let data = store.data.read().await;
            data.values()
                .map(std::collections::HashMap::len)
                .sum::<usize>()
        };
        assert_eq!(total_items, 0, "all items should be removed");
    }

    /// Items whose TTL has not elapsed survive a sweep.
    #[tokio::test]
    async fn test_sweep_expired_items_does_not_remove_non_expired() {
        let store = MemoryStore::new().with_ttl_config(TTLConfig {
            default_ttl: Some(std::time::Duration::from_secs(10)),
            refresh_on_read: false,
            ..Default::default()
        });
        store
            .put("ns", "key1", json!({"v": 1}), None)
            .await
            .expect("put failed");
        let count = store.sweep_expired_items().await.expect("sweep failed");
        assert_eq!(count, 0, "sweep should not remove non-expired items");
        let item = store
            .get("ns", "key1")
            .await
            .expect("get failed")
            .expect("item should still exist");
        assert_eq!(item.key, "key1");
    }

    /// Items stored without any TTL survive a sweep.
    #[tokio::test]
    async fn test_sweep_expired_items_with_no_ttl_items() {
        let store = MemoryStore::new();
        store
            .put("ns", "key1", json!({"v": 1}), None)
            .await
            .expect("put failed");
        let count = store.sweep_expired_items().await.expect("sweep failed");
        assert_eq!(count, 0, "sweep should not remove items without expiration");
        let item = store
            .get("ns", "key1")
            .await
            .expect("get failed")
            .expect("item should still exist");
        assert_eq!(item.key, "key1");
    }

    /// The background sweep task removes expired items without any explicit read.
    #[tokio::test]
    async fn test_start_sweep_task_runs_periodically() {
        let store = Arc::new(MemoryStore::new().with_ttl_config(TTLConfig {
            default_ttl: Some(std::time::Duration::from_millis(50)),
            refresh_on_read: false,
            sweep_interval: std::time::Duration::from_millis(100),
            ..Default::default()
        }));
        store
            .put("ns", "key1", json!({"v": 1}), None)
            .await
            .expect("put failed");
        let store_clone = Arc::clone(&store);
        let handle = store_clone.start_sweep_task();
        tokio::time::sleep(std::time::Duration::from_millis(250)).await;
        let exists = {
            let data = store.data.read().await;
            data.get("ns").is_some_and(|ns| ns.contains_key("key1"))
        };
        assert!(!exists, "sweep task should have removed expired item");
        handle.abort();
    }

    /// Lazy cleanup on read and an explicit sweep together remove every expired item
    /// exactly once.
    #[tokio::test]
    async fn test_sweep_and_lazy_cleanup_work_together() {
        let store = MemoryStore::new().with_ttl_config(TTLConfig {
            default_ttl: Some(std::time::Duration::from_millis(50)),
            refresh_on_read: false,
            ..Default::default()
        });
        for i in 1..=5 {
            store
                .put("ns", &format!("key{i}"), json!({"v": i}), None)
                .await
                .expect("put failed");
        }
        tokio::time::sleep(std::time::Duration::from_millis(80)).await;
        let _ = store.get("ns", "key1").await;
        let exists1 = {
            let data = store.data.read().await;
            data.get("ns").is_some_and(|ns| ns.contains_key("key1"))
        };
        assert!(!exists1, "lazy cleanup should remove key1");
        let count = store.sweep_expired_items().await.expect("sweep failed");
        assert_eq!(count, 4, "sweep should remove remaining 4 items");
        let total_items = {
            let data = store.data.read().await;
            data.values()
                .map(std::collections::HashMap::len)
                .sum::<usize>()
        };
        assert_eq!(total_items, 0, "all items should be removed");
    }
}