1use async_trait::async_trait;
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use std::sync::Arc;
11
12#[cfg(any(feature = "sqlite", feature = "postgres"))]
13use std::fmt::Write;
14
15#[cfg(any(feature = "sqlite", feature = "postgres"))]
16use sqlx::Row;
17
/// Errors returned by [`Store`] implementations.
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
    /// No item exists under the given namespace and key.
    #[error("item not found: {namespace}/{key}")]
    NotFound {
        /// Namespace that was looked up.
        namespace: String,
        /// Key that was looked up.
        key: String,
    },

    /// A namespace string was rejected as malformed.
    /// NOTE(review): not raised by the backends visible in this section.
    #[error("invalid namespace: {0}")]
    InvalidNamespace(String),

    /// JSON (de)serialization of an item value failed.
    #[error("serialization error: {0}")]
    Serialize(#[from] serde_json::Error),

    /// The storage backend failed: connection, query, row decoding,
    /// timestamp parsing, or an uninitialized pool.
    #[error("storage error: {0}")]
    Storage(String),

    /// A vector similarity search failed.
    /// NOTE(review): not raised by the code visible in this section.
    #[error("vector search error: {0}")]
    VectorSearch(String),

    /// Computing embeddings failed; presumably returned by
    /// [`EmbeddingFunc`] implementations — confirm against implementors.
    #[error("embedding error: {0}")]
    Embedding(String),
}
50
/// Asynchronous store of JSON [`Item`]s addressed by `namespace` and `key`.
///
/// Namespaces are `/`-separated paths (see [`Store::list_namespaces`]).
/// Implementations must be shareable across tasks (`Send + Sync + 'static`).
#[async_trait]
pub trait Store: Send + Sync + 'static {
    /// Return the item at `namespace`/`key`, or `Ok(None)` if there is none.
    async fn get(&self, namespace: &str, key: &str) -> Result<Option<Item>, StoreError>;

    /// Insert or overwrite the item at `namespace`/`key`.
    ///
    /// `index` names the dot-path fields of `value` whose text is embedded for
    /// vector search. The implementations in this file skip embedding when
    /// `index` is `None` or empty, or when no index is configured.
    async fn put(
        &self,
        namespace: &str,
        key: &str,
        value: serde_json::Value,
        index: Option<Vec<String>>,
    ) -> Result<(), StoreError>;

    /// Remove the item at `namespace`/`key`. The implementations in this file
    /// treat deleting a missing item as success.
    async fn delete(&self, namespace: &str, key: &str) -> Result<(), StoreError>;

    /// Find items under `query.namespace_prefix`, optionally filtered and
    /// ranked by similarity to `query.query`, then paginated.
    async fn search(&self, query: SearchQuery) -> Result<SearchResult, StoreError>;

    /// List namespaces, optionally filtered by `prefix`/`suffix`.
    ///
    /// Namespaces are then truncated to their first `max_depth` `/`-separated
    /// segments and paginated with `offset`/`limit`.
    async fn list_namespaces(
        &self,
        prefix: Option<&str>,
        suffix: Option<&str>,
        max_depth: Option<usize>,
        limit: Option<usize>,
        offset: Option<usize>,
    ) -> Result<Vec<String>, StoreError>;

    /// Run `ops` sequentially, returning one [`StoreResult`] per operation.
    ///
    /// In the implementations in this file, the first failing op aborts the
    /// batch. Ops that already ran are not rolled back.
    async fn batch(&self, ops: Vec<StoreOp>) -> Result<Vec<StoreResult>, StoreError>;
}
124
/// A stored JSON value together with its address and bookkeeping timestamps.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Item {
    /// `/`-separated namespace the item lives in.
    pub namespace: String,
    /// Key of the item within its namespace.
    pub key: String,
    /// The stored JSON payload.
    pub value: serde_json::Value,
    /// First time this key was written. `MemoryStore` and the SQLite upsert
    /// keep it on overwrite.
    pub created_at: DateTime<Utc>,
    /// Time of the most recent write (or TTL refresh in `MemoryStore`).
    pub updated_at: DateTime<Utc>,
    /// When the item expires; `None` means it never expires (see
    /// [`Item::is_expired`]).
    pub expires_at: Option<DateTime<Utc>>,
    /// Embedding computed from the indexed fields, if any; omitted from
    /// serialized output when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub embedding: Option<Vec<f32>>,
}
147
148impl Item {
149 #[must_use]
153 pub fn is_expired(&self) -> bool {
154 self.expires_at
155 .is_some_and(|expires_at| Utc::now() > expires_at)
156 }
157}
158
/// A search hit: the matched item plus its relevance score.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SearchItem {
    /// The matching item; its fields are flattened into the serialized object.
    #[serde(flatten)]
    pub item: Item,
    /// Cosine similarity to the query embedding. `None` when no vector search
    /// ran or the item has no stored embedding.
    pub score: Option<f64>,
}
168
/// Parameters for [`Store::search`].
///
/// Note: the derived `Default` sets `limit` to 0, which returns an empty page.
/// Set `limit` explicitly.
#[derive(Debug, Clone, Default)]
pub struct SearchQuery {
    /// Only namespaces starting with this string are searched (empty = all).
    pub namespace_prefix: String,
    /// Optional predicate every returned item's `value` must satisfy.
    pub filter: Option<FilterExpr>,
    /// Natural-language query. It is embedded for similarity ranking when the
    /// store has an index configured; ignored when `None` or empty.
    pub query: Option<String>,
    /// Maximum number of items in the returned page.
    pub limit: usize,
    /// Number of matching items to skip before the page starts.
    pub offset: usize,
}
183
/// One page of search results.
#[derive(Debug, Clone)]
pub struct SearchResult {
    /// Items in the requested `offset`/`limit` window.
    pub items: Vec<SearchItem>,
    /// Number of matching items before pagination was applied.
    pub total_count: usize,
}
192
/// Predicate over an item's JSON `value`, used by [`SearchQuery::filter`].
///
/// Serialized with an internal `"op"` tag, e.g.
/// `{"op": "$eq", "field": "user.name", "value": "ada"}`. `field` is a
/// dot-separated path through nested JSON objects. Variant semantics below are
/// those of the in-memory evaluator ([`FilterExpr::matches`]).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "op")]
pub enum FilterExpr {
    /// The field exists and equals `value`.
    #[serde(rename = "$eq")]
    Eq {
        /// Dot-separated path of the field to test.
        field: String,
        /// Value the field must equal.
        value: serde_json::Value,
    },
    /// The field is missing or differs from `value`.
    #[serde(rename = "$ne")]
    Ne {
        /// Dot-separated path of the field to test.
        field: String,
        /// Value the field must not equal.
        value: serde_json::Value,
    },
    /// The field and `value` are both JSON numbers and field > value.
    #[serde(rename = "$gt")]
    Gt {
        /// Dot-separated path of the field to test.
        field: String,
        /// Numeric bound.
        value: serde_json::Value,
    },
    /// The field and `value` are both JSON numbers and field >= value.
    #[serde(rename = "$gte")]
    Gte {
        /// Dot-separated path of the field to test.
        field: String,
        /// Numeric bound.
        value: serde_json::Value,
    },
    /// The field and `value` are both JSON numbers and field < value.
    #[serde(rename = "$lt")]
    Lt {
        /// Dot-separated path of the field to test.
        field: String,
        /// Numeric bound.
        value: serde_json::Value,
    },
    /// The field and `value` are both JSON numbers and field <= value.
    #[serde(rename = "$lte")]
    Lte {
        /// Dot-separated path of the field to test.
        field: String,
        /// Numeric bound.
        value: serde_json::Value,
    },
    /// Every sub-expression matches (true for an empty list).
    #[serde(rename = "$and")]
    And {
        /// Sub-expressions that must all match.
        expressions: Vec<FilterExpr>,
    },
    /// At least one sub-expression matches (false for an empty list).
    #[serde(rename = "$or")]
    Or {
        /// Sub-expressions of which one must match.
        expressions: Vec<FilterExpr>,
    },
    /// The inner expression does not match.
    #[serde(rename = "$not")]
    Not {
        /// Expression to negate.
        expr: Box<FilterExpr>,
    },
}
264
impl FilterExpr {
    /// Evaluate this filter against a JSON document.
    ///
    /// Uses the same rules `MemoryStore::search` applies to item values.
    #[must_use]
    pub fn matches(&self, value: &serde_json::Value) -> bool {
        evaluate_filter(self, value)
    }
}
275
/// One operation in a [`Store::batch`] call.
///
/// Each variant mirrors the [`Store`] method of the same name.
#[derive(Debug, Clone)]
pub enum StoreOp {
    /// See [`Store::get`]; yields [`StoreResult::Item`].
    Get {
        /// Namespace of the item.
        namespace: String,
        /// Key of the item.
        key: String,
    },
    /// See [`Store::put`]; yields [`StoreResult::None`].
    Put {
        /// Namespace of the item.
        namespace: String,
        /// Key of the item.
        key: String,
        /// JSON value to store.
        value: serde_json::Value,
        /// Fields of `value` to embed for vector search.
        index: Option<Vec<String>>,
    },
    /// See [`Store::delete`]; yields [`StoreResult::None`].
    Delete {
        /// Namespace of the item.
        namespace: String,
        /// Key of the item.
        key: String,
    },
    /// See [`Store::search`]; yields [`StoreResult::Items`].
    Search(SearchQuery),
    /// See [`Store::list_namespaces`]; yields [`StoreResult::Namespaces`].
    /// Batch execution in this file passes no offset.
    ListNamespaces {
        /// Required namespace prefix.
        prefix: Option<String>,
        /// Required namespace suffix.
        suffix: Option<String>,
        /// Number of `/`-separated segments to keep.
        max_depth: Option<usize>,
        /// Maximum number of namespaces returned.
        limit: Option<usize>,
    },
}
318
/// Outcome of one [`StoreOp`] in a batch, in the same position as its op.
#[derive(Debug, Clone)]
pub enum StoreResult {
    /// Result of [`StoreOp::Get`].
    Item(Option<Item>),
    /// Result of [`StoreOp::Search`].
    Items(SearchResult),
    /// Result of [`StoreOp::ListNamespaces`].
    Namespaces(Vec<String>),
    /// Result of [`StoreOp::Put`] and [`StoreOp::Delete`].
    None,
}
331
/// Time-to-live settings for [`MemoryStore`].
#[derive(Clone, Debug)]
pub struct TTLConfig {
    /// TTL applied to every item written by `put`; `None` means items never
    /// expire.
    pub default_ttl: Option<std::time::Duration>,
    /// When `true` (and `default_ttl` is set), a successful `get` pushes the
    /// item's expiry forward by `default_ttl`.
    pub refresh_on_read: bool,
    /// Period of the background sweep started by
    /// `MemoryStore::start_sweep_task`.
    pub sweep_interval: std::time::Duration,
    /// Maximum number of expired items removed by one sweep.
    pub sweep_max_items: usize,
}
378
379impl Default for TTLConfig {
380 fn default() -> Self {
381 Self {
382 default_ttl: None,
383 refresh_on_read: false,
384 sweep_interval: std::time::Duration::from_secs(300),
385 sweep_max_items: 1000,
386 }
387 }
388}
389
/// In-process [`Store`] backed by nested hash maps, with optional TTLs and
/// embedding-based search.
#[derive(Debug)]
pub struct MemoryStore {
    /// namespace -> key -> item, behind an async `RwLock` shared through an
    /// `Arc` (the sweep task holds a clone of the owning `Arc<MemoryStore>`).
    data: Arc<tokio::sync::RwLock<HashMap<String, HashMap<String, Item>>>>,
    /// Embedding configuration; `None` disables vector search.
    index_config: Option<IndexConfig>,
    /// Expiry behaviour for stored items.
    ttl_config: TTLConfig,
}
402
/// Turns texts into embedding vectors for vector search.
///
/// Callers in this file pass a single text and keep the last vector returned.
/// Implementations are presumably expected to return one vector per input,
/// in input order — confirm with implementors.
#[async_trait::async_trait]
pub trait EmbeddingFunc: Send + Sync + 'static {
    /// Embed `texts`, one vector per text (assumed — see the trait docs).
    async fn embed(&self, texts: Vec<String>) -> Result<Vec<Vec<f32>>, StoreError>;
}
425
/// Configuration enabling vector search in a store.
pub struct IndexConfig {
    /// Expected embedding dimensionality.
    /// NOTE(review): not validated by the code visible in this section.
    pub dims: usize,
    /// Function used to embed indexed item text and query text.
    pub embed: Box<dyn EmbeddingFunc>,
    /// Presumably the default fields to index. The `put` implementations here
    /// only use their explicit `index` argument — confirm the intended use.
    pub fields: Option<Vec<String>>,
}
438
439impl std::fmt::Debug for IndexConfig {
440 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
441 f.debug_struct("IndexConfig")
442 .field("dims", &self.dims)
443 .field("embed", &"...")
444 .field("fields", &self.fields)
445 .finish()
446 }
447}
448
impl Default for MemoryStore {
    /// Equivalent to [`MemoryStore::new`]: empty, no vector index, default TTLs.
    fn default() -> Self {
        Self::new()
    }
}
454
455impl MemoryStore {
456 #[must_use]
458 pub fn new() -> Self {
459 Self {
460 data: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
461 index_config: None,
462 ttl_config: TTLConfig::default(),
463 }
464 }
465
466 #[must_use]
472 pub fn with_vector_search(mut self, config: IndexConfig) -> Self {
473 self.index_config = Some(config);
474 self
475 }
476
477 #[must_use]
483 pub const fn with_ttl_config(mut self, config: TTLConfig) -> Self {
484 self.ttl_config = config;
485 self
486 }
487
488 #[allow(
531 clippy::significant_drop_tightening,
532 reason = "Write lock must be held during iteration and removal"
533 )]
534 pub async fn sweep_expired_items(&self) -> Result<usize, StoreError> {
535 let now = Utc::now();
536 let mut count = 0;
537 let mut items = self.data.write().await;
538
539 let mut keys_to_remove = Vec::new();
542
543 for (namespace, namespace_map) in items.iter() {
544 for (key, item) in namespace_map {
545 if let Some(expires_at) = item.expires_at
546 && expires_at < now
547 {
548 keys_to_remove.push((namespace.clone(), key.clone()));
549 count += 1;
550 if count >= self.ttl_config.sweep_max_items {
551 break;
552 }
553 }
554 }
555 if count >= self.ttl_config.sweep_max_items {
556 break;
557 }
558 }
559
560 for (namespace, key) in keys_to_remove {
562 if let Some(namespace_map) = items.get_mut(&namespace) {
563 namespace_map.remove(&key);
564 }
565 }
566
567 Ok(count)
568 }
569
570 #[must_use]
605 pub fn start_sweep_task(self: Arc<Self>) -> tokio::task::JoinHandle<()> {
606 tokio::spawn(async move {
607 let mut interval = tokio::time::interval(self.ttl_config.sweep_interval);
608 loop {
609 interval.tick().await;
610 if let Err(e) = self.sweep_expired_items().await {
611 tracing::warn!("Store sweep failed: {}", e);
612 }
613 }
614 })
615 }
616}
617
618#[async_trait]
619impl Store for MemoryStore {
620 #[allow(
621 clippy::significant_drop_tightening,
622 reason = "Read lock is scoped tightly; write lock acquired after release"
623 )]
624 async fn get(&self, namespace: &str, key: &str) -> Result<Option<Item>, StoreError> {
625 let is_expired = {
627 let data = self.data.read().await;
628 let Some(ns) = data.get(namespace) else {
629 return Ok(None);
630 };
631 let Some(item) = ns.get(key) else {
632 return Ok(None);
633 };
634 item.is_expired()
635 };
636
637 if is_expired {
638 let mut data = self.data.write().await;
640 if let Some(ns_map) = data.get_mut(namespace) {
641 ns_map.remove(key);
642 }
643 drop(data);
644 return Ok(None);
645 }
646
647 if self.ttl_config.refresh_on_read && self.ttl_config.default_ttl.is_some() {
648 let ttl = self.ttl_config.default_ttl.expect("checked is_some above");
650 let now = Utc::now();
651 let new_expires =
652 now + chrono::Duration::from_std(ttl).unwrap_or(chrono::Duration::MAX);
653
654 let mut data = self.data.write().await;
655 if let Some(ns_map) = data.get_mut(namespace)
656 && let Some(item) = ns_map.get_mut(key)
657 {
658 item.expires_at = Some(new_expires);
659 item.updated_at = now;
660 let cloned = item.clone();
661 drop(data);
662 return Ok(Some(cloned));
663 }
664 drop(data);
665 return Ok(None);
667 }
668
669 let data = self.data.read().await;
671 let item = data.get(namespace).and_then(|ns| ns.get(key).cloned());
672 Ok(item)
673 }
674
675 #[allow(
676 clippy::significant_drop_tightening,
677 reason = "Lock must be held for entire put operation after embedding"
678 )]
679 async fn put(
680 &self,
681 namespace: &str,
682 key: &str,
683 value: serde_json::Value,
684 index: Option<Vec<String>>,
685 ) -> Result<(), StoreError> {
686 let embedding = if let Some(ref index_config) = self.index_config {
688 if let Some(index_fields) = &index {
689 if index_fields.is_empty() {
690 None
691 } else {
692 let text = extract_index_text(&value, index_fields);
693 if text.is_empty() {
694 None
695 } else {
696 let mut embeddings = index_config.embed.embed(vec![text]).await?;
697 embeddings.pop()
698 }
699 }
700 } else {
701 None
702 }
703 } else {
704 None
705 };
706
707 let now = Utc::now();
708 let expires_at = self
709 .ttl_config
710 .default_ttl
711 .map(|ttl| now + chrono::Duration::from_std(ttl).unwrap_or(chrono::Duration::MAX));
712
713 let mut data = self.data.write().await;
714
715 let namespace_map = data
716 .entry(namespace.to_string())
717 .or_insert_with(HashMap::new);
718 let existing = namespace_map.get(key);
719
720 let item = Item {
721 namespace: namespace.to_string(),
722 key: key.to_string(),
723 value,
724 created_at: existing.map_or(now, |i| i.created_at),
725 updated_at: now,
726 expires_at,
727 embedding,
728 };
729
730 namespace_map.insert(key.to_string(), item);
731 Ok(())
732 }
733
734 #[allow(
735 clippy::significant_drop_tightening,
736 reason = "Lock must be held for entire delete operation"
737 )]
738 async fn delete(&self, namespace: &str, key: &str) -> Result<(), StoreError> {
739 let mut data = self.data.write().await;
740 if let Some(namespace_map) = data.get_mut(namespace) {
741 namespace_map.remove(key);
742 }
743 Ok(())
744 }
745
746 #[allow(
747 clippy::significant_drop_tightening,
748 reason = "Lock must be held for entire search iteration"
749 )]
750 async fn search(&self, query: SearchQuery) -> Result<SearchResult, StoreError> {
751 let query_embedding: Option<Vec<f32>> = if let Some(ref index_config) = self.index_config {
753 if let Some(query_text) = &query.query {
754 if query_text.is_empty() {
755 None
756 } else {
757 let mut embeddings = index_config.embed.embed(vec![query_text.clone()]).await?;
758 embeddings.pop()
759 }
760 } else {
761 None
762 }
763 } else {
764 None
765 };
766
767 let mut items: Vec<SearchItem> = {
769 let data = self.data.read().await;
770 let mut results = Vec::new();
771
772 for (namespace, namespace_map) in data.iter() {
773 if namespace.starts_with(&query.namespace_prefix) {
774 for item in namespace_map.values() {
775 if item.is_expired() {
776 continue;
777 }
778
779 if query
780 .filter
781 .as_ref()
782 .is_some_and(|filter| !evaluate_filter(filter, &item.value))
783 {
784 continue;
785 }
786
787 let score = query_embedding.as_ref().and_then(|q_emb| {
788 item.embedding
789 .as_ref()
790 .map(|i_emb| f64::from(cosine_similarity(q_emb, i_emb)))
791 });
792
793 results.push(SearchItem {
794 item: item.clone(),
795 score,
796 });
797 }
798 }
799 }
800 results
801 };
802
803 let total = items.len();
804
805 if query_embedding.is_some() {
807 items.sort_by(|a, b| {
808 b.score
809 .partial_cmp(&a.score)
810 .unwrap_or(std::cmp::Ordering::Equal)
811 });
812 }
813
814 let start = query.offset.min(items.len());
816 let end = (start + query.limit).min(items.len());
817 let page = items.drain(start..end).collect();
818
819 Ok(SearchResult {
820 items: page,
821 total_count: total,
822 })
823 }
824
825 async fn list_namespaces(
826 &self,
827 prefix: Option<&str>,
828 suffix: Option<&str>,
829 max_depth: Option<usize>,
830 limit: Option<usize>,
831 offset: Option<usize>,
832 ) -> Result<Vec<String>, StoreError> {
833 let mut namespaces: Vec<String> = {
834 let data = self.data.read().await;
835 data.keys().cloned().collect()
836 };
837
838 if let Some(prefix_filter) = prefix {
840 namespaces.retain(|ns| ns.starts_with(prefix_filter));
841 }
842 if let Some(suffix_filter) = suffix {
843 namespaces.retain(|ns| ns.ends_with(suffix_filter));
844 }
845
846 if let Some(depth) = max_depth {
848 namespaces = namespaces
849 .into_iter()
850 .map(|ns| {
851 let parts: Vec<&str> = ns.split('/').take(depth).collect();
852 parts.join("/")
853 })
854 .collect();
855 namespaces.sort();
856 namespaces.dedup();
857 }
858
859 if let Some(offset_value) = offset {
861 let skip = offset_value.min(namespaces.len());
862 namespaces.drain(..skip);
863 }
864
865 if let Some(limit_value) = limit {
867 namespaces.truncate(limit_value);
868 }
869
870 Ok(namespaces)
871 }
872
873 async fn batch(&self, ops: Vec<StoreOp>) -> Result<Vec<StoreResult>, StoreError> {
874 let mut results = Vec::with_capacity(ops.len());
875
876 for op in ops {
877 let result = match op {
878 StoreOp::Get { namespace, key } => {
879 let item = self.get(&namespace, &key).await?;
880 StoreResult::Item(item)
881 }
882 StoreOp::Put {
883 namespace,
884 key,
885 value,
886 index,
887 } => {
888 self.put(&namespace, &key, value, index).await?;
889 StoreResult::None
890 }
891 StoreOp::Delete { namespace, key } => {
892 self.delete(&namespace, &key).await?;
893 StoreResult::None
894 }
895 StoreOp::Search(query) => {
896 let result = self.search(query).await?;
897 StoreResult::Items(result)
898 }
899 StoreOp::ListNamespaces {
900 prefix,
901 suffix,
902 max_depth,
903 limit,
904 } => {
905 let namespaces = self
906 .list_namespaces(
907 prefix.as_deref(),
908 suffix.as_deref(),
909 max_depth,
910 limit,
911 None,
912 )
913 .await?;
914 StoreResult::Namespaces(namespaces)
915 }
916 };
917 results.push(result);
918 }
919
920 Ok(results)
921 }
922}
923
924fn evaluate_filter(filter: &FilterExpr, value: &serde_json::Value) -> bool {
926 match filter {
927 FilterExpr::Eq {
928 field,
929 value: expected,
930 } => get_field(value, field).is_some_and(|v| v == *expected),
931 FilterExpr::Ne {
932 field,
933 value: expected,
934 } => get_field(value, field).is_none_or(|v| v != *expected),
935 FilterExpr::Gt {
936 field,
937 value: expected,
938 } => compare_numbers(value, field, expected, |a, b| a > b),
939 FilterExpr::Gte {
940 field,
941 value: expected,
942 } => compare_numbers(value, field, expected, |a, b| a >= b),
943 FilterExpr::Lt {
944 field,
945 value: expected,
946 } => compare_numbers(value, field, expected, |a, b| a < b),
947 FilterExpr::Lte {
948 field,
949 value: expected,
950 } => compare_numbers(value, field, expected, |a, b| a <= b),
951 FilterExpr::And { expressions } => {
952 expressions.iter().all(|expr| evaluate_filter(expr, value))
953 }
954 FilterExpr::Or { expressions } => {
955 expressions.iter().any(|expr| evaluate_filter(expr, value))
956 }
957 FilterExpr::Not { expr } => !evaluate_filter(expr, value),
958 }
959}
960
961fn get_field(value: &serde_json::Value, path: &str) -> Option<serde_json::Value> {
963 let parts: Vec<&str> = path.split('.').collect();
964 let mut current = value;
965
966 for part in parts {
967 match current {
968 serde_json::Value::Object(map) => {
969 current = map.get(part)?;
970 }
971 _ => return None,
972 }
973 }
974
975 Some(current.clone())
976}
977
978fn compare_numbers(
980 value: &serde_json::Value,
981 field: &str,
982 expected: &serde_json::Value,
983 comparator: impl Fn(f64, f64) -> bool,
984) -> bool {
985 match (get_field(value, field), expected) {
986 (Some(serde_json::Value::Number(a)), serde_json::Value::Number(b)) => {
987 match (a.as_f64(), b.as_f64()) {
988 (Some(a_val), Some(b_val)) => comparator(a_val, b_val),
989 _ => false,
990 }
991 }
992 _ => false,
993 }
994}
995
/// Cosine similarity of `a` and `b`, computed over their common prefix.
///
/// Components beyond the shorter length are ignored. Returns `0.0` when that
/// prefix is empty or either vector has zero magnitude over it, so callers
/// never divide by zero.
#[must_use]
pub fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
    let shared = a.len().min(b.len());
    if shared == 0 {
        return 0.0;
    }
    // A single pass accumulates the dot product and both squared magnitudes.
    let (dot, sq_a, sq_b) = a[..shared].iter().zip(&b[..shared]).fold(
        (0.0_f32, 0.0_f32, 0.0_f32),
        |(dot, sq_a, sq_b), (&x, &y)| (dot + x * y, sq_a + x * x, sq_b + y * y),
    );
    let (mag_a, mag_b) = (sq_a.sqrt(), sq_b.sqrt());
    if mag_a == 0.0 || mag_b == 0.0 {
        0.0
    } else {
        dot / (mag_a * mag_b)
    }
}
1018
1019fn extract_index_text(value: &serde_json::Value, fields: &[String]) -> String {
1024 fields
1025 .iter()
1026 .filter_map(|field| {
1027 get_field(value, field).map(|v| {
1028 v.as_str()
1029 .map_or_else(|| v.to_string(), ToString::to_string)
1030 })
1031 })
1032 .collect::<Vec<_>>()
1033 .join(" ")
1034}
1035
/// [`Store`] backed by SQLite tables `store_items` (JSON stored as TEXT) and
/// `store_vectors` (embeddings stored as BLOBs). TTLs are not persisted.
#[cfg(feature = "sqlite")]
#[derive(Debug)]
pub struct SqliteStore {
    /// Connection pool; set by `new`. Every method reports
    /// "Store not initialized" when it is `None`.
    pool: Option<sqlx::SqlitePool>,
    /// Embedding configuration; `None` disables vector search.
    index_config: Option<IndexConfig>,
}
1049
#[cfg(feature = "sqlite")]
impl SqliteStore {
    /// Connect to `database_url` and create the schema if missing.
    ///
    /// `store_items` is created first because `store_vectors` references it
    /// with a cascading foreign key.
    ///
    /// # Errors
    /// [`StoreError::Storage`] if connecting or either `CREATE TABLE` fails.
    pub async fn new(database_url: &str) -> Result<Self, StoreError> {
        // (DDL statement, error context) in creation order.
        const SCHEMA: [(&str, &str); 2] = [
            (
                r"
                CREATE TABLE IF NOT EXISTS store_items (
                    namespace TEXT NOT NULL,
                    key TEXT NOT NULL,
                    value TEXT NOT NULL,
                    created_at TEXT NOT NULL,
                    updated_at TEXT NOT NULL,
                    PRIMARY KEY (namespace, key)
                )
                ",
                "Failed to create table",
            ),
            (
                r"
                CREATE TABLE IF NOT EXISTS store_vectors (
                    namespace TEXT NOT NULL,
                    key TEXT NOT NULL,
                    field TEXT NOT NULL,
                    vector BLOB NOT NULL,
                    PRIMARY KEY (namespace, key, field),
                    FOREIGN KEY (namespace, key) REFERENCES store_items(namespace, key) ON DELETE CASCADE
                )
                ",
                "Failed to create vectors table",
            ),
        ];

        let pool = sqlx::SqlitePool::connect(database_url)
            .await
            .map_err(|e| StoreError::Storage(format!("Failed to connect to database: {e}")))?;

        for (ddl, context) in SCHEMA {
            sqlx::query(ddl)
                .execute(&pool)
                .await
                .map_err(|e| StoreError::Storage(format!("{context}: {e}")))?;
        }

        Ok(Self {
            pool: Some(pool),
            index_config: None,
        })
    }

    /// Like [`SqliteStore::new`], with vector search enabled using `config`.
    ///
    /// # Errors
    /// Same as [`SqliteStore::new`].
    pub async fn with_vector_search(
        database_url: &str,
        config: IndexConfig,
    ) -> Result<Self, StoreError> {
        let base = Self::new(database_url).await?;
        Ok(Self {
            index_config: Some(config),
            ..base
        })
    }
}
1126
/// Translate a [`FilterExpr`] into a SQLite boolean expression over the
/// `value` column, plus the values to bind to its `?` placeholders in order.
///
/// - Field paths are bound as parameters (`json_extract(value, ?)` with
///   `"$.<field>"`) rather than spliced into the SQL text. A field name
///   containing `'` can therefore neither break the statement nor inject SQL.
/// - Every leaf yields 0 or 1, never NULL, so `NOT`/`AND`/`OR` follow
///   two-valued logic like [`FilterExpr::matches`]. For example, `$not` of
///   `$eq` matches rows where the field is absent.
/// - `$ne` matches rows lacking the field.
/// - Numeric comparisons require both the stored field and `value` to be JSON
///   numbers, mirroring `compare_numbers`.
/// - An empty `$and` is `1` and an empty `$or` is `0`, as with
///   `Iterator::all`/`any`. Joining zero clauses previously produced `()`,
///   which is invalid SQL.
#[cfg(feature = "sqlite")]
fn filter_to_sql_sqlite(filter: &FilterExpr) -> (String, Vec<serde_json::Value>) {
    /// JSON path for `json_extract`/`json_type`, bound as a TEXT parameter.
    fn json_path(field: &str) -> serde_json::Value {
        serde_json::Value::String(format!("$.{field}"))
    }

    /// `field <op> value`, true only when both sides are JSON numbers.
    fn numeric(
        field: &str,
        value: &serde_json::Value,
        op: &str,
    ) -> (String, Vec<serde_json::Value>) {
        if !value.is_number() {
            // The in-memory evaluator rejects non-numeric bounds outright.
            return ("0".to_string(), Vec::new());
        }
        (
            format!(
                "COALESCE(json_type(value, ?) IN ('integer', 'real') \
                 AND json_extract(value, ?) {op} CAST(? AS REAL), 0)"
            ),
            vec![json_path(field), json_path(field), value.clone()],
        )
    }

    /// Parenthesise and join sub-clauses; `empty` is the identity element.
    fn combine(
        expressions: &[FilterExpr],
        joiner: &str,
        empty: &str,
    ) -> (String, Vec<serde_json::Value>) {
        if expressions.is_empty() {
            return (empty.to_string(), Vec::new());
        }
        let mut params = Vec::new();
        let clauses: Vec<String> = expressions
            .iter()
            .map(|expr| {
                let (clause, sub_params) = filter_to_sql_sqlite(expr);
                params.extend(sub_params);
                format!("({clause})")
            })
            .collect();
        (clauses.join(joiner), params)
    }

    match filter {
        FilterExpr::Eq { field, value } => (
            "COALESCE(json_extract(value, ?) = ?, 0)".to_string(),
            vec![json_path(field), value.clone()],
        ),
        // `IS NOT` never yields NULL, so a missing field counts as "not equal".
        FilterExpr::Ne { field, value } => (
            "json_extract(value, ?) IS NOT ?".to_string(),
            vec![json_path(field), value.clone()],
        ),
        FilterExpr::Gt { field, value } => numeric(field, value, ">"),
        FilterExpr::Gte { field, value } => numeric(field, value, ">="),
        FilterExpr::Lt { field, value } => numeric(field, value, "<"),
        FilterExpr::Lte { field, value } => numeric(field, value, "<="),
        FilterExpr::And { expressions } => combine(expressions, " AND ", "1"),
        FilterExpr::Or { expressions } => combine(expressions, " OR ", "0"),
        FilterExpr::Not { expr } => {
            let (clause, params) = filter_to_sql_sqlite(expr);
            (format!("NOT ({clause})"), params)
        }
    }
}
1185
/// Renders a JSON value as the text bound for a SQLite parameter.
///
/// Strings are bound without JSON quotes, booleans as `"1"`/`"0"`, and
/// everything else as its JSON text.
#[cfg(feature = "sqlite")]
fn sqlite_param_from_value(value: &serde_json::Value) -> String {
    match value {
        serde_json::Value::String(text) => text.clone(),
        serde_json::Value::Bool(flag) => String::from(if *flag { "1" } else { "0" }),
        other => other.to_string(),
    }
}
1199
#[cfg(feature = "sqlite")]
#[async_trait]
impl Store for SqliteStore {
    /// Fetch one item, loading its embedding when vector search is configured.
    ///
    /// TTLs are not stored in SQLite, so `expires_at` is always `None`.
    async fn get(&self, namespace: &str, key: &str) -> Result<Option<Item>, StoreError> {
        let pool = self
            .pool
            .as_ref()
            .ok_or_else(|| StoreError::Storage("Store not initialized".to_string()))?;

        let result = sqlx::query(
            "SELECT value, created_at, updated_at FROM store_items WHERE namespace = ? AND key = ?",
        )
        .bind(namespace)
        .bind(key)
        .fetch_optional(pool)
        .await
        .map_err(|e| StoreError::Storage(format!("Failed to get item: {e}")))?;

        let Some(row) = result else {
            return Ok(None);
        };

        let value_str: String = row
            .try_get("value")
            .map_err(|e| StoreError::Storage(e.to_string()))?;
        let value = serde_json::from_str(&value_str).map_err(StoreError::Serialize)?;
        let created_at: String = row
            .try_get("created_at")
            .map_err(|e| StoreError::Storage(e.to_string()))?;
        let updated_at: String = row
            .try_get("updated_at")
            .map_err(|e| StoreError::Storage(e.to_string()))?;

        let embedding = if self.index_config.is_some() {
            let vector_row = sqlx::query(
                "SELECT vector FROM store_vectors WHERE namespace = ? AND key = ? LIMIT 1",
            )
            .bind(namespace)
            .bind(key)
            .fetch_optional(pool)
            .await
            .map_err(|e| StoreError::Storage(format!("Failed to load embedding: {e}")))?;

            if let Some(vrow) = vector_row {
                let bytes: Vec<u8> = vrow
                    .try_get("vector")
                    .map_err(|e| StoreError::Storage(e.to_string()))?;
                Some(blob_to_vector(&bytes)?)
            } else {
                None
            }
        } else {
            None
        };

        Ok(Some(Item {
            namespace: namespace.to_string(),
            key: key.to_string(),
            value,
            created_at: chrono::DateTime::parse_from_rfc3339(&created_at)
                .map_err(|e| StoreError::Storage(format!("invalid timestamp: {e}")))?
                .with_timezone(&chrono::Utc),
            updated_at: chrono::DateTime::parse_from_rfc3339(&updated_at)
                .map_err(|e| StoreError::Storage(format!("invalid timestamp: {e}")))?
                .with_timezone(&chrono::Utc),
            expires_at: None,
            embedding,
        }))
    }

    /// Upsert an item, embedding the requested `index` fields when vector
    /// search is configured.
    ///
    /// `created_at` survives overwrites because the upsert only rewrites
    /// `value` and `updated_at`. The item's `store_vectors` row is replaced
    /// when an embedding is computed and deleted otherwise. This ensures a
    /// later search never scores the new value against a vector derived from
    /// an older one.
    async fn put(
        &self,
        namespace: &str,
        key: &str,
        value: serde_json::Value,
        index: Option<Vec<String>>,
    ) -> Result<(), StoreError> {
        let pool = self
            .pool
            .as_ref()
            .ok_or_else(|| StoreError::Storage("Store not initialized".to_string()))?;

        let now = Utc::now();

        let embedding = match (&self.index_config, index.as_deref()) {
            (Some(index_config), Some(fields)) if !fields.is_empty() => {
                let text = extract_index_text(&value, fields);
                if text.is_empty() {
                    None
                } else {
                    index_config.embed.embed(vec![text]).await?.pop()
                }
            }
            _ => None,
        };

        let value_str = serde_json::to_string(&value).map_err(StoreError::Serialize)?;
        let now_str = now.to_rfc3339();

        sqlx::query(
            r"
            INSERT INTO store_items (namespace, key, value, created_at, updated_at)
            VALUES (?, ?, ?, ?, ?)
            ON CONFLICT (namespace, key) DO UPDATE SET
                value = excluded.value,
                updated_at = excluded.updated_at
            ",
        )
        .bind(namespace)
        .bind(key)
        .bind(&value_str)
        .bind(&now_str)
        .bind(&now_str)
        .execute(pool)
        .await
        .map_err(|e| StoreError::Storage(format!("Failed to put item: {e}")))?;

        if let Some(vec) = embedding {
            let bytes = vector_to_blob(&vec);
            sqlx::query(
                r"
                INSERT INTO store_vectors (namespace, key, field, vector)
                VALUES (?, ?, 'default', ?)
                ON CONFLICT (namespace, key, field) DO UPDATE SET
                    vector = excluded.vector
                ",
            )
            .bind(namespace)
            .bind(key)
            .bind(&bytes)
            .execute(pool)
            .await
            .map_err(|e| StoreError::Storage(format!("Failed to store embedding: {e}")))?;
        } else {
            sqlx::query("DELETE FROM store_vectors WHERE namespace = ? AND key = ?")
                .bind(namespace)
                .bind(key)
                .execute(pool)
                .await
                .map_err(|e| {
                    StoreError::Storage(format!("Failed to clear stale embedding: {e}"))
                })?;
        }

        Ok(())
    }

    /// Delete an item; its vectors are removed by the `ON DELETE CASCADE`
    /// foreign key. A missing item is not an error.
    async fn delete(&self, namespace: &str, key: &str) -> Result<(), StoreError> {
        let pool = self
            .pool
            .as_ref()
            .ok_or_else(|| StoreError::Storage("Store not initialized".to_string()))?;

        sqlx::query("DELETE FROM store_items WHERE namespace = ? AND key = ?")
            .bind(namespace)
            .bind(key)
            .execute(pool)
            .await
            .map_err(|e| StoreError::Storage(format!("Failed to delete item: {e}")))?;

        Ok(())
    }

    /// Search items under `query.namespace_prefix`.
    ///
    /// Rows are optionally filtered, ranked by cosine similarity when a query
    /// embedding is available, and paginated in Rust.
    ///
    /// - The prefix test is an exact, case-sensitive `substr` comparison.
    ///   `LIKE` is ASCII case-insensitive in SQLite and treats `%`/`_` in the
    ///   prefix as wildcards, which disagreed with `MemoryStore`.
    /// - Filter parameters are bound with their natural SQLite types.
    ///   Binding everything as TEXT meant `$eq` on numbers or booleans never
    ///   matched, because SQLite never equates INTEGER and TEXT.
    #[allow(
        clippy::too_many_lines,
        reason = "row decoding and vector scoring are kept inline so the query flow reads top to bottom"
    )]
    async fn search(&self, query: SearchQuery) -> Result<SearchResult, StoreError> {
        let pool = self
            .pool
            .as_ref()
            .ok_or_else(|| StoreError::Storage("Store not initialized".to_string()))?;

        let query_embedding: Option<Vec<f32>> =
            match (&self.index_config, query.query.as_deref()) {
                (Some(index_config), Some(text)) if !text.is_empty() => {
                    index_config.embed.embed(vec![text.to_string()]).await?.pop()
                }
                _ => None,
            };

        let mut conditions = vec!["substr(namespace, 1, length(?)) = ?".to_string()];
        let mut params: Vec<serde_json::Value> = vec![
            serde_json::Value::String(query.namespace_prefix.clone()),
            serde_json::Value::String(query.namespace_prefix.clone()),
        ];

        if let Some(ref filter) = query.filter {
            let (clause, filter_params) = filter_to_sql_sqlite(filter);
            conditions.push(format!("({clause})"));
            params.extend(filter_params);
        }

        let where_clause = conditions.join(" AND ");

        let data_sql = format!(
            "SELECT namespace, key, value, created_at, updated_at \
             FROM store_items WHERE {where_clause} \
             ORDER BY namespace, key"
        );
        let mut data_query = sqlx::query(&data_sql);
        for param in &params {
            // Bind with the SQLite type json_extract() yields for the same JSON
            // value: booleans as 0/1 integers, numbers as INTEGER/REAL.
            data_query = match param {
                serde_json::Value::Null => data_query.bind(None::<String>),
                serde_json::Value::Bool(flag) => data_query.bind(i64::from(*flag)),
                serde_json::Value::Number(number) => {
                    if let Some(int) = number.as_i64() {
                        data_query.bind(int)
                    } else if let Some(float) = number.as_f64() {
                        data_query.bind(float)
                    } else {
                        data_query.bind(number.to_string())
                    }
                }
                serde_json::Value::String(text) => data_query.bind(text.as_str()),
                other => data_query.bind(sqlite_param_from_value(other)),
            };
        }

        let rows = data_query
            .fetch_all(pool)
            .await
            .map_err(|e| StoreError::Storage(format!("Search query failed: {e}")))?;

        let mut items: Vec<SearchItem> = Vec::with_capacity(rows.len());

        for row in rows {
            let namespace: String = row
                .try_get("namespace")
                .map_err(|e| StoreError::Storage(e.to_string()))?;
            let key: String = row
                .try_get("key")
                .map_err(|e| StoreError::Storage(e.to_string()))?;
            let value_str: String = row
                .try_get("value")
                .map_err(|e| StoreError::Storage(e.to_string()))?;
            let value = serde_json::from_str(&value_str).map_err(StoreError::Serialize)?;
            let created_at_str: String = row
                .try_get("created_at")
                .map_err(|e| StoreError::Storage(e.to_string()))?;
            let updated_at_str: String = row
                .try_get("updated_at")
                .map_err(|e| StoreError::Storage(e.to_string()))?;

            // Vectors are only needed when there is something to score against.
            let embedding = if query_embedding.is_some() {
                let vector_row = sqlx::query(
                    "SELECT vector FROM store_vectors WHERE namespace = ? AND key = ? LIMIT 1",
                )
                .bind(&namespace)
                .bind(&key)
                .fetch_optional(pool)
                .await
                .map_err(|e| StoreError::Storage(format!("Failed to load embedding: {e}")))?;

                if let Some(vrow) = vector_row {
                    let bytes: Vec<u8> = vrow
                        .try_get("vector")
                        .map_err(|e| StoreError::Storage(e.to_string()))?;
                    Some(blob_to_vector(&bytes)?)
                } else {
                    None
                }
            } else {
                None
            };

            let score = query_embedding.as_ref().and_then(|q_emb| {
                embedding
                    .as_ref()
                    .map(|i_emb| f64::from(cosine_similarity(q_emb, i_emb)))
            });

            items.push(SearchItem {
                item: Item {
                    namespace,
                    key,
                    value,
                    created_at: chrono::DateTime::parse_from_rfc3339(&created_at_str)
                        .map_err(|e| StoreError::Storage(format!("invalid timestamp: {e}")))?
                        .with_timezone(&chrono::Utc),
                    updated_at: chrono::DateTime::parse_from_rfc3339(&updated_at_str)
                        .map_err(|e| StoreError::Storage(format!("invalid timestamp: {e}")))?
                        .with_timezone(&chrono::Utc),
                    expires_at: None,
                    embedding,
                },
                score,
            });
        }

        let total_count = items.len();

        if query_embedding.is_some() {
            // Rank missing and NaN scores last. `total_cmp` gives the total
            // order `sort_by` requires; the stable sort keeps the SQL
            // (namespace, key) order among equal scores.
            let rank = |score: Option<f64>| {
                score
                    .filter(|s| !s.is_nan())
                    .unwrap_or(f64::NEG_INFINITY)
            };
            items.sort_by(|a, b| rank(b.score).total_cmp(&rank(a.score)));
        }

        let start = query.offset.min(items.len());
        let end = start.saturating_add(query.limit).min(items.len());
        let page = items.drain(start..end).collect();

        Ok(SearchResult {
            items: page,
            total_count,
        })
    }

    /// List namespaces of stored items.
    ///
    /// Only the prefix test runs in SQL (exact, case-sensitive, ordered).
    /// Suffix filtering, `max_depth` truncation, dedup, `offset` and `limit`
    /// are applied in Rust, so they compose exactly as in `MemoryStore`.
    /// Previously `LIMIT` counted rows before depth truncation and dedup, and
    /// `OFFSET` without `LIMIT` is a SQLite syntax error.
    async fn list_namespaces(
        &self,
        prefix: Option<&str>,
        suffix: Option<&str>,
        max_depth: Option<usize>,
        limit: Option<usize>,
        offset: Option<usize>,
    ) -> Result<Vec<String>, StoreError> {
        let pool = self
            .pool
            .as_ref()
            .ok_or_else(|| StoreError::Storage("Store not initialized".to_string()))?;

        let mut sql = String::from("SELECT DISTINCT namespace FROM store_items");
        if prefix.is_some() {
            sql.push_str(" WHERE substr(namespace, 1, length(?)) = ?");
        }
        sql.push_str(" ORDER BY namespace");

        let mut list_query = sqlx::query(&sql);
        if let Some(prefix_filter) = prefix {
            list_query = list_query.bind(prefix_filter).bind(prefix_filter);
        }

        let rows = list_query
            .fetch_all(pool)
            .await
            .map_err(|e| StoreError::Storage(format!("Failed to list namespaces: {e}")))?;

        let mut namespaces = Vec::with_capacity(rows.len());
        for row in rows {
            let ns: String = row
                .try_get("namespace")
                .map_err(|e| StoreError::Storage(e.to_string()))?;
            namespaces.push(ns);
        }

        if let Some(suffix_filter) = suffix {
            namespaces.retain(|ns| ns.ends_with(suffix_filter));
        }

        if let Some(depth) = max_depth {
            namespaces = namespaces
                .into_iter()
                .map(|ns| {
                    let parts: Vec<&str> = ns.split('/').take(depth).collect();
                    parts.join("/")
                })
                .collect();
            namespaces.sort();
            namespaces.dedup();
        }

        if let Some(offset_value) = offset {
            let skip = offset_value.min(namespaces.len());
            namespaces.drain(..skip);
        }

        if let Some(limit_value) = limit {
            namespaces.truncate(limit_value);
        }

        Ok(namespaces)
    }

    /// Execute `ops` in order. Each op runs as its own statement(s); the first
    /// error aborts the batch without rolling back earlier ops.
    async fn batch(&self, ops: Vec<StoreOp>) -> Result<Vec<StoreResult>, StoreError> {
        let mut results = Vec::with_capacity(ops.len());

        for op in ops {
            let result = match op {
                StoreOp::Get { namespace, key } => {
                    let item = self.get(&namespace, &key).await?;
                    StoreResult::Item(item)
                }
                StoreOp::Put {
                    namespace,
                    key,
                    value,
                    index,
                } => {
                    self.put(&namespace, &key, value, index).await?;
                    StoreResult::None
                }
                StoreOp::Delete { namespace, key } => {
                    self.delete(&namespace, &key).await?;
                    StoreResult::None
                }
                StoreOp::Search(query) => {
                    let result = self.search(query).await?;
                    StoreResult::Items(result)
                }
                StoreOp::ListNamespaces {
                    prefix,
                    suffix,
                    max_depth,
                    limit,
                } => {
                    let namespaces = self
                        .list_namespaces(
                            prefix.as_deref(),
                            suffix.as_deref(),
                            max_depth,
                            limit,
                            None,
                        )
                        .await?;
                    StoreResult::Namespaces(namespaces)
                }
            };
            results.push(result);
        }

        Ok(results)
    }
}
1632
#[cfg(feature = "postgres")]
/// PostgreSQL-backed [`Store`].
///
/// Items are kept in the `store_items` table and embeddings in the
/// `store_vectors` table; both tables are created by [`PostgresStore::new`].
#[derive(Debug)]
pub struct PostgresStore {
    /// Connection pool. When `None`, every `Store` method returns
    /// `StoreError::Storage("Store not initialized")`.
    pool: Option<sqlx::PgPool>,
    /// Embedding configuration. When `None`, no embeddings are computed on
    /// `put` or loaded on `get`/`search`, and search results carry no score.
    index_config: Option<IndexConfig>,
}
1645
#[cfg(feature = "postgres")]
impl PostgresStore {
    /// Item table: one JSONB value per `(namespace, key)` with timestamps.
    const STORE_ITEMS_DDL: &'static str = r"
            CREATE TABLE IF NOT EXISTS store_items (
                namespace TEXT NOT NULL,
                key TEXT NOT NULL,
                value JSONB NOT NULL,
                created_at TIMESTAMPTZ NOT NULL,
                updated_at TIMESTAMPTZ NOT NULL,
                PRIMARY KEY (namespace, key)
            )
            ";

    /// Embedding table: vectors stored as `BYTEA`, keyed by item and field.
    /// Rows are removed with their item via `ON DELETE CASCADE`.
    const STORE_VECTORS_DDL: &'static str = r"
            CREATE TABLE IF NOT EXISTS store_vectors (
                namespace TEXT NOT NULL,
                key TEXT NOT NULL,
                field TEXT NOT NULL,
                vector BYTEA NOT NULL,
                PRIMARY KEY (namespace, key, field),
                FOREIGN KEY (namespace, key) REFERENCES store_items(namespace, key) ON DELETE CASCADE
            )
            ";

    /// Connects to `database_url` and creates `store_items` and
    /// `store_vectors` if they do not exist yet.
    ///
    /// The returned store has vector search disabled.
    ///
    /// # Errors
    ///
    /// Returns [`StoreError::Storage`] if connecting or creating either table
    /// fails.
    pub async fn new(database_url: &str) -> Result<Self, StoreError> {
        let pool = sqlx::PgPool::connect(database_url)
            .await
            .map_err(|e| StoreError::Storage(format!("Failed to connect to database: {e}")))?;

        sqlx::query(Self::STORE_ITEMS_DDL)
            .execute(&pool)
            .await
            .map_err(|e| StoreError::Storage(format!("Failed to create table: {e}")))?;

        // Must run after the item table exists because of the foreign key.
        sqlx::query(Self::STORE_VECTORS_DDL)
            .execute(&pool)
            .await
            .map_err(|e| StoreError::Storage(format!("Failed to create vectors table: {e}")))?;

        Ok(Self {
            index_config: None,
            pool: Some(pool),
        })
    }

    /// Like [`PostgresStore::new`], but enables embedding-based search using
    /// `config`.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`PostgresStore::new`].
    pub async fn with_vector_search(
        database_url: &str,
        config: IndexConfig,
    ) -> Result<Self, StoreError> {
        let base = Self::new(database_url).await?;
        Ok(Self {
            index_config: Some(config),
            ..base
        })
    }
}
1722
1723#[cfg(feature = "postgres")]
/// Encodes `vec` as a PostgreSQL `BYTEA` payload: every `f32` becomes its 4
/// little-endian bytes, concatenated in element order.
fn vector_to_bytea(vec: &[f32]) -> Vec<u8> {
    let mut encoded = Vec::with_capacity(vec.len().saturating_mul(std::mem::size_of::<f32>()));
    encoded.extend(vec.iter().flat_map(|component| component.to_le_bytes()));
    encoded
}
1734
1735#[cfg(feature = "postgres")]
1739fn bytea_to_vector(bytes: &[u8]) -> Result<Vec<f32>, StoreError> {
1740 if bytes.len() % std::mem::size_of::<f32>() != 0 {
1741 return Err(StoreError::VectorSearch(
1742 "Invalid BYTEA length for vector data".to_string(),
1743 ));
1744 }
1745 let vec = bytes
1746 .chunks_exact(std::mem::size_of::<f32>())
1747 .map(|chunk| {
1748 let arr: [u8; 4] = chunk.try_into().expect("chunk is exactly 4 bytes");
1749 f32::from_le_bytes(arr)
1750 })
1751 .collect();
1752 Ok(vec)
1753}
1754
1755#[cfg(feature = "sqlite")]
/// Encodes `vec` as a SQLite `BLOB`: the 4 little-endian bytes of each `f32`,
/// concatenated in element order.
fn vector_to_blob(vec: &[f32]) -> Vec<u8> {
    let byte_len = vec.len().saturating_mul(std::mem::size_of::<f32>());
    vec.iter()
        .copied()
        .map(f32::to_le_bytes)
        .fold(Vec::with_capacity(byte_len), |mut blob, word| {
            blob.extend_from_slice(&word);
            blob
        })
}
1766
1767#[cfg(feature = "sqlite")]
1771fn blob_to_vector(bytes: &[u8]) -> Result<Vec<f32>, StoreError> {
1772 if bytes.len() % std::mem::size_of::<f32>() != 0 {
1773 return Err(StoreError::VectorSearch(
1774 "Invalid BLOB length for vector data".to_string(),
1775 ));
1776 }
1777 let vec = bytes
1778 .chunks_exact(std::mem::size_of::<f32>())
1779 .map(|chunk| {
1780 let arr: [u8; 4] = chunk.try_into().expect("chunk is exactly 4 bytes");
1781 f32::from_le_bytes(arr)
1782 })
1783 .collect();
1784 Ok(vec)
1785}
1786
1787#[cfg(feature = "postgres")]
1793fn filter_to_sql_postgres(filter: &FilterExpr) -> (String, Vec<serde_json::Value>) {
1794 match filter {
1795 FilterExpr::Eq { field, value } => {
1796 let path = field.split('.').collect::<Vec<_>>().join(",");
1797 (format!("value #>> '{{{path}}}' = ?"), vec![value.clone()])
1798 }
1799 FilterExpr::Ne { field, value } => {
1800 let path = field.split('.').collect::<Vec<_>>().join(",");
1801 (format!("value #>> '{{{path}}}' != ?"), vec![value.clone()])
1802 }
1803 FilterExpr::Gt { field, value } => {
1804 let path = field.split('.').collect::<Vec<_>>().join(",");
1805 (
1806 format!("(value #> '{{{path}}}')::numeric > CAST(? AS numeric)"),
1807 vec![value.clone()],
1808 )
1809 }
1810 FilterExpr::Gte { field, value } => {
1811 let path = field.split('.').collect::<Vec<_>>().join(",");
1812 (
1813 format!("(value #> '{{{path}}}')::numeric >= CAST(? AS numeric)"),
1814 vec![value.clone()],
1815 )
1816 }
1817 FilterExpr::Lt { field, value } => {
1818 let path = field.split('.').collect::<Vec<_>>().join(",");
1819 (
1820 format!("(value #> '{{{path}}}')::numeric < CAST(? AS numeric)"),
1821 vec![value.clone()],
1822 )
1823 }
1824 FilterExpr::Lte { field, value } => {
1825 let path = field.split('.').collect::<Vec<_>>().join(",");
1826 (
1827 format!("(value #> '{{{path}}}')::numeric <= CAST(? AS numeric)"),
1828 vec![value.clone()],
1829 )
1830 }
1831 FilterExpr::And { expressions } => {
1832 let mut clauses = Vec::with_capacity(expressions.len());
1833 let mut all_params = Vec::new();
1834 for expr in expressions {
1835 let (clause, params) = filter_to_sql_postgres(expr);
1836 clauses.push(format!("({clause})"));
1837 all_params.extend(params);
1838 }
1839 (clauses.join(" AND "), all_params)
1840 }
1841 FilterExpr::Or { expressions } => {
1842 let mut clauses = Vec::with_capacity(expressions.len());
1843 let mut all_params = Vec::new();
1844 for expr in expressions {
1845 let (clause, params) = filter_to_sql_postgres(expr);
1846 clauses.push(format!("({clause})"));
1847 all_params.extend(params);
1848 }
1849 (clauses.join(" OR "), all_params)
1850 }
1851 FilterExpr::Not { expr } => {
1852 let (clause, params) = filter_to_sql_postgres(expr);
1853 (format!("NOT ({clause})"), params)
1854 }
1855 }
1856}
1857
#[cfg(feature = "postgres")]
/// Backslash-escapes the LIKE wildcards `%` and `_` (and the backslash itself).
/// The backslash is PostgreSQL's default LIKE escape character, so the text
/// is matched literally.
fn escape_like_postgres(raw: &str) -> String {
    let mut escaped = String::with_capacity(raw.len());
    for c in raw.chars() {
        if matches!(c, '\\' | '%' | '_') {
            escaped.push('\\');
        }
        escaped.push(c);
    }
    escaped
}

#[cfg(feature = "postgres")]
#[async_trait]
impl Store for PostgresStore {
    /// Fetches one item by `(namespace, key)`.
    ///
    /// The embedding is loaded only when vector search is configured.
    /// `expires_at` is always `None` because `store_items` has no expiry
    /// column.
    async fn get(&self, namespace: &str, key: &str) -> Result<Option<Item>, StoreError> {
        let pool = self
            .pool
            .as_ref()
            .ok_or_else(|| StoreError::Storage("Store not initialized".to_string()))?;

        let Some(row) = sqlx::query(
            "SELECT value, created_at, updated_at FROM store_items WHERE namespace = $1 AND key = $2",
        )
        .bind(namespace)
        .bind(key)
        .fetch_optional(pool)
        .await
        .map_err(|e| StoreError::Storage(format!("Failed to get item: {e}")))?
        else {
            return Ok(None);
        };

        let value: serde_json::Value = row
            .try_get("value")
            .map_err(|e| StoreError::Storage(e.to_string()))?;
        let created_at: chrono::DateTime<chrono::Utc> = row
            .try_get("created_at")
            .map_err(|e| StoreError::Storage(e.to_string()))?;
        let updated_at: chrono::DateTime<chrono::Utc> = row
            .try_get("updated_at")
            .map_err(|e| StoreError::Storage(e.to_string()))?;

        let embedding = if self.index_config.is_some() {
            let vector_row = sqlx::query(
                "SELECT vector FROM store_vectors WHERE namespace = $1 AND key = $2 LIMIT 1",
            )
            .bind(namespace)
            .bind(key)
            .fetch_optional(pool)
            .await
            .map_err(|e| StoreError::Storage(format!("Failed to load embedding: {e}")))?;

            match vector_row {
                Some(vrow) => {
                    let bytes: Vec<u8> = vrow
                        .try_get("vector")
                        .map_err(|e| StoreError::Storage(e.to_string()))?;
                    Some(bytea_to_vector(&bytes)?)
                }
                None => None,
            }
        } else {
            None
        };

        Ok(Some(Item {
            namespace: namespace.to_string(),
            key: key.to_string(),
            value,
            created_at,
            updated_at,
            expires_at: None,
            embedding,
        }))
    }

    /// Inserts or replaces an item and keeps its embedding in sync, in one
    /// transaction.
    ///
    /// When vector search is configured and `index` names non-empty fields
    /// that yield non-empty text, that text is embedded and stored under the
    /// `'default'` field. When vector search is configured but no new
    /// embedding is produced, any previous `'default'` embedding is removed:
    /// it described the old value and would mis-score the new one.
    async fn put(
        &self,
        namespace: &str,
        key: &str,
        value: serde_json::Value,
        index: Option<Vec<String>>,
    ) -> Result<(), StoreError> {
        let pool = self
            .pool
            .as_ref()
            .ok_or_else(|| StoreError::Storage("Store not initialized".to_string()))?;

        let now = Utc::now();

        // Embed before opening the transaction so the (possibly slow) embedding
        // call does not hold a database connection.
        let embedding = match (&self.index_config, &index) {
            (Some(index_config), Some(index_fields)) if !index_fields.is_empty() => {
                let text = extract_index_text(&value, index_fields);
                if text.is_empty() {
                    None
                } else {
                    index_config.embed.embed(vec![text]).await?.pop()
                }
            }
            _ => None,
        };

        // The item and vector writes must succeed or fail together; the
        // transaction rolls back on drop if any step below returns early.
        let mut tx = pool
            .begin()
            .await
            .map_err(|e| StoreError::Storage(format!("Failed to begin transaction: {e}")))?;

        sqlx::query(
            r"
            INSERT INTO store_items (namespace, key, value, created_at, updated_at)
            VALUES ($1, $2, $3, $4, $5)
            ON CONFLICT (namespace, key) DO UPDATE SET
                value = EXCLUDED.value,
                updated_at = EXCLUDED.updated_at
            ",
        )
        .bind(namespace)
        .bind(key)
        .bind(&value)
        .bind(now)
        .bind(now)
        .execute(&mut *tx)
        .await
        .map_err(|e| StoreError::Storage(format!("Failed to put item: {e}")))?;

        if let Some(vector) = embedding {
            let bytes = vector_to_bytea(&vector);
            sqlx::query(
                r"
                INSERT INTO store_vectors (namespace, key, field, vector)
                VALUES ($1, $2, 'default', $3)
                ON CONFLICT (namespace, key, field) DO UPDATE SET
                    vector = EXCLUDED.vector
                ",
            )
            .bind(namespace)
            .bind(key)
            .bind(&bytes)
            .execute(&mut *tx)
            .await
            .map_err(|e| StoreError::Storage(format!("Failed to store embedding: {e}")))?;
        } else if self.index_config.is_some() {
            sqlx::query(
                "DELETE FROM store_vectors WHERE namespace = $1 AND key = $2 AND field = 'default'",
            )
            .bind(namespace)
            .bind(key)
            .execute(&mut *tx)
            .await
            .map_err(|e| StoreError::Storage(format!("Failed to clear stale embedding: {e}")))?;
        }

        tx.commit()
            .await
            .map_err(|e| StoreError::Storage(format!("Failed to commit item: {e}")))?;

        Ok(())
    }

    /// Deletes an item. Its embeddings go with it through the
    /// `ON DELETE CASCADE` foreign key on `store_vectors`.
    async fn delete(&self, namespace: &str, key: &str) -> Result<(), StoreError> {
        let pool = self
            .pool
            .as_ref()
            .ok_or_else(|| StoreError::Storage("Store not initialized".to_string()))?;

        sqlx::query("DELETE FROM store_items WHERE namespace = $1 AND key = $2")
            .bind(namespace)
            .bind(key)
            .execute(pool)
            .await
            .map_err(|e| StoreError::Storage(format!("Failed to delete item: {e}")))?;

        Ok(())
    }

    /// Searches items whose namespace starts with `query.namespace_prefix`
    /// (matched literally) and that satisfy the optional filter.
    ///
    /// If vector search is configured and `query.query` is non-empty, the
    /// query text is embedded and results are ranked by cosine similarity,
    /// highest first. Otherwise results are ordered by `(namespace, key)`.
    /// `total_count` is the number of matches before pagination.
    #[allow(
        clippy::too_many_lines,
        reason = "row decoding, scoring and pagination form one linear pipeline; splitting it would only move trivial code around"
    )]
    async fn search(&self, query: SearchQuery) -> Result<SearchResult, StoreError> {
        let pool = self
            .pool
            .as_ref()
            .ok_or_else(|| StoreError::Storage("Store not initialized".to_string()))?;

        let query_embedding: Option<Vec<f32>> = match (&self.index_config, &query.query) {
            (Some(index_config), Some(query_text)) if !query_text.is_empty() => {
                index_config
                    .embed
                    .embed(vec![query_text.clone()])
                    .await?
                    .pop()
            }
            _ => None,
        };

        let mut conditions = vec!["namespace LIKE $1".to_string()];
        let mut bind_params: Vec<String> =
            vec![format!("{}%", escape_like_postgres(&query.namespace_prefix))];

        if let Some(ref filter) = query.filter {
            let (clause, filter_params) = filter_to_sql_postgres(filter);
            // The filter uses `?` placeholders; renumber them to continue after
            // the parameters already bound.
            let mut next_idx = bind_params.len() + 1;
            let mut numbered_clause = String::with_capacity(clause.len());
            for c in clause.chars() {
                if c == '?' {
                    let _ = write!(numbered_clause, "${next_idx}");
                    next_idx += 1;
                } else {
                    numbered_clause.push(c);
                }
            }
            conditions.push(format!("({numbered_clause})"));
            // Filter values are bound as their JSON text.
            bind_params.extend(filter_params.iter().map(ToString::to_string));
        }

        // Fetch each item's embedding with a correlated subquery instead of
        // issuing one extra query per row; skip it entirely when unused.
        let vector_column = if query_embedding.is_some() {
            "(SELECT sv.vector FROM store_vectors sv \
             WHERE sv.namespace = store_items.namespace AND sv.key = store_items.key \
             LIMIT 1) AS vector"
        } else {
            "NULL::bytea AS vector"
        };

        let where_clause = conditions.join(" AND ");
        let data_sql = format!(
            "SELECT namespace, key, value, created_at, updated_at, {vector_column} \
             FROM store_items WHERE {where_clause} \
             ORDER BY namespace, key"
        );
        let mut data_query = sqlx::query(&data_sql);
        for p in &bind_params {
            data_query = data_query.bind(p.as_str());
        }

        let rows = data_query
            .fetch_all(pool)
            .await
            .map_err(|e| StoreError::Storage(format!("Search query failed: {e}")))?;

        let mut items: Vec<SearchItem> = Vec::with_capacity(rows.len());

        for row in rows {
            let namespace: String = row
                .try_get("namespace")
                .map_err(|e| StoreError::Storage(e.to_string()))?;
            let key: String = row
                .try_get("key")
                .map_err(|e| StoreError::Storage(e.to_string()))?;
            let value: serde_json::Value = row
                .try_get("value")
                .map_err(|e| StoreError::Storage(e.to_string()))?;
            let created_at: chrono::DateTime<chrono::Utc> = row
                .try_get("created_at")
                .map_err(|e| StoreError::Storage(e.to_string()))?;
            let updated_at: chrono::DateTime<chrono::Utc> = row
                .try_get("updated_at")
                .map_err(|e| StoreError::Storage(e.to_string()))?;
            let vector: Option<Vec<u8>> = row
                .try_get("vector")
                .map_err(|e| StoreError::Storage(e.to_string()))?;

            let embedding = vector.as_deref().map(bytea_to_vector).transpose()?;

            // Only items that have an embedding get a score.
            let score = query_embedding
                .as_ref()
                .zip(embedding.as_ref())
                .map(|(q_emb, i_emb)| f64::from(cosine_similarity(q_emb, i_emb)));

            items.push(SearchItem {
                item: Item {
                    namespace,
                    key,
                    value,
                    created_at,
                    updated_at,
                    expires_at: None,
                    embedding,
                },
                score,
            });
        }

        let total_count = items.len();

        if query_embedding.is_some() {
            // Descending by score; unscored items sort last.
            items.sort_by(|a, b| {
                b.score
                    .partial_cmp(&a.score)
                    .unwrap_or(std::cmp::Ordering::Equal)
            });
        }

        let start = query.offset.min(items.len());
        // saturating_add: callers may pass a very large limit to mean "all".
        let end = start.saturating_add(query.limit).min(items.len());
        let page = items.drain(start..end).collect();

        Ok(SearchResult {
            items: page,
            total_count,
        })
    }

    /// Lists the distinct namespaces stored in `store_items`.
    ///
    /// `prefix` / `suffix` match literally. `max_depth` truncates each
    /// namespace to its first `max_depth` `/`-separated segments and
    /// de-duplicates the result. `offset` / `limit` paginate the final, sorted
    /// list.
    async fn list_namespaces(
        &self,
        prefix: Option<&str>,
        suffix: Option<&str>,
        max_depth: Option<usize>,
        limit: Option<usize>,
        offset: Option<usize>,
    ) -> Result<Vec<String>, StoreError> {
        let pool = self
            .pool
            .as_ref()
            .ok_or_else(|| StoreError::Storage("Store not initialized".to_string()))?;

        let mut query_str = "SELECT DISTINCT namespace FROM store_items WHERE 1=1".to_string();
        let mut params: Vec<String> = Vec::new();

        // Push first, then number the placeholder from the new length, so
        // placeholders always run $1, $2, ... in bind order.
        if let Some(prefix_filter) = prefix {
            params.push(format!("{}%", escape_like_postgres(prefix_filter)));
            let _ = write!(query_str, " AND namespace LIKE ${}", params.len());
        }
        if let Some(suffix_filter) = suffix {
            params.push(format!("%{}", escape_like_postgres(suffix_filter)));
            let _ = write!(query_str, " AND namespace LIKE ${}", params.len());
        }
        query_str.push_str(" ORDER BY namespace");

        let mut query = sqlx::query(&query_str);
        for param in params {
            query = query.bind(param);
        }

        let rows = query
            .fetch_all(pool)
            .await
            .map_err(|e| StoreError::Storage(format!("Failed to list namespaces: {e}")))?;

        let mut namespaces = Vec::with_capacity(rows.len());
        for row in rows {
            let ns: String = row
                .try_get("namespace")
                .map_err(|e| StoreError::Storage(e.to_string()))?;
            namespaces.push(ns);
        }

        if let Some(depth) = max_depth {
            namespaces = namespaces
                .into_iter()
                .map(|ns| {
                    let parts: Vec<&str> = ns.split('/').take(depth).collect();
                    parts.join("/")
                })
                .collect();
            namespaces.sort();
            namespaces.dedup();
        }

        // Paginate after depth collapsing so the page counts the namespaces
        // actually returned.
        Ok(namespaces
            .into_iter()
            .skip(offset.unwrap_or(0))
            .take(limit.unwrap_or(usize::MAX))
            .collect())
    }

    /// Runs each operation in `ops` sequentially and returns one result per
    /// operation, in input order.
    ///
    /// This method opens no transaction of its own. The first failing
    /// operation ends the batch with its error.
    async fn batch(&self, ops: Vec<StoreOp>) -> Result<Vec<StoreResult>, StoreError> {
        let mut results = Vec::with_capacity(ops.len());

        for op in ops {
            let result = match op {
                StoreOp::Get { namespace, key } => {
                    StoreResult::Item(self.get(&namespace, &key).await?)
                }
                StoreOp::Put {
                    namespace,
                    key,
                    value,
                    index,
                } => {
                    self.put(&namespace, &key, value, index).await?;
                    StoreResult::None
                }
                StoreOp::Delete { namespace, key } => {
                    self.delete(&namespace, &key).await?;
                    StoreResult::None
                }
                StoreOp::Search(query) => StoreResult::Items(self.search(query).await?),
                StoreOp::ListNamespaces {
                    prefix,
                    suffix,
                    max_depth,
                    limit,
                } => StoreResult::Namespaces(
                    self.list_namespaces(
                        prefix.as_deref(),
                        suffix.as_deref(),
                        max_depth,
                        limit,
                        None,
                    )
                    .await?,
                ),
            };
            results.push(result);
        }

        Ok(results)
    }
}
2291
2292#[cfg(test)]
2295mod tests {
2296 use super::*;
2297 use serde_json::json;
2298
2299 fn active_value() -> serde_json::Value {
2300 json!({ "status": "active" })
2301 }
2302
2303 fn inactive_value() -> serde_json::Value {
2304 json!({ "status": "inactive" })
2305 }
2306
2307 #[test]
2308 fn test_filter_not_negates_match() {
2309 let filter = FilterExpr::Not {
2311 expr: Box::new(FilterExpr::Eq {
2312 field: "status".to_string(),
2313 value: json!("active"),
2314 }),
2315 };
2316 assert!(evaluate_filter(&filter, &inactive_value()));
2317 }
2318
2319 #[test]
2320 fn test_filter_not_inverts_true_to_false() {
2321 let filter = FilterExpr::Not {
2323 expr: Box::new(FilterExpr::Eq {
2324 field: "status".to_string(),
2325 value: json!("active"),
2326 }),
2327 };
2328 assert!(!evaluate_filter(&filter, &active_value()));
2329 }
2330
2331 #[test]
2332 fn test_filter_not_combined_with_and() {
2333 let value = json!({ "age": 25, "status": "active" });
2335 let filter = FilterExpr::And {
2336 expressions: vec![
2337 FilterExpr::Gte {
2338 field: "age".to_string(),
2339 value: json!(18),
2340 },
2341 FilterExpr::Not {
2342 expr: Box::new(FilterExpr::Eq {
2343 field: "status".to_string(),
2344 value: json!("banned"),
2345 }),
2346 },
2347 ],
2348 };
2349 assert!(evaluate_filter(&filter, &value));
2350
2351 let banned_value = json!({ "age": 25, "status": "banned" });
2353 assert!(!evaluate_filter(&filter, &banned_value));
2354
2355 let young_value = json!({ "age": 17, "status": "active" });
2357 assert!(!evaluate_filter(&filter, &young_value));
2358 }
2359
2360 #[test]
2361 fn test_filter_not_serialization_roundtrip() {
2362 let filter = FilterExpr::Not {
2363 expr: Box::new(FilterExpr::Eq {
2364 field: "status".to_string(),
2365 value: json!("active"),
2366 }),
2367 };
2368
2369 let serialized = serde_json::to_string(&filter).expect("serialization failed");
2370 assert!(
2371 serialized.contains("\"$not\""),
2372 "serialized form must contain $not tag"
2373 );
2374
2375 let deserialized: FilterExpr =
2376 serde_json::from_str(&serialized).expect("deserialization failed");
2377
2378 let value = active_value();
2380 assert_eq!(
2381 evaluate_filter(&filter, &value),
2382 evaluate_filter(&deserialized, &value),
2383 "roundtrip filter must produce the same result"
2384 );
2385 }
2386
2387 #[test]
2388 fn test_filter_nested_not() {
2389 let filter = FilterExpr::Not {
2391 expr: Box::new(FilterExpr::Not {
2392 expr: Box::new(FilterExpr::Eq {
2393 field: "status".to_string(),
2394 value: json!("active"),
2395 }),
2396 }),
2397 };
2398 assert!(evaluate_filter(&filter, &active_value()));
2399 assert!(!evaluate_filter(&filter, &inactive_value()));
2400 }
2401
2402 #[tokio::test]
2405 async fn test_ttl_expiration_on_get() {
2406 let store = MemoryStore::new().with_ttl_config(TTLConfig {
2407 default_ttl: Some(std::time::Duration::from_millis(50)),
2408 refresh_on_read: false,
2409 ..Default::default()
2410 });
2411
2412 store
2413 .put("ns", "key1", json!({"v": 1}), None)
2414 .await
2415 .expect("put failed");
2416
2417 let item = store
2419 .get("ns", "key1")
2420 .await
2421 .expect("get failed")
2422 .expect("item should exist");
2423 assert_eq!(item.key, "key1");
2424
2425 tokio::time::sleep(std::time::Duration::from_millis(80)).await;
2427
2428 let result = store.get("ns", "key1").await.expect("get failed");
2430 assert!(result.is_none(), "item should have expired");
2431 }
2432
2433 #[tokio::test]
2434 async fn test_ttl_refresh_on_read() {
2435 let store = MemoryStore::new().with_ttl_config(TTLConfig {
2436 default_ttl: Some(std::time::Duration::from_millis(100)),
2437 refresh_on_read: true,
2438 ..Default::default()
2439 });
2440
2441 store
2442 .put("ns", "key1", json!({"v": 1}), None)
2443 .await
2444 .expect("put failed");
2445
2446 for _ in 0..3 {
2448 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2449 let item = store
2450 .get("ns", "key1")
2451 .await
2452 .expect("get failed")
2453 .expect("item should still exist after refresh");
2454 assert_eq!(item.key, "key1");
2455 }
2456
2457 let result = store.get("ns", "key1").await.expect("get failed");
2460 assert!(
2461 result.is_some(),
2462 "item should still exist after TTL refreshes"
2463 );
2464
2465 tokio::time::sleep(std::time::Duration::from_millis(120)).await;
2467 let result = store.get("ns", "key1").await.expect("get failed");
2468 assert!(result.is_none(), "item should have expired after no reads");
2469 }
2470
2471 #[tokio::test]
2472 async fn test_ttl_search_filters_expired() {
2473 let store = MemoryStore::new().with_ttl_config(TTLConfig {
2474 default_ttl: Some(std::time::Duration::from_millis(50)),
2475 refresh_on_read: false,
2476 ..Default::default()
2477 });
2478
2479 store
2480 .put("ns", "key1", json!({"v": 1}), None)
2481 .await
2482 .expect("put failed");
2483 store
2484 .put("ns", "key2", json!({"v": 2}), None)
2485 .await
2486 .expect("put failed");
2487
2488 let query = SearchQuery {
2490 namespace_prefix: "ns".to_string(),
2491 filter: None,
2492 query: None,
2493 limit: 10,
2494 offset: 0,
2495 };
2496 let result = store.search(query).await.expect("search failed");
2497 assert_eq!(result.total_count, 2);
2498
2499 tokio::time::sleep(std::time::Duration::from_millis(80)).await;
2501
2502 let query = SearchQuery {
2504 namespace_prefix: "ns".to_string(),
2505 filter: None,
2506 query: None,
2507 limit: 10,
2508 offset: 0,
2509 };
2510 let result = store.search(query).await.expect("search failed");
2511 assert_eq!(
2512 result.total_count, 0,
2513 "expired items should be filtered from search"
2514 );
2515 }
2516
2517 #[tokio::test]
2518 async fn test_no_ttl_items_never_expire() {
2519 let store = MemoryStore::new();
2520
2521 store
2522 .put("ns", "key1", json!({"v": 1}), None)
2523 .await
2524 .expect("put failed");
2525
2526 let has_no_expiry = {
2528 let data = store.data.read().await;
2529 data.get("ns")
2530 .and_then(|ns| ns.get("key1"))
2531 .is_some_and(|item| item.expires_at.is_none())
2532 };
2533 assert!(has_no_expiry, "item should have no expiration set");
2534
2535 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2537 let result = store.get("ns", "key1").await.expect("get failed");
2538 assert!(result.is_some(), "item without TTL should never expire");
2539 }
2540
2541 #[tokio::test]
2542 async fn test_ttl_lazy_cleanup_removes_from_underlying_storage() {
2543 let store = MemoryStore::new().with_ttl_config(TTLConfig {
2544 default_ttl: Some(std::time::Duration::from_millis(30)),
2545 refresh_on_read: false,
2546 ..Default::default()
2547 });
2548
2549 store
2550 .put("ns", "key1", json!({"v": 1}), None)
2551 .await
2552 .expect("put failed");
2553
2554 let exists_before = {
2556 let data = store.data.read().await;
2557 data.get("ns").is_some_and(|ns| ns.contains_key("key1"))
2558 };
2559 assert!(exists_before, "item should exist in storage initially");
2560
2561 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2563
2564 let _ = store.get("ns", "key1").await;
2566
2567 let exists_after = {
2569 let data = store.data.read().await;
2570 data.get("ns").is_some_and(|ns| ns.contains_key("key1"))
2571 };
2572 assert!(!exists_after, "expired item should be removed from storage");
2573 }
2574
2575 #[tokio::test]
2576 async fn test_ttl_refresh_updates_expires_at() {
2577 let store = MemoryStore::new().with_ttl_config(TTLConfig {
2578 default_ttl: Some(std::time::Duration::from_millis(200)),
2579 refresh_on_read: true,
2580 ..Default::default()
2581 });
2582
2583 store
2584 .put("ns", "key1", json!({"v": 1}), None)
2585 .await
2586 .expect("put failed");
2587
2588 let original_expires = {
2589 let data = store.data.read().await;
2590 data.get("ns")
2591 .and_then(|ns| ns.get("key1"))
2592 .expect("item")
2593 .expires_at
2594 .expect("should have expires_at")
2595 };
2596
2597 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
2599
2600 let _ = store.get("ns", "key1").await;
2601
2602 let refreshed_expires = {
2603 let data = store.data.read().await;
2604 data.get("ns")
2605 .and_then(|ns| ns.get("key1"))
2606 .expect("item")
2607 .expires_at
2608 .expect("should have expires_at")
2609 };
2610
2611 assert!(
2612 refreshed_expires > original_expires,
2613 "refresh_on_read should advance the expiration time: {refreshed_expires} should be > {original_expires}"
2614 );
2615 }
2616
2617 struct TestEmbeddingFunc;
2625
2626 #[async_trait::async_trait]
2627 impl EmbeddingFunc for TestEmbeddingFunc {
2628 async fn embed(&self, texts: Vec<String>) -> Result<Vec<Vec<f32>>, StoreError> {
2629 Ok(texts
2630 .iter()
2631 .map(|text| {
2632 let hash: u64 = text.bytes().fold(0xcbf2_9ce4_8422_2325u64, |h, b| {
2634 (h ^ u64::from(b)).wrapping_mul(0x0100_0000_01b3)
2635 });
2636 let mut vec: Vec<f32> = (0..8)
2637 .map(|i| f32::from(((hash >> (i * 8)) & 0xFF) as u8) / 255.0)
2638 .collect();
2639 let norm = vec.iter().map(|x| x * x).sum::<f32>().sqrt();
2640 if norm > 0.0 {
2641 for v in &mut vec {
2642 *v /= norm;
2643 }
2644 }
2645 vec
2646 })
2647 .collect())
2648 }
2649 }
2650
2651 #[test]
2652 fn test_cosine_similarity_identical_vectors() {
2653 let v = vec![1.0, 0.0, 0.0];
2654 let sim = cosine_similarity(&v, &v);
2655 let expected = 1.0;
2656 assert!(
2657 (sim - expected).abs() < f32::EPSILON,
2658 "identical vectors should have similarity 1.0, got {sim}"
2659 );
2660 }
2661
2662 #[test]
2663 fn test_cosine_similarity_orthogonal_vectors() {
2664 let a = vec![1.0, 0.0, 0.0];
2665 let b = vec![0.0, 1.0, 0.0];
2666 let sim = cosine_similarity(&a, &b);
2667 let expected = 0.0;
2668 assert!(
2669 (sim - expected).abs() < f32::EPSILON,
2670 "orthogonal vectors should have similarity 0.0, got {sim}"
2671 );
2672 }
2673
2674 #[test]
2675 fn test_cosine_similarity_opposite_vectors() {
2676 let a = vec![1.0, 0.0];
2677 let b = vec![-1.0, 0.0];
2678 let sim = cosine_similarity(&a, &b);
2679 let expected = -1.0;
2680 assert!(
2681 (sim - expected).abs() < f32::EPSILON,
2682 "opposite vectors should have similarity -1.0, got {sim}"
2683 );
2684 }
2685
2686 #[test]
2687 fn test_cosine_similarity_zero_norm() {
2688 let a = vec![0.0, 0.0, 0.0];
2689 let b = vec![1.0, 0.0, 0.0];
2690 let sim = cosine_similarity(&a, &b);
2691 let expected = 0.0;
2692 assert!(
2693 (sim - expected).abs() < f32::EPSILON,
2694 "zero-norm vector should give similarity 0.0, got {sim}"
2695 );
2696 }
2697
2698 #[tokio::test]
2699 async fn test_search_with_embeddings_returns_scored_results() {
2700 let index_config = IndexConfig {
2701 dims: 8,
2702 embed: Box::new(TestEmbeddingFunc),
2703 fields: Some(vec!["text".to_string()]),
2704 };
2705 let store = MemoryStore::new().with_vector_search(index_config);
2706
2707 store
2709 .put(
2710 "docs",
2711 "item1",
2712 json!({"text": "hello world"}),
2713 Some(vec!["text".to_string()]),
2714 )
2715 .await
2716 .expect("put failed");
2717 store
2718 .put(
2719 "docs",
2720 "item2",
2721 json!({"text": "quantum physics"}),
2722 Some(vec!["text".to_string()]),
2723 )
2724 .await
2725 .expect("put failed");
2726
2727 let query = SearchQuery {
2729 namespace_prefix: "docs".to_string(),
2730 filter: None,
2731 query: Some("hello world".to_string()),
2732 limit: 10,
2733 offset: 0,
2734 };
2735 let result = store.search(query).await.expect("search failed");
2736
2737 assert!(
2738 !result.items.is_empty(),
2739 "search should return matching items"
2740 );
2741 for item in &result.items {
2743 assert!(
2744 item.score.is_some(),
2745 "items with embeddings should have similarity scores"
2746 );
2747 }
2748
2749 if let Some(score) = result.items.first().and_then(|i| i.score) {
2751 assert!(
2752 score > 0.9,
2753 "top result should have high similarity score, got {score}"
2754 );
2755 }
2756 }
2757
2758 #[tokio::test]
2759 async fn test_search_ordering_respects_similarity() {
2760 let index_config = IndexConfig {
2761 dims: 8,
2762 embed: Box::new(TestEmbeddingFunc),
2763 fields: Some(vec!["text".to_string()]),
2764 };
2765 let store = MemoryStore::new().with_vector_search(index_config);
2766
2767 store
2769 .put(
2770 "docs",
2771 "hello-world",
2772 json!({"text": "hello world"}),
2773 Some(vec!["text".to_string()]),
2774 )
2775 .await
2776 .expect("put failed");
2777 store
2778 .put(
2779 "docs",
2780 "hello-there",
2781 json!({"text": "hello there"}),
2782 Some(vec!["text".to_string()]),
2783 )
2784 .await
2785 .expect("put failed");
2786 store
2787 .put(
2788 "docs",
2789 "quantum-physics",
2790 json!({"text": "quantum physics"}),
2791 Some(vec!["text".to_string()]),
2792 )
2793 .await
2794 .expect("put failed");
2795
2796 let query = SearchQuery {
2798 namespace_prefix: "docs".to_string(),
2799 filter: None,
2800 query: Some("hello world".to_string()),
2801 limit: 10,
2802 offset: 0,
2803 };
2804 let result = store.search(query).await.expect("search failed");
2805
2806 assert_eq!(
2807 result.items.len(),
2808 3,
2809 "should return all 3 items in the namespace"
2810 );
2811
2812 let first = result
2814 .items
2815 .first()
2816 .expect("should have at least one result");
2817 assert_eq!(
2818 first.item.key, "hello-world",
2819 "the most similar item should be ranked first"
2820 );
2821
2822 for pair in result.items.windows(2) {
2824 if let (Some(a), Some(b)) = (pair[0].score, pair[1].score) {
2825 assert!(
2826 a >= b,
2827 "scores should be in descending order: {a} should be >= {b}"
2828 );
2829 }
2830 }
2831 }
2832
2833 #[tokio::test]
2834 async fn test_search_without_index_returns_no_scores() {
2835 let store = MemoryStore::new();
2837
2838 store
2839 .put("docs", "item1", json!({"text": "hello"}), None)
2840 .await
2841 .expect("put failed");
2842 store
2843 .put("docs", "item2", json!({"text": "world"}), None)
2844 .await
2845 .expect("put failed");
2846
2847 let query = SearchQuery {
2849 namespace_prefix: "docs".to_string(),
2850 filter: None,
2851 query: Some("hello".to_string()),
2852 limit: 10,
2853 offset: 0,
2854 };
2855 let result = store.search(query).await.expect("search failed");
2856
2857 assert_eq!(result.items.len(), 2, "should return all items");
2858 for item in &result.items {
2860 assert!(
2861 item.score.is_none(),
2862 "items without index should have no score"
2863 );
2864 }
2865 }
2866
2867 #[tokio::test]
2868 async fn test_list_namespaces_offset_skips_first_n() {
2869 let store = MemoryStore::new();
2870
2871 for i in 0..5 {
2873 store
2874 .put(&format!("ns-{i}"), "key", json!({"v": i}), None)
2875 .await
2876 .expect("put failed");
2877 }
2878
2879 let all_ns = store
2881 .list_namespaces(None, None, None, None, None)
2882 .await
2883 .expect("list_namespaces failed");
2884 assert_eq!(all_ns.len(), 5, "expected all 5 namespaces");
2885
2886 let offset_ns = store
2888 .list_namespaces(None, None, None, None, Some(2))
2889 .await
2890 .expect("list_namespaces with offset failed");
2891 assert_eq!(
2892 offset_ns.len(),
2893 3,
2894 "offset=2 should skip 2 namespaces, leaving 3"
2895 );
2896 }
2897
2898 #[tokio::test]
2899 async fn test_list_namespaces_offset_and_limit_together() {
2900 let store = MemoryStore::new();
2901
2902 for i in 0..10 {
2903 store
2904 .put(&format!("ns-{i:02}"), "key", json!({"v": i}), None)
2905 .await
2906 .expect("put failed");
2907 }
2908
2909 let page = store
2911 .list_namespaces(None, None, None, Some(4), Some(3))
2912 .await
2913 .expect("list_namespaces failed");
2914 assert_eq!(page.len(), 4, "offset=3 + limit=4 should yield 4 results");
2915 }
2916
2917 #[tokio::test]
2918 async fn test_list_namespaces_offset_larger_than_results() {
2919 let store = MemoryStore::new();
2920
2921 store
2922 .put("only-ns", "key", json!({"v": 1}), None)
2923 .await
2924 .expect("put failed");
2925
2926 let result = store
2928 .list_namespaces(None, None, None, None, Some(100))
2929 .await
2930 .expect("list_namespaces failed");
2931 assert!(
2932 result.is_empty(),
2933 "offset larger than result set should return empty"
2934 );
2935 }
2936
2937 #[tokio::test]
2938 async fn test_list_namespaces_offset_with_prefix_filter() {
2939 let store = MemoryStore::new();
2940
2941 for i in 0..6 {
2942 let ns = if i < 3 {
2943 format!("alpha-{i}")
2944 } else {
2945 format!("beta-{i}")
2946 };
2947 store
2948 .put(&ns, "key", json!({"v": i}), None)
2949 .await
2950 .expect("put failed");
2951 }
2952
2953 let result = store
2955 .list_namespaces(Some("alpha-"), None, None, None, Some(1))
2956 .await
2957 .expect("list_namespaces failed");
2958 assert_eq!(
2959 result.len(),
2960 2,
2961 "prefix filter + offset=1 should leave 2 namespaces"
2962 );
2963 assert!(
2964 result.iter().all(|ns| ns.starts_with("alpha-")),
2965 "all results must match prefix filter"
2966 );
2967 }
2968
/// SQLite: an equality filter compiles to a `json_extract` comparison with one
/// positional `?` placeholder, and the bound value round-trips as text.
#[cfg(feature = "sqlite")]
#[test]
fn test_filter_to_sql_eq() {
    let filter = FilterExpr::Eq {
        field: "status".to_string(),
        value: json!("active"),
    };
    let (sql, params) = filter_to_sql_sqlite(&filter);
    assert_eq!(sql, "json_extract(value, '$.status') = ?");
    assert_eq!(params.len(), 1);
    // Was the mis-encoded `¶ms[0]` (HTML `&para` entity), which is not valid Rust.
    assert_eq!(sqlite_param_from_value(&params[0]), "active");
}
2983
/// SQLite: a `Gt` filter casts both sides to REAL so the comparison is numeric
/// rather than textual, and binds the threshold as a single parameter.
#[cfg(feature = "sqlite")]
#[test]
fn test_filter_to_sql_gt() {
    let filter = FilterExpr::Gt {
        field: "age".to_string(),
        value: json!(18),
    };
    let (sql, params) = filter_to_sql_sqlite(&filter);
    assert_eq!(
        sql,
        "CAST(json_extract(value, '$.age') AS REAL) > CAST(? AS REAL)"
    );
    assert_eq!(params.len(), 1);
    // Was the mis-encoded `¶ms[0]` (HTML `&para` entity), which is not valid Rust.
    assert_eq!(sqlite_param_from_value(&params[0]), "18");
}
2999
/// SQLite: nested `And`/`Or` filters are parenthesised per sub-expression, and
/// parameters are collected in the same left-to-right order as their `?`
/// placeholders (required for correct positional binding).
#[cfg(feature = "sqlite")]
#[test]
fn test_filter_to_sql_and_or_combination() {
    let filter = FilterExpr::And {
        expressions: vec![
            FilterExpr::Eq {
                field: "status".to_string(),
                value: json!("active"),
            },
            FilterExpr::Or {
                expressions: vec![
                    FilterExpr::Gte {
                        field: "age".to_string(),
                        value: json!(18),
                    },
                    FilterExpr::Eq {
                        field: "role".to_string(),
                        value: json!("admin"),
                    },
                ],
            },
        ],
    };
    let (sql, params) = filter_to_sql_sqlite(&filter);
    assert_eq!(
        sql,
        "(json_extract(value, '$.status') = ?) AND \
         ((CAST(json_extract(value, '$.age') AS REAL) >= CAST(? AS REAL)) OR \
         (json_extract(value, '$.role') = ?))"
    );
    assert_eq!(params.len(), 3);
    // `?` placeholders bind positionally. A count check alone would miss params
    // collected out of order, which would silently compare the wrong fields.
    assert_eq!(sqlite_param_from_value(&params[0]), "active");
    assert_eq!(sqlite_param_from_value(&params[1]), "18");
    assert_eq!(sqlite_param_from_value(&params[2]), "admin");
}
3032
/// SQLite: `Not` wraps the inner expression as `NOT (...)` and forwards the
/// inner expression's single parameter.
#[cfg(feature = "sqlite")]
#[test]
fn test_filter_to_sql_not() {
    let banned = FilterExpr::Eq {
        field: "status".to_string(),
        value: json!("banned"),
    };
    let negated = FilterExpr::Not {
        expr: Box::new(banned),
    };

    let (sql, params) = filter_to_sql_sqlite(&negated);

    assert!(sql.starts_with("NOT ("));
    assert!(sql.contains("json_extract(value, '$.status') = ?"));
    assert_eq!(params.len(), 1);
}
3047
/// SQLite has no boolean type; JSON `true` is encoded as `"1"`.
#[cfg(feature = "sqlite")]
#[test]
fn test_sqlite_param_bool_true() {
    let flag = json!(true);
    assert_eq!(sqlite_param_from_value(&flag), "1");
}
3053
/// SQLite has no boolean type; JSON `false` is encoded as `"0"`.
#[cfg(feature = "sqlite")]
#[test]
fn test_sqlite_param_bool_false() {
    let flag = json!(false);
    assert_eq!(sqlite_param_from_value(&flag), "0");
}
3059
/// A JSON string is passed through as its raw contents (no surrounding quotes).
#[cfg(feature = "sqlite")]
#[test]
fn test_sqlite_param_string() {
    let text = json!("hello");
    assert_eq!(sqlite_param_from_value(&text), "hello");
}
3065
/// Integer and floating-point JSON numbers are rendered in their natural
/// decimal form.
#[cfg(feature = "sqlite")]
#[test]
fn test_sqlite_param_number() {
    for (number, rendered) in [(json!(42), "42"), (json!(42.5), "42.5")] {
        assert_eq!(sqlite_param_from_value(&number), rendered);
    }
}
3072
/// Postgres: a top-level equality filter uses the `#>>` path operator on the
/// `value` column with one bound parameter.
#[cfg(feature = "postgres")]
#[test]
fn test_filter_to_sql_postgres_eq() {
    let status_is_active = FilterExpr::Eq {
        field: "status".to_string(),
        value: json!("active"),
    };

    let (sql, params) = filter_to_sql_postgres(&status_is_active);

    assert_eq!(sql, "value #>> '{status}' = ?");
    assert_eq!(params.len(), 1);
}
3086
/// Postgres: a dotted field name becomes a comma-separated JSON path array.
#[cfg(feature = "postgres")]
#[test]
fn test_filter_to_sql_postgres_nested_field() {
    let city_is_nyc = FilterExpr::Eq {
        field: "user.address.city".to_string(),
        value: json!("NYC"),
    };

    let (sql, _) = filter_to_sql_postgres(&city_is_nyc);

    assert_eq!(sql, "value #>> '{user,address,city}' = ?");
}
3097
/// Postgres: ordering comparisons cast both the extracted field and the bound
/// parameter to `numeric`.
#[cfg(feature = "postgres")]
#[test]
fn test_filter_to_sql_postgres_numeric_compare() {
    let cheaper_than_100 = FilterExpr::Lt {
        field: "price".to_string(),
        value: json!(100.0),
    };

    let (sql, _) = filter_to_sql_postgres(&cheaper_than_100);

    assert_eq!(sql, "(value #> '{price}')::numeric < CAST(? AS numeric)");
}
3108
/// Postgres: `a = 1 AND NOT (b = 2 OR c = 3)` renders every connective and
/// collects one parameter per leaf comparison.
#[cfg(feature = "postgres")]
#[test]
fn test_filter_to_sql_postgres_and_or_not() {
    let b_or_c = FilterExpr::Or {
        expressions: vec![
            FilterExpr::Eq {
                field: "b".to_string(),
                value: json!(2),
            },
            FilterExpr::Eq {
                field: "c".to_string(),
                value: json!(3),
            },
        ],
    };
    let filter = FilterExpr::And {
        expressions: vec![
            FilterExpr::Eq {
                field: "a".to_string(),
                value: json!(1),
            },
            FilterExpr::Not {
                expr: Box::new(b_or_c),
            },
        ],
    };

    let (sql, params) = filter_to_sql_postgres(&filter);

    assert_eq!(params.len(), 3);
    for connective in ["AND", "NOT (", "OR"] {
        assert!(sql.contains(connective));
    }
}
3140
3141 #[tokio::test]
3144 async fn test_sweep_expired_items_removes_expired() {
3145 let store = MemoryStore::new().with_ttl_config(TTLConfig {
3146 default_ttl: Some(std::time::Duration::from_millis(50)),
3147 refresh_on_read: false,
3148 ..Default::default()
3149 });
3150
3151 store
3152 .put("ns", "key1", json!({"v": 1}), None)
3153 .await
3154 .expect("put failed");
3155 store
3156 .put("ns", "key2", json!({"v": 2}), None)
3157 .await
3158 .expect("put failed");
3159
3160 tokio::time::sleep(std::time::Duration::from_millis(80)).await;
3162
3163 let count = store.sweep_expired_items().await.expect("sweep failed");
3165 assert_eq!(count, 2, "sweep should remove 2 expired items");
3166
3167 let exists = {
3169 let data = store.data.read().await;
3170 data.get("ns").is_some_and(|ns| ns.contains_key("key1"))
3171 };
3172 assert!(!exists, "expired item should be removed from storage");
3173
3174 let exists = {
3175 let data = store.data.read().await;
3176 data.get("ns").is_some_and(|ns| ns.contains_key("key2"))
3177 };
3178 assert!(!exists, "expired item should be removed from storage");
3179 }
3180
3181 #[tokio::test]
3182 async fn test_sweep_expired_items_respects_max_items_limit() {
3183 let store = MemoryStore::new().with_ttl_config(TTLConfig {
3184 default_ttl: Some(std::time::Duration::from_millis(50)),
3185 refresh_on_read: false,
3186 sweep_max_items: 2,
3187 ..Default::default()
3188 });
3189
3190 for i in 1..=5 {
3192 store
3193 .put("ns", &format!("key{i}"), json!({"v": i}), None)
3194 .await
3195 .expect("put failed");
3196 }
3197
3198 tokio::time::sleep(std::time::Duration::from_millis(80)).await;
3200
3201 let count1 = store.sweep_expired_items().await.expect("sweep failed");
3203 assert_eq!(
3204 count1, 2,
3205 "first sweep should respect sweep_max_items limit"
3206 );
3207
3208 let count2 = store.sweep_expired_items().await.expect("sweep failed");
3210 assert_eq!(count2, 2, "second sweep should remove 2 more items");
3211
3212 let count3 = store.sweep_expired_items().await.expect("sweep failed");
3214 assert_eq!(count3, 1, "third sweep should remove last item");
3215
3216 let count4 = store.sweep_expired_items().await.expect("sweep_failed");
3218 assert_eq!(count4, 0, "fourth sweep should find no expired items");
3219 }
3220
3221 #[tokio::test]
3222 async fn test_sweep_expired_items_across_multiple_namespaces() {
3223 let store = MemoryStore::new().with_ttl_config(TTLConfig {
3224 default_ttl: Some(std::time::Duration::from_millis(50)),
3225 refresh_on_read: false,
3226 ..Default::default()
3227 });
3228
3229 store
3231 .put("ns1", "key1", json!({"v": 1}), None)
3232 .await
3233 .expect("put failed");
3234 store
3235 .put("ns1", "key2", json!({"v": 2}), None)
3236 .await
3237 .expect("put failed");
3238 store
3239 .put("ns2", "key1", json!({"v": 3}), None)
3240 .await
3241 .expect("put failed");
3242 store
3243 .put("ns2", "key2", json!({"v": 4}), None)
3244 .await
3245 .expect("put failed");
3246
3247 tokio::time::sleep(std::time::Duration::from_millis(80)).await;
3249
3250 let count = store.sweep_expired_items().await.expect("sweep failed");
3252 assert_eq!(count, 4, "sweep should remove all 4 expired items");
3253
3254 let total_items = {
3256 let data = store.data.read().await;
3257 data.values()
3258 .map(std::collections::HashMap::len)
3259 .sum::<usize>()
3260 };
3261 assert_eq!(total_items, 0, "all items should be removed");
3262 }
3263
3264 #[tokio::test]
3265 async fn test_sweep_expired_items_does_not_remove_non_expired() {
3266 let store = MemoryStore::new().with_ttl_config(TTLConfig {
3267 default_ttl: Some(std::time::Duration::from_secs(10)),
3268 refresh_on_read: false,
3269 ..Default::default()
3270 });
3271
3272 store
3273 .put("ns", "key1", json!({"v": 1}), None)
3274 .await
3275 .expect("put failed");
3276
3277 let count = store.sweep_expired_items().await.expect("sweep failed");
3279 assert_eq!(count, 0, "sweep should not remove non-expired items");
3280
3281 let item = store
3283 .get("ns", "key1")
3284 .await
3285 .expect("get failed")
3286 .expect("item should still exist");
3287 assert_eq!(item.key, "key1");
3288 }
3289
3290 #[tokio::test]
3291 async fn test_sweep_expired_items_with_no_ttl_items() {
3292 let store = MemoryStore::new();
3293
3294 store
3295 .put("ns", "key1", json!({"v": 1}), None)
3296 .await
3297 .expect("put failed");
3298
3299 let count = store.sweep_expired_items().await.expect("sweep failed");
3301 assert_eq!(count, 0, "sweep should not remove items without expiration");
3302
3303 let item = store
3305 .get("ns", "key1")
3306 .await
3307 .expect("get failed")
3308 .expect("item should still exist");
3309 assert_eq!(item.key, "key1");
3310 }
3311
3312 #[tokio::test]
3313 async fn test_start_sweep_task_runs_periodically() {
3314 let store = Arc::new(MemoryStore::new().with_ttl_config(TTLConfig {
3315 default_ttl: Some(std::time::Duration::from_millis(50)),
3316 refresh_on_read: false,
3317 sweep_interval: std::time::Duration::from_millis(100),
3318 ..Default::default()
3319 }));
3320
3321 store
3322 .put("ns", "key1", json!({"v": 1}), None)
3323 .await
3324 .expect("put failed");
3325
3326 let store_clone = Arc::clone(&store);
3328 let handle = store_clone.start_sweep_task();
3329
3330 tokio::time::sleep(std::time::Duration::from_millis(250)).await;
3332
3333 let exists = {
3335 let data = store.data.read().await;
3336 data.get("ns").is_some_and(|ns| ns.contains_key("key1"))
3337 };
3338 assert!(!exists, "sweep task should have removed expired item");
3339
3340 handle.abort();
3342 }
3343
3344 #[tokio::test]
3345 async fn test_sweep_and_lazy_cleanup_work_together() {
3346 let store = MemoryStore::new().with_ttl_config(TTLConfig {
3347 default_ttl: Some(std::time::Duration::from_millis(50)),
3348 refresh_on_read: false,
3349 ..Default::default()
3350 });
3351
3352 for i in 1..=5 {
3354 store
3355 .put("ns", &format!("key{i}"), json!({"v": i}), None)
3356 .await
3357 .expect("put failed");
3358 }
3359
3360 tokio::time::sleep(std::time::Duration::from_millis(80)).await;
3362
3363 let _ = store.get("ns", "key1").await;
3365
3366 let exists1 = {
3368 let data = store.data.read().await;
3369 data.get("ns").is_some_and(|ns| ns.contains_key("key1"))
3370 };
3371 assert!(!exists1, "lazy cleanup should remove key1");
3372
3373 let count = store.sweep_expired_items().await.expect("sweep failed");
3375 assert_eq!(count, 4, "sweep should remove remaining 4 items");
3376
3377 let total_items = {
3379 let data = store.data.read().await;
3380 data.values()
3381 .map(std::collections::HashMap::len)
3382 .sum::<usize>()
3383 };
3384 assert_eq!(total_items, 0, "all items should be removed");
3385 }
3386}