Skip to main content

juncture_core/
store.rs

1// Store trait and implementations for cross-thread long-term memory
2//
3// This module provides the Store abstraction for persistent key-value storage
4// that is independent of checkpoint and shared across all threads.
5
6use async_trait::async_trait;
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use std::sync::Arc;
11
12#[cfg(any(feature = "sqlite", feature = "postgres"))]
13use std::fmt::Write;
14
15#[cfg(any(feature = "sqlite", feature = "postgres"))]
16use sqlx::Row;
17
18/// Store error types
19#[derive(Debug, thiserror::Error)]
20pub enum StoreError {
21    /// Item not found
22    #[error("item not found: {namespace}/{key}")]
23    NotFound {
24        /// Namespace of the missing item
25        namespace: String,
26        /// Key of the missing item
27        key: String,
28    },
29
30    /// Invalid namespace format
31    #[error("invalid namespace: {0}")]
32    InvalidNamespace(String),
33
34    /// Serialization error
35    #[error("serialization error: {0}")]
36    Serialize(#[from] serde_json::Error),
37
38    /// Storage backend error
39    #[error("storage error: {0}")]
40    Storage(String),
41
42    /// Vector search error
43    #[error("vector search error: {0}")]
44    VectorSearch(String),
45
46    /// Embedding computation error
47    #[error("embedding error: {0}")]
48    Embedding(String),
49}
50
51/// Store trait for cross-thread long-term memory
52///
53/// Provides hierarchical namespace key-value storage independent of
54/// checkpoint and shared across all threads and graph executions.
55///
56/// Note: This trait cannot implement Debug as it's an async trait intended
57/// for dynamic dispatch via trait objects.
58#[async_trait]
59pub trait Store: Send + Sync + 'static {
60    /// Get item from store
61    ///
62    /// # Arguments
63    ///
64    /// * `namespace` - Namespace path
65    /// * `key` - Item key within namespace
66    async fn get(&self, namespace: &str, key: &str) -> Result<Option<Item>, StoreError>;
67
68    /// Put item into store
69    ///
70    /// # Arguments
71    ///
72    /// * `namespace` - Namespace path
73    /// * `key` - Item key within namespace
74    /// * `value` - Item value
75    /// * `index` - Optional index fields for vector search
76    async fn put(
77        &self,
78        namespace: &str,
79        key: &str,
80        value: serde_json::Value,
81        index: Option<Vec<String>>,
82    ) -> Result<(), StoreError>;
83
84    /// Delete item from store
85    ///
86    /// # Arguments
87    ///
88    /// * `namespace` - Namespace path
89    /// * `key` - Item key within namespace
90    async fn delete(&self, namespace: &str, key: &str) -> Result<(), StoreError>;
91
92    /// Search items with filtering and optional vector search
93    ///
94    /// # Arguments
95    ///
96    /// * `query` - Search query
97    async fn search(&self, query: SearchQuery) -> Result<SearchResult, StoreError>;
98
99    /// List namespaces
100    ///
101    /// # Arguments
102    ///
103    /// * `prefix` - Optional prefix filter
104    /// * `suffix` - Optional suffix filter
105    /// * `max_depth` - Optional maximum depth
106    /// * `limit` - Optional result limit
107    /// * `offset` - Optional offset
108    async fn list_namespaces(
109        &self,
110        prefix: Option<&str>,
111        suffix: Option<&str>,
112        max_depth: Option<usize>,
113        limit: Option<usize>,
114        offset: Option<usize>,
115    ) -> Result<Vec<String>, StoreError>;
116
117    /// Execute batch operations
118    ///
119    /// # Arguments
120    ///
121    /// * `ops` - List of operations to execute
122    async fn batch(&self, ops: Vec<StoreOp>) -> Result<Vec<StoreResult>, StoreError>;
123}
124
125/// Stored item
126#[derive(Debug, Clone, Serialize, Deserialize)]
127pub struct Item {
128    /// Namespace path
129    pub namespace: String,
130    /// Key within namespace
131    pub key: String,
132    /// Stored value
133    pub value: serde_json::Value,
134    /// Creation timestamp
135    pub created_at: DateTime<Utc>,
136    /// Last update timestamp
137    pub updated_at: DateTime<Utc>,
138    /// Optional expiration timestamp for TTL support
139    pub expires_at: Option<DateTime<Utc>>,
140    /// Optional embedding vector for vector search.
141    ///
142    /// Pre-computed during `put()` when an [`IndexConfig`] with an
143    /// [`EmbeddingFunc`] is configured and index fields are provided.
144    #[serde(skip_serializing_if = "Option::is_none")]
145    pub embedding: Option<Vec<f32>>,
146}
147
148impl Item {
149    /// Returns `true` if the item has expired based on `expires_at`.
150    ///
151    /// Items without an expiration timestamp are never considered expired.
152    #[must_use]
153    pub fn is_expired(&self) -> bool {
154        self.expires_at
155            .is_some_and(|expires_at| Utc::now() > expires_at)
156    }
157}
158
159/// Search result item with optional similarity score
160#[derive(Debug, Clone, Serialize, Deserialize)]
161pub struct SearchItem {
162    /// Base item
163    #[serde(flatten)]
164    pub item: Item,
165    /// Similarity score (for vector search)
166    pub score: Option<f64>,
167}
168
169/// Search query
170#[derive(Debug, Clone, Default)]
171pub struct SearchQuery {
172    /// Namespace prefix
173    pub namespace_prefix: String,
174    /// Filter expression
175    pub filter: Option<FilterExpr>,
176    /// Natural language query (for vector search)
177    pub query: Option<String>,
178    /// Result limit
179    pub limit: usize,
180    /// Offset for pagination
181    pub offset: usize,
182}
183
184/// Search result
185#[derive(Debug, Clone)]
186pub struct SearchResult {
187    /// Matching items
188    pub items: Vec<SearchItem>,
189    /// Total count
190    pub total_count: usize,
191}
192
193/// Filter expression for search
194#[derive(Debug, Clone, Serialize, Deserialize)]
195#[serde(tag = "op")]
196pub enum FilterExpr {
197    /// Equality
198    #[serde(rename = "$eq")]
199    Eq {
200        /// Field path
201        field: String,
202        /// Value to compare
203        value: serde_json::Value,
204    },
205    /// Inequality
206    #[serde(rename = "$ne")]
207    Ne {
208        /// Field path
209        field: String,
210        /// Value to compare
211        value: serde_json::Value,
212    },
213    /// Greater than
214    #[serde(rename = "$gt")]
215    Gt {
216        /// Field path
217        field: String,
218        /// Value to compare
219        value: serde_json::Value,
220    },
221    /// Greater than or equal
222    #[serde(rename = "$gte")]
223    Gte {
224        /// Field path
225        field: String,
226        /// Value to compare
227        value: serde_json::Value,
228    },
229    /// Less than
230    #[serde(rename = "$lt")]
231    Lt {
232        /// Field path
233        field: String,
234        /// Value to compare
235        value: serde_json::Value,
236    },
237    /// Less than or equal
238    #[serde(rename = "$lte")]
239    Lte {
240        /// Field path
241        field: String,
242        /// Value to compare
243        value: serde_json::Value,
244    },
245    /// Logical AND
246    #[serde(rename = "$and")]
247    And {
248        /// Sub-expressions
249        expressions: Vec<FilterExpr>,
250    },
251    /// Logical OR
252    #[serde(rename = "$or")]
253    Or {
254        /// Sub-expressions
255        expressions: Vec<FilterExpr>,
256    },
257    /// Logical NOT
258    #[serde(rename = "$not")]
259    Not {
260        /// Negated expression
261        expr: Box<FilterExpr>,
262    },
263}
264
265impl FilterExpr {
266    /// Evaluate this filter expression against a JSON value.
267    #[must_use]
268    ///
269    /// Returns `true` if the value matches the filter criteria.
270    /// Supports dot-notation field paths (e.g., `"address.city"`).
271    pub fn matches(&self, value: &serde_json::Value) -> bool {
272        evaluate_filter(self, value)
273    }
274}
275
276/// Store operation type
277#[derive(Debug, Clone)]
278pub enum StoreOp {
279    /// Get operation
280    Get {
281        /// Namespace
282        namespace: String,
283        /// Key
284        key: String,
285    },
286    /// Put operation
287    Put {
288        /// Namespace
289        namespace: String,
290        /// Key
291        key: String,
292        /// Value
293        value: serde_json::Value,
294        /// Index fields
295        index: Option<Vec<String>>,
296    },
297    /// Delete operation
298    Delete {
299        /// Namespace
300        namespace: String,
301        /// Key
302        key: String,
303    },
304    /// Search operation
305    Search(SearchQuery),
306    /// List namespaces operation
307    ListNamespaces {
308        /// Prefix filter
309        prefix: Option<String>,
310        /// Suffix filter
311        suffix: Option<String>,
312        /// Maximum depth
313        max_depth: Option<usize>,
314        /// Result limit
315        limit: Option<usize>,
316    },
317}
318
319/// Store operation result
320#[derive(Debug, Clone)]
321pub enum StoreResult {
322    /// Single item result
323    Item(Option<Item>),
324    /// Multiple items result
325    Items(SearchResult),
326    /// Namespaces result
327    Namespaces(Vec<String>),
328    /// Empty result
329    None,
330}
331
/// Time-to-live (TTL) settings for [`MemoryStore`].
///
/// Expiration is enforced in two ways. Reads are lazy: `get()` removes an
/// expired item when it encounters one, and `search()` skips expired items.
/// In addition, a periodic background sweep can delete expired items in
/// bulk so the store does not grow without bound.
///
/// # Examples
///
/// ```
/// use juncture_core::store::{MemoryStore, TTLConfig};
/// use std::time::Duration;
///
/// let store = MemoryStore::new().with_ttl_config(TTLConfig {
///     default_ttl: Some(Duration::from_secs(300)),
///     refresh_on_read: true,
///     ..Default::default()
/// });
/// ```
#[derive(Clone, Debug)]
pub struct TTLConfig {
    /// Lifetime given to every item written through `put()`.
    ///
    /// `None` (the default) means items never expire. With
    /// `Some(duration)`, each `put()` sets `expires_at = now + duration`.
    pub default_ttl: Option<std::time::Duration>,
    /// Whether a successful `get()` restarts the item's TTL timer.
    ///
    /// Only has an effect when `default_ttl` is set: the item's `expires_at`
    /// is then moved to `now + default_ttl` on every successful read.
    pub refresh_on_read: bool,
    /// Pause between two background sweep cycles.
    ///
    /// Each cycle deletes expired items in bulk, but never more than
    /// `sweep_max_items`, so other operations are not blocked for long.
    pub sweep_interval: std::time::Duration,
    /// Upper bound on the number of expired items deleted per sweep cycle.
    ///
    /// Keeps the duration of a single sweep predictable even when many
    /// entries have expired.
    pub sweep_max_items: usize,
}

impl Default for TTLConfig {
    /// No expiry, no refresh on read, a sweep every five minutes, and at
    /// most 1000 deletions per sweep.
    fn default() -> Self {
        use std::time::Duration;

        const SWEEP_EVERY_SECS: u64 = 300;
        const SWEEP_BATCH: usize = 1000;

        Self {
            default_ttl: None,
            refresh_on_read: false,
            sweep_interval: Duration::from_secs(SWEEP_EVERY_SECS),
            sweep_max_items: SWEEP_BATCH,
        }
    }
}
389
390/// In-memory store implementation
391///
392/// Thread-safe in-memory store using `RwLock` for concurrent access.
393#[derive(Debug)]
394pub struct MemoryStore {
395    /// Data: namespace -> (key -> Item)
396    data: Arc<tokio::sync::RwLock<HashMap<String, HashMap<String, Item>>>>,
397    /// Vector index configuration
398    index_config: Option<IndexConfig>,
399    /// TTL configuration for item expiration.
400    ttl_config: TTLConfig,
401}
402
403/// Trait for computing embeddings for vector search.
404///
405/// Implementations provide async embedding generation from text inputs,
406/// used by vector-capable stores for similarity search.
407///
408/// # Errors
409///
410/// Implementations should return [`StoreError::Embedding`] or a suitable variant
411/// if embedding generation fails (network error, model error, etc.).
412#[async_trait::async_trait]
413pub trait EmbeddingFunc: Send + Sync + 'static {
414    /// Generate embedding vectors for the given texts.
415    ///
416    /// # Arguments
417    ///
418    /// * `texts` - Text strings to embed
419    ///
420    /// # Errors
421    ///
422    /// Returns [`StoreError`] if embedding generation fails.
423    async fn embed(&self, texts: Vec<String>) -> Result<Vec<Vec<f32>>, StoreError>;
424}
425
426/// Vector index configuration
427///
428/// Configure vector similarity search on a store by providing an
429/// [`EmbeddingFunc`] implementation and the embedding dimension count.
430pub struct IndexConfig {
431    /// Embedding dimensions
432    pub dims: usize,
433    /// Embedding function for computing vectors from text
434    pub embed: Box<dyn EmbeddingFunc>,
435    /// Fields to index (None indexes all text fields)
436    pub fields: Option<Vec<String>>,
437}
438
439impl std::fmt::Debug for IndexConfig {
440    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
441        f.debug_struct("IndexConfig")
442            .field("dims", &self.dims)
443            .field("embed", &"...")
444            .field("fields", &self.fields)
445            .finish()
446    }
447}
448
449impl Default for MemoryStore {
450    fn default() -> Self {
451        Self::new()
452    }
453}
454
455impl MemoryStore {
456    /// Create new in-memory store
457    #[must_use]
458    pub fn new() -> Self {
459        Self {
460            data: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
461            index_config: None,
462            ttl_config: TTLConfig::default(),
463        }
464    }
465
466    /// Create store with vector search enabled
467    ///
468    /// # Arguments
469    ///
470    /// * `config` - Index configuration
471    #[must_use]
472    pub fn with_vector_search(mut self, config: IndexConfig) -> Self {
473        self.index_config = Some(config);
474        self
475    }
476
477    /// Configure TTL behavior for item expiration.
478    ///
479    /// # Arguments
480    ///
481    /// * `config` - TTL configuration
482    #[must_use]
483    pub const fn with_ttl_config(mut self, config: TTLConfig) -> Self {
484        self.ttl_config = config;
485        self
486    }
487
488    /// Sweep expired items from the store, returning the count of removed items.
489    ///
490    /// This method iterates through all namespaces and their items, collecting
491    /// keys where `expires_at < now`. It removes at most `sweep_max_items`
492    /// expired items to avoid blocking other operations for extended periods.
493    ///
494    /// The sweep is cooperative with the lazy cleanup in `get()` and `search()`:
495    /// expired items are removed during normal reads, while this method performs
496    /// bulk cleanup in the background.
497    ///
498    /// # Errors
499    ///
500    /// Returns [`StoreError::Storage`] if the sweep operation encounters an
501    /// unexpected error (currently unused, reserved for future extensions).
502    ///
503    /// # Examples
504    ///
505    /// ```
506    /// use juncture_core::store::{MemoryStore, Store, TTLConfig};
507    /// use std::time::Duration;
508    /// use serde_json::json;
509    ///
510    /// # #[tokio::main]
511    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
512    /// let store = MemoryStore::new().with_ttl_config(TTLConfig {
513    ///     default_ttl: Some(Duration::from_millis(50)),
514    ///     refresh_on_read: false,
515    ///     ..Default::default()
516    /// });
517    ///
518    /// store.put("ns", "key1", json!({ "v": 1 }), None).await?;
519    /// store.put("ns", "key2", json!({ "v": 2 }), None).await?;
520    ///
521    /// // Wait for items to expire
522    /// tokio::time::sleep(Duration::from_millis(80)).await;
523    ///
524    /// // Sweep removes both expired items
525    /// let count = store.sweep_expired_items().await?;
526    /// assert_eq!(count, 2);
527    /// # Ok(())
528    /// # }
529    /// ```
530    #[allow(
531        clippy::significant_drop_tightening,
532        reason = "Write lock must be held during iteration and removal"
533    )]
534    pub async fn sweep_expired_items(&self) -> Result<usize, StoreError> {
535        let now = Utc::now();
536        let mut count = 0;
537        let mut items = self.data.write().await;
538
539        // Collect expired keys across all namespaces
540        // Structure: (namespace, key) pairs to remove
541        let mut keys_to_remove = Vec::new();
542
543        for (namespace, namespace_map) in items.iter() {
544            for (key, item) in namespace_map {
545                if let Some(expires_at) = item.expires_at
546                    && expires_at < now
547                {
548                    keys_to_remove.push((namespace.clone(), key.clone()));
549                    count += 1;
550                    if count >= self.ttl_config.sweep_max_items {
551                        break;
552                    }
553                }
554            }
555            if count >= self.ttl_config.sweep_max_items {
556                break;
557            }
558        }
559
560        // Remove collected expired items
561        for (namespace, key) in keys_to_remove {
562            if let Some(namespace_map) = items.get_mut(&namespace) {
563                namespace_map.remove(&key);
564            }
565        }
566
567        Ok(count)
568    }
569
570    /// Start the background sweep task for periodic expired item cleanup.
571    ///
572    /// This method spawns a tokio task that runs periodically according to
573    /// `sweep_interval` in the TTL config. Each sweep cycle removes at most
574    /// `sweep_max_items` expired items. Errors are logged via `tracing::warn!`
575    /// and do not terminate the task.
576    ///
577    /// # Note
578    ///
579    /// The returned `JoinHandle` can be used to abort the task via
580    /// `handle.abort()` or awaited to ensure the task completes. If dropped
581    /// without aborting, the task will continue running in the background.
582    ///
583    /// # Examples
584    ///
585    /// ```
586    /// use juncture_core::store::{MemoryStore, TTLConfig};
587    /// use std::{time::Duration, sync::Arc};
588    ///
589    /// # #[tokio::main]
590    /// # async fn main() {
591    /// let store = Arc::new(MemoryStore::new().with_ttl_config(TTLConfig {
592    ///     default_ttl: Some(Duration::from_secs(300)),
593    ///     refresh_on_read: true,
594    ///     ..Default::default()
595    /// }));
596    ///
597    /// // Start background sweep task
598    /// let _sweep_handle = store.start_sweep_task();
599    ///
600    /// // Store continues to work, sweep task runs in background
601    /// // The task will run until _sweep_handle is dropped or aborted
602    /// # }
603    /// ```
604    #[must_use]
605    pub fn start_sweep_task(self: Arc<Self>) -> tokio::task::JoinHandle<()> {
606        tokio::spawn(async move {
607            let mut interval = tokio::time::interval(self.ttl_config.sweep_interval);
608            loop {
609                interval.tick().await;
610                if let Err(e) = self.sweep_expired_items().await {
611                    tracing::warn!("Store sweep failed: {}", e);
612                }
613            }
614        })
615    }
616}
617
618#[async_trait]
619impl Store for MemoryStore {
620    #[allow(
621        clippy::significant_drop_tightening,
622        reason = "Read lock is scoped tightly; write lock acquired after release"
623    )]
624    async fn get(&self, namespace: &str, key: &str) -> Result<Option<Item>, StoreError> {
625        // Phase 1: read lock -- check if item exists and whether it is expired
626        let is_expired = {
627            let data = self.data.read().await;
628            let Some(ns) = data.get(namespace) else {
629                return Ok(None);
630            };
631            let Some(item) = ns.get(key) else {
632                return Ok(None);
633            };
634            item.is_expired()
635        };
636
637        if is_expired {
638            // Phase 2a: write lock -- lazily remove expired item
639            let mut data = self.data.write().await;
640            if let Some(ns_map) = data.get_mut(namespace) {
641                ns_map.remove(key);
642            }
643            drop(data);
644            return Ok(None);
645        }
646
647        if self.ttl_config.refresh_on_read && self.ttl_config.default_ttl.is_some() {
648            // Phase 2b: write lock -- refresh TTL and return item
649            let ttl = self.ttl_config.default_ttl.expect("checked is_some above");
650            let now = Utc::now();
651            let new_expires =
652                now + chrono::Duration::from_std(ttl).unwrap_or(chrono::Duration::MAX);
653
654            let mut data = self.data.write().await;
655            if let Some(ns_map) = data.get_mut(namespace)
656                && let Some(item) = ns_map.get_mut(key)
657            {
658                item.expires_at = Some(new_expires);
659                item.updated_at = now;
660                let cloned = item.clone();
661                drop(data);
662                return Ok(Some(cloned));
663            }
664            drop(data);
665            // Item was removed between read and write phases
666            return Ok(None);
667        }
668
669        // Phase 2c: read lock -- return item without modification
670        let data = self.data.read().await;
671        let item = data.get(namespace).and_then(|ns| ns.get(key).cloned());
672        Ok(item)
673    }
674
675    #[allow(
676        clippy::significant_drop_tightening,
677        reason = "Lock must be held for entire put operation after embedding"
678    )]
679    async fn put(
680        &self,
681        namespace: &str,
682        key: &str,
683        value: serde_json::Value,
684        index: Option<Vec<String>>,
685    ) -> Result<(), StoreError> {
686        // Compute embedding outside the write lock (embeddings may be async)
687        let embedding = if let Some(ref index_config) = self.index_config {
688            if let Some(index_fields) = &index {
689                if index_fields.is_empty() {
690                    None
691                } else {
692                    let text = extract_index_text(&value, index_fields);
693                    if text.is_empty() {
694                        None
695                    } else {
696                        let mut embeddings = index_config.embed.embed(vec![text]).await?;
697                        embeddings.pop()
698                    }
699                }
700            } else {
701                None
702            }
703        } else {
704            None
705        };
706
707        let now = Utc::now();
708        let expires_at = self
709            .ttl_config
710            .default_ttl
711            .map(|ttl| now + chrono::Duration::from_std(ttl).unwrap_or(chrono::Duration::MAX));
712
713        let mut data = self.data.write().await;
714
715        let namespace_map = data
716            .entry(namespace.to_string())
717            .or_insert_with(HashMap::new);
718        let existing = namespace_map.get(key);
719
720        let item = Item {
721            namespace: namespace.to_string(),
722            key: key.to_string(),
723            value,
724            created_at: existing.map_or(now, |i| i.created_at),
725            updated_at: now,
726            expires_at,
727            embedding,
728        };
729
730        namespace_map.insert(key.to_string(), item);
731        Ok(())
732    }
733
734    #[allow(
735        clippy::significant_drop_tightening,
736        reason = "Lock must be held for entire delete operation"
737    )]
738    async fn delete(&self, namespace: &str, key: &str) -> Result<(), StoreError> {
739        let mut data = self.data.write().await;
740        if let Some(namespace_map) = data.get_mut(namespace) {
741            namespace_map.remove(key);
742        }
743        Ok(())
744    }
745
746    #[allow(
747        clippy::significant_drop_tightening,
748        reason = "Lock must be held for entire search iteration"
749    )]
750    async fn search(&self, query: SearchQuery) -> Result<SearchResult, StoreError> {
751        // Compute query embedding outside the read lock
752        let query_embedding: Option<Vec<f32>> = if let Some(ref index_config) = self.index_config {
753            if let Some(query_text) = &query.query {
754                if query_text.is_empty() {
755                    None
756                } else {
757                    let mut embeddings = index_config.embed.embed(vec![query_text.clone()]).await?;
758                    embeddings.pop()
759                }
760            } else {
761                None
762            }
763        } else {
764            None
765        };
766
767        // Phase 1: Gather items under read lock
768        let mut items: Vec<SearchItem> = {
769            let data = self.data.read().await;
770            let mut results = Vec::new();
771
772            for (namespace, namespace_map) in data.iter() {
773                if namespace.starts_with(&query.namespace_prefix) {
774                    for item in namespace_map.values() {
775                        if item.is_expired() {
776                            continue;
777                        }
778
779                        if query
780                            .filter
781                            .as_ref()
782                            .is_some_and(|filter| !evaluate_filter(filter, &item.value))
783                        {
784                            continue;
785                        }
786
787                        let score = query_embedding.as_ref().and_then(|q_emb| {
788                            item.embedding
789                                .as_ref()
790                                .map(|i_emb| f64::from(cosine_similarity(q_emb, i_emb)))
791                        });
792
793                        results.push(SearchItem {
794                            item: item.clone(),
795                            score,
796                        });
797                    }
798                }
799            }
800            results
801        };
802
803        let total = items.len();
804
805        // Phase 2: Sort by similarity when vector search is active
806        if query_embedding.is_some() {
807            items.sort_by(|a, b| {
808                b.score
809                    .partial_cmp(&a.score)
810                    .unwrap_or(std::cmp::Ordering::Equal)
811            });
812        }
813
814        // Phase 3: Apply pagination
815        let start = query.offset.min(items.len());
816        let end = (start + query.limit).min(items.len());
817        let page = items.drain(start..end).collect();
818
819        Ok(SearchResult {
820            items: page,
821            total_count: total,
822        })
823    }
824
825    async fn list_namespaces(
826        &self,
827        prefix: Option<&str>,
828        suffix: Option<&str>,
829        max_depth: Option<usize>,
830        limit: Option<usize>,
831        offset: Option<usize>,
832    ) -> Result<Vec<String>, StoreError> {
833        let mut namespaces: Vec<String> = {
834            let data = self.data.read().await;
835            data.keys().cloned().collect()
836        };
837
838        // Apply filters
839        if let Some(prefix_filter) = prefix {
840            namespaces.retain(|ns| ns.starts_with(prefix_filter));
841        }
842        if let Some(suffix_filter) = suffix {
843            namespaces.retain(|ns| ns.ends_with(suffix_filter));
844        }
845
846        // Apply max_depth: truncate namespace paths to the first N segments
847        if let Some(depth) = max_depth {
848            namespaces = namespaces
849                .into_iter()
850                .map(|ns| {
851                    let parts: Vec<&str> = ns.split('/').take(depth).collect();
852                    parts.join("/")
853                })
854                .collect();
855            namespaces.sort();
856            namespaces.dedup();
857        }
858
859        // Apply offset-based pagination (skip first N results)
860        if let Some(offset_value) = offset {
861            let skip = offset_value.min(namespaces.len());
862            namespaces.drain(..skip);
863        }
864
865        // Apply limit
866        if let Some(limit_value) = limit {
867            namespaces.truncate(limit_value);
868        }
869
870        Ok(namespaces)
871    }
872
873    async fn batch(&self, ops: Vec<StoreOp>) -> Result<Vec<StoreResult>, StoreError> {
874        let mut results = Vec::with_capacity(ops.len());
875
876        for op in ops {
877            let result = match op {
878                StoreOp::Get { namespace, key } => {
879                    let item = self.get(&namespace, &key).await?;
880                    StoreResult::Item(item)
881                }
882                StoreOp::Put {
883                    namespace,
884                    key,
885                    value,
886                    index,
887                } => {
888                    self.put(&namespace, &key, value, index).await?;
889                    StoreResult::None
890                }
891                StoreOp::Delete { namespace, key } => {
892                    self.delete(&namespace, &key).await?;
893                    StoreResult::None
894                }
895                StoreOp::Search(query) => {
896                    let result = self.search(query).await?;
897                    StoreResult::Items(result)
898                }
899                StoreOp::ListNamespaces {
900                    prefix,
901                    suffix,
902                    max_depth,
903                    limit,
904                } => {
905                    let namespaces = self
906                        .list_namespaces(
907                            prefix.as_deref(),
908                            suffix.as_deref(),
909                            max_depth,
910                            limit,
911                            None,
912                        )
913                        .await?;
914                    StoreResult::Namespaces(namespaces)
915                }
916            };
917            results.push(result);
918        }
919
920        Ok(results)
921    }
922}
923
924/// Evaluate filter expression against value
925fn evaluate_filter(filter: &FilterExpr, value: &serde_json::Value) -> bool {
926    match filter {
927        FilterExpr::Eq {
928            field,
929            value: expected,
930        } => get_field(value, field).is_some_and(|v| v == *expected),
931        FilterExpr::Ne {
932            field,
933            value: expected,
934        } => get_field(value, field).is_none_or(|v| v != *expected),
935        FilterExpr::Gt {
936            field,
937            value: expected,
938        } => compare_numbers(value, field, expected, |a, b| a > b),
939        FilterExpr::Gte {
940            field,
941            value: expected,
942        } => compare_numbers(value, field, expected, |a, b| a >= b),
943        FilterExpr::Lt {
944            field,
945            value: expected,
946        } => compare_numbers(value, field, expected, |a, b| a < b),
947        FilterExpr::Lte {
948            field,
949            value: expected,
950        } => compare_numbers(value, field, expected, |a, b| a <= b),
951        FilterExpr::And { expressions } => {
952            expressions.iter().all(|expr| evaluate_filter(expr, value))
953        }
954        FilterExpr::Or { expressions } => {
955            expressions.iter().any(|expr| evaluate_filter(expr, value))
956        }
957        FilterExpr::Not { expr } => !evaluate_filter(expr, value),
958    }
959}
960
961/// Get nested field from JSON value
962fn get_field(value: &serde_json::Value, path: &str) -> Option<serde_json::Value> {
963    let parts: Vec<&str> = path.split('.').collect();
964    let mut current = value;
965
966    for part in parts {
967        match current {
968            serde_json::Value::Object(map) => {
969                current = map.get(part)?;
970            }
971            _ => return None,
972        }
973    }
974
975    Some(current.clone())
976}
977
978/// Compare numeric fields
979fn compare_numbers(
980    value: &serde_json::Value,
981    field: &str,
982    expected: &serde_json::Value,
983    comparator: impl Fn(f64, f64) -> bool,
984) -> bool {
985    match (get_field(value, field), expected) {
986        (Some(serde_json::Value::Number(a)), serde_json::Value::Number(b)) => {
987            match (a.as_f64(), b.as_f64()) {
988                (Some(a_val), Some(b_val)) => comparator(a_val, b_val),
989                _ => false,
990            }
991        }
992        _ => false,
993    }
994}
995
/// Compute cosine similarity between two vectors.
///
/// Returns a value in `[-1, 1]` where 1 means identical direction,
/// 0 means orthogonal, and -1 means opposite direction.
/// Returns `0.0` if either vector is empty or has zero magnitude.
/// When vectors differ in length, only the common prefix is compared.
///
/// The quotient is clamped to `[-1, 1]`: `f32` rounding in the dot product
/// and the norms can otherwise push the result marginally outside the
/// documented range for (anti-)parallel vectors. Non-finite components may
/// still produce `NaN`.
#[must_use]
pub fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
    let len = a.len().min(b.len());
    if len == 0 {
        return 0.0;
    }
    let (a, b) = (&a[..len], &b[..len]);

    let dot_product: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
    let norm_a = a.iter().map(|x| x * x).sum::<f32>().sqrt();
    let norm_b = b.iter().map(|x| x * x).sum::<f32>().sqrt();
    if norm_a == 0.0 || norm_b == 0.0 {
        return 0.0;
    }

    // Clamp so callers can rely on the documented range despite rounding.
    (dot_product / (norm_a * norm_b)).clamp(-1.0, 1.0)
}
1018
1019/// Extract indexable text from a JSON value for the given field paths.
1020///
1021/// Concatenates string representations of each field's value, separated by
1022/// spaces, for use as input to an embedding function.
1023fn extract_index_text(value: &serde_json::Value, fields: &[String]) -> String {
1024    fields
1025        .iter()
1026        .filter_map(|field| {
1027            get_field(value, field).map(|v| {
1028                v.as_str()
1029                    .map_or_else(|| v.to_string(), ToString::to_string)
1030            })
1031        })
1032        .collect::<Vec<_>>()
1033        .join(" ")
1034}
1035
/// SQLite-based store implementation
///
/// Provides persistent storage using `SQLite` database with optional
/// vector similarity search support.
/// Requires the `sqlite` feature flag.
///
/// Build one with [`SqliteStore::new`] (vector search disabled) or
/// [`SqliteStore::with_vector_search`] (embeddings computed through the
/// supplied [`IndexConfig`]).
#[cfg(feature = "sqlite")]
#[derive(Debug)]
pub struct SqliteStore {
    /// Database connection pool.
    ///
    /// Both constructors store `Some(pool)`. The `Store` methods fail with
    /// `StoreError::Storage("Store not initialized")` when this is `None`.
    pool: Option<sqlx::SqlitePool>,
    /// Vector index configuration.
    ///
    /// `None` (the default from `new`) means `put` and `search` compute no
    /// embeddings and `get` does not load any.
    index_config: Option<IndexConfig>,
}
1049
#[cfg(feature = "sqlite")]
impl SqliteStore {
    /// Connect to a `SQLite` database and make sure the store schema exists.
    ///
    /// Opens a connection pool for `database_url`, then issues idempotent
    /// `CREATE TABLE IF NOT EXISTS` statements for `store_items` and
    /// `store_vectors`. The returned store has vector search disabled.
    ///
    /// # Arguments
    ///
    /// * `database_url` - `SQLite` database connection string
    ///
    /// # Errors
    ///
    /// Returns [`StoreError::Storage`] if the connection or either table
    /// creation statement fails.
    pub async fn new(database_url: &str) -> Result<Self, StoreError> {
        // One JSON document per (namespace, key).
        const CREATE_ITEMS: &str = r"
            CREATE TABLE IF NOT EXISTS store_items (
                namespace TEXT NOT NULL,
                key TEXT NOT NULL,
                value TEXT NOT NULL,
                created_at TEXT NOT NULL,
                updated_at TEXT NOT NULL,
                PRIMARY KEY (namespace, key)
            )
            ";
        // Pre-computed embeddings stored as BLOB for cosine similarity search;
        // rows are tied to their item through the cascading foreign key.
        const CREATE_VECTORS: &str = r"
            CREATE TABLE IF NOT EXISTS store_vectors (
                namespace TEXT NOT NULL,
                key TEXT NOT NULL,
                field TEXT NOT NULL,
                vector BLOB NOT NULL,
                PRIMARY KEY (namespace, key, field),
                FOREIGN KEY (namespace, key) REFERENCES store_items(namespace, key) ON DELETE CASCADE
            )
            ";

        let pool = match sqlx::SqlitePool::connect(database_url).await {
            Ok(connected) => connected,
            Err(e) => {
                return Err(StoreError::Storage(format!(
                    "Failed to connect to database: {e}"
                )));
            }
        };

        if let Err(e) = sqlx::query(CREATE_ITEMS).execute(&pool).await {
            return Err(StoreError::Storage(format!("Failed to create table: {e}")));
        }

        if let Err(e) = sqlx::query(CREATE_VECTORS).execute(&pool).await {
            return Err(StoreError::Storage(format!(
                "Failed to create vectors table: {e}"
            )));
        }

        Ok(Self {
            pool: Some(pool),
            index_config: None,
        })
    }

    /// Connect like [`SqliteStore::new`], then enable vector search.
    ///
    /// # Arguments
    ///
    /// * `database_url` - `SQLite` database connection string
    /// * `config` - [`IndexConfig`] for vector search
    ///
    /// # Errors
    ///
    /// Returns [`StoreError`] if connection fails or table creation fails.
    pub async fn with_vector_search(
        database_url: &str,
        config: IndexConfig,
    ) -> Result<Self, StoreError> {
        Self::new(database_url).await.map(|mut store| {
            store.index_config = Some(config);
            store
        })
    }
}
1126
/// Convert a [`FilterExpr`] to a `SQLite` WHERE clause fragment with bind parameters.
///
/// Returns a tuple of (SQL clause string, bind parameter values).
/// Each `?` placeholder in the returned SQL corresponds to an entry in the returned
/// bind parameter vector in order.
///
/// Field names are embedded in a single-quoted JSON path literal
/// (`'$.<field>'`); any `'` in the field name is doubled so it cannot
/// terminate the literal. Comparison values are never embedded, only bound.
///
/// An empty `And` renders as `1=1` and an empty `Or` as `1=0`, matching
/// `evaluate_filter` (vacuous truth / no alternative) instead of producing an
/// empty, syntactically invalid clause.
#[cfg(feature = "sqlite")]
fn filter_to_sql_sqlite(filter: &FilterExpr) -> (String, Vec<serde_json::Value>) {
    /// Render `json_extract(value, '$.<field>')` with quotes in `field` doubled.
    fn extract_expr(field: &str) -> String {
        format!("json_extract(value, '$.{}')", field.replace('\'', "''"))
    }

    /// Render a numeric comparison; both sides are cast to REAL.
    fn numeric(
        field: &str,
        op: &str,
        value: &serde_json::Value,
    ) -> (String, Vec<serde_json::Value>) {
        (
            format!("CAST({} AS REAL) {op} CAST(? AS REAL)", extract_expr(field)),
            vec![value.clone()],
        )
    }

    match filter {
        FilterExpr::Eq { field, value } => {
            (format!("{} = ?", extract_expr(field)), vec![value.clone()])
        }
        FilterExpr::Ne { field, value } => {
            (format!("{} != ?", extract_expr(field)), vec![value.clone()])
        }
        FilterExpr::Gt { field, value } => numeric(field, ">", value),
        FilterExpr::Gte { field, value } => numeric(field, ">=", value),
        FilterExpr::Lt { field, value } => numeric(field, "<", value),
        FilterExpr::Lte { field, value } => numeric(field, "<=", value),
        FilterExpr::And { expressions } | FilterExpr::Or { expressions } => {
            let (joiner, when_empty) = if matches!(filter, FilterExpr::And { .. }) {
                (" AND ", "1=1")
            } else {
                (" OR ", "1=0")
            };

            let mut clauses = Vec::with_capacity(expressions.len());
            let mut all_params = Vec::new();
            for expr in expressions {
                let (clause, params) = filter_to_sql_sqlite(expr);
                clauses.push(format!("({clause})"));
                all_params.extend(params);
            }

            if clauses.is_empty() {
                (when_empty.to_string(), all_params)
            } else {
                (clauses.join(joiner), all_params)
            }
        }
        FilterExpr::Not { expr } => {
            let (clause, params) = filter_to_sql_sqlite(expr);
            (format!("NOT ({clause})"), params)
        }
    }
}
1185
/// Render a [`serde_json::Value`] as the text used for a `SQLite` bind parameter.
///
/// * booleans become `"1"` / `"0"`, because `SQLite` `json_extract` yields
///   integers (0 or 1) for JSON booleans;
/// * strings are passed through without JSON quoting;
/// * every other value uses its JSON text representation.
///
/// NOTE(review): the result is always text; whether a text parameter compares
/// equal to a numeric `json_extract` result depends on `SQLite` type affinity
/// rules, so confirm how callers bind it.
#[cfg(feature = "sqlite")]
fn sqlite_param_from_value(value: &serde_json::Value) -> String {
    if let Some(flag) = value.as_bool() {
        let digit = if flag { "1" } else { "0" };
        return digit.to_string();
    }

    match value.as_str() {
        Some(text) => text.to_owned(),
        None => value.to_string(),
    }
}
1199
/// Borrow the connection pool, or report that the store was never initialized.
#[cfg(feature = "sqlite")]
fn sqlite_pool(store: &SqliteStore) -> Result<&sqlx::SqlitePool, StoreError> {
    store
        .pool
        .as_ref()
        .ok_or_else(|| StoreError::Storage("Store not initialized".to_string()))
}

/// Escape `LIKE` metacharacters (`\`, `%`, `_`) so `raw` matches literally
/// when the pattern is used with `ESCAPE '\'`.
#[cfg(feature = "sqlite")]
fn sqlite_escape_like(raw: &str) -> String {
    let mut escaped = String::with_capacity(raw.len());
    for ch in raw.chars() {
        if matches!(ch, '\\' | '%' | '_') {
            escaped.push('\\');
        }
        escaped.push(ch);
    }
    escaped
}

/// Parse an RFC 3339 timestamp column and normalize it to UTC.
#[cfg(feature = "sqlite")]
fn sqlite_parse_timestamp(raw: &str) -> Result<DateTime<Utc>, StoreError> {
    DateTime::parse_from_rfc3339(raw)
        .map(|parsed| parsed.with_timezone(&Utc))
        .map_err(|e| StoreError::Storage(format!("invalid timestamp: {e}")))
}

/// Load one stored embedding for `(namespace, key)`, if any row exists.
#[cfg(feature = "sqlite")]
async fn sqlite_load_embedding(
    pool: &sqlx::SqlitePool,
    namespace: &str,
    key: &str,
) -> Result<Option<Vec<f32>>, StoreError> {
    let vector_row =
        sqlx::query("SELECT vector FROM store_vectors WHERE namespace = ? AND key = ? LIMIT 1")
            .bind(namespace)
            .bind(key)
            .fetch_optional(pool)
            .await
            .map_err(|e| StoreError::Storage(format!("Failed to load embedding: {e}")))?;

    vector_row
        .map(|vrow| {
            let bytes: Vec<u8> = vrow
                .try_get("vector")
                .map_err(|e| StoreError::Storage(e.to_string()))?;
            blob_to_vector(&bytes)
        })
        .transpose()
}

#[cfg(feature = "sqlite")]
#[async_trait]
impl Store for SqliteStore {
    /// Fetch one item by namespace and key.
    ///
    /// Returns `Ok(None)` when no row exists. When vector search is
    /// configured, the item's stored embedding (if any) is attached.
    /// `expires_at` is always `None` for this backend.
    async fn get(&self, namespace: &str, key: &str) -> Result<Option<Item>, StoreError> {
        let pool = sqlite_pool(self)?;

        let result = sqlx::query(
            "SELECT value, created_at, updated_at FROM store_items WHERE namespace = ? AND key = ?",
        )
        .bind(namespace)
        .bind(key)
        .fetch_optional(pool)
        .await
        .map_err(|e| StoreError::Storage(format!("Failed to get item: {e}")))?;

        let Some(row) = result else {
            return Ok(None);
        };

        let value_str: String = row
            .try_get("value")
            .map_err(|e| StoreError::Storage(e.to_string()))?;
        let value = serde_json::from_str(&value_str).map_err(StoreError::Serialize)?;
        let created_at: String = row
            .try_get("created_at")
            .map_err(|e| StoreError::Storage(e.to_string()))?;
        let updated_at: String = row
            .try_get("updated_at")
            .map_err(|e| StoreError::Storage(e.to_string()))?;

        // Embeddings are only looked up when vector search is enabled.
        let embedding = if self.index_config.is_some() {
            sqlite_load_embedding(pool, namespace, key).await?
        } else {
            None
        };

        Ok(Some(Item {
            namespace: namespace.to_string(),
            key: key.to_string(),
            value,
            created_at: sqlite_parse_timestamp(&created_at)?,
            updated_at: sqlite_parse_timestamp(&updated_at)?,
            expires_at: None,
            embedding,
        }))
    }

    /// Insert or replace an item and, when applicable, its embedding.
    ///
    /// An embedding is computed only when vector search is configured,
    /// `index` names at least one field, and those fields yield non-empty
    /// text. On conflict the existing row keeps its `created_at`; `value` and
    /// `updated_at` are overwritten.
    ///
    /// The item row and the embedding row are written in one transaction so
    /// a failure cannot leave a new value paired with a missing or stale
    /// write of its embedding.
    async fn put(
        &self,
        namespace: &str,
        key: &str,
        value: serde_json::Value,
        index: Option<Vec<String>>,
    ) -> Result<(), StoreError> {
        let pool = sqlite_pool(self)?;

        let now = Utc::now();

        // Compute the embedding before opening the transaction so no database
        // lock is held while awaiting the embedding function.
        let embedding = match (&self.index_config, &index) {
            (Some(index_config), Some(index_fields)) if !index_fields.is_empty() => {
                let text = extract_index_text(&value, index_fields);
                if text.is_empty() {
                    None
                } else {
                    let mut embeddings = index_config.embed.embed(vec![text]).await?;
                    embeddings.pop()
                }
            }
            _ => None,
        };

        let value_str = serde_json::to_string(&value).map_err(StoreError::Serialize)?;
        let now_str = now.to_rfc3339();

        let mut tx = pool
            .begin()
            .await
            .map_err(|e| StoreError::Storage(format!("Failed to begin transaction: {e}")))?;

        sqlx::query(
            r"
            INSERT INTO store_items (namespace, key, value, created_at, updated_at)
            VALUES (?, ?, ?, ?, ?)
            ON CONFLICT (namespace, key) DO UPDATE SET
                value = excluded.value,
                updated_at = excluded.updated_at
            ",
        )
        .bind(namespace)
        .bind(key)
        .bind(&value_str)
        .bind(&now_str)
        .bind(&now_str)
        .execute(&mut *tx)
        .await
        .map_err(|e| StoreError::Storage(format!("Failed to put item: {e}")))?;

        // Store embedding if one was computed
        if let Some(vec) = embedding {
            let bytes = vector_to_blob(&vec);
            sqlx::query(
                r"
                INSERT INTO store_vectors (namespace, key, field, vector)
                VALUES (?, ?, 'default', ?)
                ON CONFLICT (namespace, key, field) DO UPDATE SET
                    vector = excluded.vector
                ",
            )
            .bind(namespace)
            .bind(key)
            .bind(&bytes)
            .execute(&mut *tx)
            .await
            .map_err(|e| StoreError::Storage(format!("Failed to store embedding: {e}")))?;
        }

        tx.commit()
            .await
            .map_err(|e| StoreError::Storage(format!("Failed to commit put: {e}")))?;

        Ok(())
    }

    /// Delete an item; deleting a missing item is not an error.
    ///
    /// `store_vectors` declares `ON DELETE CASCADE` on the item key, so the
    /// item's embeddings are removed with it when foreign keys are enforced.
    async fn delete(&self, namespace: &str, key: &str) -> Result<(), StoreError> {
        let pool = sqlite_pool(self)?;

        sqlx::query("DELETE FROM store_items WHERE namespace = ? AND key = ?")
            .bind(namespace)
            .bind(key)
            .execute(pool)
            .await
            .map_err(|e| StoreError::Storage(format!("Failed to delete item: {e}")))?;

        Ok(())
    }

    /// Search items under a namespace prefix, optionally filtered and ranked.
    ///
    /// All rows matching the prefix and filter are fetched; when vector
    /// search is configured and `query.query` is non-empty, each item with a
    /// stored embedding is scored by cosine similarity and results are sorted
    /// by descending score (unscored items last). `offset`/`limit` are then
    /// applied in memory, and `total_count` is the number of matches before
    /// pagination.
    ///
    /// The prefix is matched literally (`%`, `_`, `\` are escaped), and
    /// filter values are bound with their native SQL type so that numeric and
    /// boolean equality filters compare against `json_extract` results.
    ///
    /// NOTE(review): `SQLite` `LIKE` is ASCII case-insensitive by default, so
    /// the prefix match ignores ASCII case.
    #[allow(
        clippy::cast_possible_truncation,
        clippy::cast_possible_wrap,
        clippy::cast_sign_loss,
        clippy::as_conversions,
        clippy::similar_names,
        clippy::too_many_lines,
        reason = "SQL binding requires i64 for LIMIT/OFFSET; COUNT returns i64; names 'nlike' and 'nprefix' are adequately descriptive; vector search logic cannot be further decomposed without extracting trivial helpers"
    )]
    async fn search(&self, query: SearchQuery) -> Result<SearchResult, StoreError> {
        let pool = sqlite_pool(self)?;

        // Compute query embedding if vector search is enabled and query text is provided
        let query_embedding: Option<Vec<f32>> = match (&self.index_config, &query.query) {
            (Some(index_config), Some(query_text)) if !query_text.is_empty() => {
                let mut embeddings = index_config.embed.embed(vec![query_text.clone()]).await?;
                embeddings.pop()
            }
            _ => None,
        };

        let namespace_pattern = format!("{}%", sqlite_escape_like(&query.namespace_prefix));
        let mut conditions = vec!["namespace LIKE ? ESCAPE '\\'".to_string()];
        let mut params: Vec<serde_json::Value> = vec![serde_json::Value::String(namespace_pattern)];

        if let Some(ref filter) = query.filter {
            let (clause, filter_params) = filter_to_sql_sqlite(filter);
            conditions.push(format!("({clause})"));
            params.extend(filter_params);
        }

        let where_clause = conditions.join(" AND ");

        // Fetch all matching items for vector similarity computation
        let data_sql = format!(
            "SELECT namespace, key, value, created_at, updated_at \
             FROM store_items WHERE {where_clause} \
             ORDER BY namespace, key"
        );
        let mut data_query = sqlx::query(&data_sql);
        for param in &params {
            // Bind with the native type: SQLite applies no affinity to
            // `json_extract(...) = ?`, so an INTEGER never equals TEXT '5'.
            data_query = match param {
                serde_json::Value::Bool(flag) => data_query.bind(i64::from(*flag)),
                serde_json::Value::Number(number) => {
                    if let Some(int) = number.as_i64() {
                        data_query.bind(int)
                    } else if let Some(float) = number.as_f64() {
                        data_query.bind(float)
                    } else {
                        data_query.bind(number.to_string())
                    }
                }
                serde_json::Value::String(text) => data_query.bind(text.as_str()),
                other => data_query.bind(sqlite_param_from_value(other)),
            };
        }

        let rows = data_query
            .fetch_all(pool)
            .await
            .map_err(|e| StoreError::Storage(format!("Search query failed: {e}")))?;

        let mut items: Vec<SearchItem> = Vec::with_capacity(rows.len());

        for row in rows {
            let namespace: String = row
                .try_get("namespace")
                .map_err(|e| StoreError::Storage(e.to_string()))?;
            let key: String = row
                .try_get("key")
                .map_err(|e| StoreError::Storage(e.to_string()))?;
            let value_str: String = row
                .try_get("value")
                .map_err(|e| StoreError::Storage(e.to_string()))?;
            let value = serde_json::from_str(&value_str).map_err(StoreError::Serialize)?;
            let created_at_str: String = row
                .try_get("created_at")
                .map_err(|e| StoreError::Storage(e.to_string()))?;
            let updated_at_str: String = row
                .try_get("updated_at")
                .map_err(|e| StoreError::Storage(e.to_string()))?;

            // Item embeddings are only needed when there is a query embedding to compare with
            let embedding = if query_embedding.is_some() {
                sqlite_load_embedding(pool, &namespace, &key).await?
            } else {
                None
            };

            // Compute similarity score if both query and item embeddings exist
            let score = query_embedding
                .as_ref()
                .zip(embedding.as_ref())
                .map(|(q_emb, i_emb)| f64::from(cosine_similarity(q_emb, i_emb)));

            items.push(SearchItem {
                item: Item {
                    namespace,
                    key,
                    value,
                    created_at: sqlite_parse_timestamp(&created_at_str)?,
                    updated_at: sqlite_parse_timestamp(&updated_at_str)?,
                    expires_at: None,
                    embedding,
                },
                score,
            });
        }

        let total_count = items.len();

        // Sort by similarity when vector search is active
        if query_embedding.is_some() {
            items.sort_by(|a, b| {
                b.score
                    .partial_cmp(&a.score)
                    .unwrap_or(std::cmp::Ordering::Equal)
            });
        }

        // Apply pagination; saturating so a huge limit cannot overflow
        let start = query.offset.min(items.len());
        let end = start.saturating_add(query.limit).min(items.len());
        let page = items.drain(start..end).collect();

        Ok(SearchResult {
            items: page,
            total_count,
        })
    }

    /// List distinct namespaces, optionally filtered, truncated and paginated.
    ///
    /// `prefix` and `suffix` are matched literally. Results are ordered by
    /// namespace so pagination is deterministic. When `max_depth` is given,
    /// each namespace is cut to its first `max_depth` `/`-separated segments
    /// and de-duplicated *before* `offset`/`limit` are applied, so pages are
    /// computed over the truncated set.
    ///
    /// NOTE(review): `SQLite` `LIKE` is ASCII case-insensitive by default.
    async fn list_namespaces(
        &self,
        prefix: Option<&str>,
        suffix: Option<&str>,
        max_depth: Option<usize>,
        limit: Option<usize>,
        offset: Option<usize>,
    ) -> Result<Vec<String>, StoreError> {
        let pool = sqlite_pool(self)?;

        let mut query_str = "SELECT DISTINCT namespace FROM store_items WHERE 1=1".to_string();
        let mut params = Vec::new();

        if let Some(prefix_filter) = prefix {
            query_str.push_str(" AND namespace LIKE ? ESCAPE '\\'");
            params.push(format!("{}%", sqlite_escape_like(prefix_filter)));
        }
        if let Some(suffix_filter) = suffix {
            query_str.push_str(" AND namespace LIKE ? ESCAPE '\\'");
            params.push(format!("%{}", sqlite_escape_like(suffix_filter)));
        }
        query_str.push_str(" ORDER BY namespace");

        // Depth truncation can merge several rows into one, so SQL-level
        // pagination is only correct when no truncation is requested.
        if max_depth.is_none() {
            match (limit, offset) {
                (Some(limit_value), Some(offset_value)) => {
                    let _ = write!(query_str, " LIMIT {limit_value} OFFSET {offset_value}");
                }
                (Some(limit_value), None) => {
                    let _ = write!(query_str, " LIMIT {limit_value}");
                }
                (None, Some(offset_value)) => {
                    // SQLite requires LIMIT before OFFSET; -1 means "no limit".
                    let _ = write!(query_str, " LIMIT -1 OFFSET {offset_value}");
                }
                (None, None) => {}
            }
        }

        let mut query = sqlx::query(&query_str);
        for param in params {
            query = query.bind(param);
        }

        let rows = query
            .fetch_all(pool)
            .await
            .map_err(|e| StoreError::Storage(format!("Failed to list namespaces: {e}")))?;

        let mut namespaces = Vec::with_capacity(rows.len());
        for row in rows {
            let ns: String = row
                .try_get("namespace")
                .map_err(|e| StoreError::Storage(e.to_string()))?;
            namespaces.push(ns);
        }

        // Apply max_depth: truncate namespace paths to the first N segments
        if let Some(depth) = max_depth {
            namespaces = namespaces
                .into_iter()
                .map(|ns| {
                    let parts: Vec<&str> = ns.split('/').take(depth).collect();
                    parts.join("/")
                })
                .collect();
            namespaces.sort();
            namespaces.dedup();

            // Paginate over the truncated, de-duplicated set.
            namespaces = namespaces
                .into_iter()
                .skip(offset.unwrap_or(0))
                .take(limit.unwrap_or(usize::MAX))
                .collect();
        }

        Ok(namespaces)
    }

    /// Execute operations one after another, collecting one result per op.
    ///
    /// Operations are not wrapped in a shared transaction: the first failing
    /// operation aborts the batch with its error, and the effects of earlier
    /// operations remain.
    async fn batch(&self, ops: Vec<StoreOp>) -> Result<Vec<StoreResult>, StoreError> {
        let mut results = Vec::with_capacity(ops.len());

        for op in ops {
            let result = match op {
                StoreOp::Get { namespace, key } => {
                    let item = self.get(&namespace, &key).await?;
                    StoreResult::Item(item)
                }
                StoreOp::Put {
                    namespace,
                    key,
                    value,
                    index,
                } => {
                    self.put(&namespace, &key, value, index).await?;
                    StoreResult::None
                }
                StoreOp::Delete { namespace, key } => {
                    self.delete(&namespace, &key).await?;
                    StoreResult::None
                }
                StoreOp::Search(query) => {
                    let result = self.search(query).await?;
                    StoreResult::Items(result)
                }
                StoreOp::ListNamespaces {
                    prefix,
                    suffix,
                    max_depth,
                    limit,
                } => {
                    let namespaces = self
                        .list_namespaces(
                            prefix.as_deref(),
                            suffix.as_deref(),
                            max_depth,
                            limit,
                            None,
                        )
                        .await?;
                    StoreResult::Namespaces(namespaces)
                }
            };
            results.push(result);
        }

        Ok(results)
    }
}
1632
/// `PostgreSQL`-based store implementation
///
/// Provides persistent storage using `PostgreSQL` database.
/// Requires the `postgres` feature flag.
///
/// Build one with [`PostgresStore::new`] (no index configuration) or
/// [`PostgresStore::with_vector_search`] (stores the supplied
/// [`IndexConfig`]).
#[cfg(feature = "postgres")]
#[derive(Debug)]
pub struct PostgresStore {
    /// Database connection pool; both constructors store `Some(pool)`.
    pool: Option<sqlx::PgPool>,
    /// Vector index configuration; `None` unless the store was created with
    /// `with_vector_search`.
    index_config: Option<IndexConfig>,
}
1645
#[cfg(feature = "postgres")]
impl PostgresStore {
    /// Connect to a `PostgreSQL` database and make sure the store schema exists.
    ///
    /// Opens a connection pool for `database_url`, then issues idempotent
    /// `CREATE TABLE IF NOT EXISTS` statements for `store_items` and
    /// `store_vectors`. The returned store has no index configuration.
    ///
    /// # Arguments
    ///
    /// * `database_url` - `PostgreSQL` database connection string
    ///
    /// # Errors
    ///
    /// Returns [`StoreError::Storage`] if the connection or either table
    /// creation statement fails.
    pub async fn new(database_url: &str) -> Result<Self, StoreError> {
        // One JSONB document per (namespace, key).
        const CREATE_ITEMS: &str = r"
            CREATE TABLE IF NOT EXISTS store_items (
                namespace TEXT NOT NULL,
                key TEXT NOT NULL,
                value JSONB NOT NULL,
                created_at TIMESTAMPTZ NOT NULL,
                updated_at TIMESTAMPTZ NOT NULL,
                PRIMARY KEY (namespace, key)
            )
            ";
        // Embeddings for vector search support.
        // Note: For production use with pgvector, the vector column should use the VECTOR type
        const CREATE_VECTORS: &str = r"
            CREATE TABLE IF NOT EXISTS store_vectors (
                namespace TEXT NOT NULL,
                key TEXT NOT NULL,
                field TEXT NOT NULL,
                vector BYTEA NOT NULL,
                PRIMARY KEY (namespace, key, field),
                FOREIGN KEY (namespace, key) REFERENCES store_items(namespace, key) ON DELETE CASCADE
            )
            ";

        let pool = match sqlx::PgPool::connect(database_url).await {
            Ok(connected) => connected,
            Err(e) => {
                return Err(StoreError::Storage(format!(
                    "Failed to connect to database: {e}"
                )));
            }
        };

        if let Err(e) = sqlx::query(CREATE_ITEMS).execute(&pool).await {
            return Err(StoreError::Storage(format!("Failed to create table: {e}")));
        }

        if let Err(e) = sqlx::query(CREATE_VECTORS).execute(&pool).await {
            return Err(StoreError::Storage(format!(
                "Failed to create vectors table: {e}"
            )));
        }

        Ok(Self {
            pool: Some(pool),
            index_config: None,
        })
    }

    /// Connect like [`PostgresStore::new`], then enable vector search.
    ///
    /// # Arguments
    ///
    /// * `database_url` - `PostgreSQL` database connection string
    /// * `config` - [`IndexConfig`] for vector search
    ///
    /// # Errors
    ///
    /// Returns [`StoreError`] if connection fails or table creation fails.
    pub async fn with_vector_search(
        database_url: &str,
        config: IndexConfig,
    ) -> Result<Self, StoreError> {
        Self::new(database_url).await.map(|mut store| {
            store.index_config = Some(config);
            store
        })
    }
}
1722
1723/// Serialize a vector of `f32` to a `BYTEA` for `PostgreSQL` storage.
1724///
1725/// Uses little-endian byte order for cross-platform consistency.
1726#[cfg(feature = "postgres")]
fn vector_to_bytea(vec: &[f32]) -> Vec<u8> {
    // Each component occupies a fixed number of bytes in the encoded form.
    const WIDTH: usize = std::mem::size_of::<f32>();

    let mut encoded = Vec::with_capacity(vec.len().saturating_mul(WIDTH));
    // Components are written back to back, each in little-endian order.
    encoded.extend(vec.iter().flat_map(|component| component.to_le_bytes()));
    encoded
}
1734
/// Decode a `PostgreSQL` `BYTEA` payload into a vector of `f32`.
///
/// The payload is read as consecutive 4-byte little-endian floats.
///
/// # Errors
///
/// Returns [`StoreError::VectorSearch`] when the byte length is not a
/// multiple of four.
#[cfg(feature = "postgres")]
fn bytea_to_vector(bytes: &[u8]) -> Result<Vec<f32>, StoreError> {
    const WIDTH: usize = std::mem::size_of::<f32>();

    let words = bytes.chunks_exact(WIDTH);
    // Leftover bytes mean the payload cannot be a whole number of floats.
    if !words.remainder().is_empty() {
        return Err(StoreError::VectorSearch(
            "Invalid BYTEA length for vector data".to_string(),
        ));
    }

    Ok(words
        .map(|word| f32::from_le_bytes([word[0], word[1], word[2], word[3]]))
        .collect())
}
1754
1755/// Serialize a vector of `f32` to a `BLOB` for `SQLite` storage.
1756///
1757/// Uses little-endian byte order for cross-platform consistency.
1758#[cfg(feature = "sqlite")]
fn vector_to_blob(vec: &[f32]) -> Vec<u8> {
    // Reserve the exact encoded size up front: one fixed-width word per component.
    let byte_len = vec.len().saturating_mul(std::mem::size_of::<f32>());
    let mut blob = Vec::with_capacity(byte_len);

    // Append each component's little-endian bytes in order.
    vec.iter()
        .map(|component| component.to_le_bytes())
        .for_each(|word| blob.extend_from_slice(&word));

    blob
}
1766
/// Decode a `SQLite` `BLOB` payload into a vector of `f32`.
///
/// The payload is read as consecutive 4-byte little-endian floats.
///
/// # Errors
///
/// Returns [`StoreError::VectorSearch`] when the byte length is not a
/// multiple of four.
#[cfg(feature = "sqlite")]
fn blob_to_vector(bytes: &[u8]) -> Result<Vec<f32>, StoreError> {
    const WIDTH: usize = std::mem::size_of::<f32>();

    let words = bytes.chunks_exact(WIDTH);
    // Leftover bytes mean the payload cannot be a whole number of floats.
    if !words.remainder().is_empty() {
        return Err(StoreError::VectorSearch(
            "Invalid BLOB length for vector data".to_string(),
        ));
    }

    let mut decoded = Vec::with_capacity(bytes.len() / WIDTH);
    for word in words {
        decoded.push(f32::from_le_bytes([word[0], word[1], word[2], word[3]]));
    }
    Ok(decoded)
}
1786
/// Convert a [`FilterExpr`] to a Postgres WHERE clause fragment with bind parameters.
///
/// Returns a tuple of (SQL clause string, bind parameter values).
/// Each `?` placeholder in the returned SQL must be renumbered to `$1`, `$2`, etc.
/// according to the Postgres parameter numbering scheme.
///
/// Field names are dotted JSON paths (`a.b.c`). They are embedded in the SQL
/// text as a `text[]` literal, so every segment that is not a plain
/// identifier is double-quoted and escaped; a field name can therefore never
/// terminate the surrounding SQL string literal.
///
/// An empty `And` renders as `TRUE` and an empty `Or` as `FALSE`, instead of
/// an empty fragment that would produce invalid SQL once parenthesized.
///
/// Known limitation: a literal `?` inside a field name is indistinguishable
/// from a placeholder for callers that renumber by scanning for `?`.
#[cfg(feature = "postgres")]
fn filter_to_sql_postgres(filter: &FilterExpr) -> (String, Vec<serde_json::Value>) {
    /// Render a dotted field path as the body of a Postgres array literal
    /// (the part between `{` and `}`), safe to place inside `'...'`.
    ///
    /// Assumes `standard_conforming_strings = on` (the Postgres default), so
    /// only `'` needs escaping at the SQL string level.
    fn path_literal(field: &str) -> String {
        // An empty field keeps addressing the document root (`'{}'`).
        if field.is_empty() {
            return String::new();
        }

        let mut literal = String::with_capacity(field.len());
        for (idx, segment) in field.split('.').enumerate() {
            if idx > 0 {
                literal.push(',');
            }

            // Plain segments are emitted verbatim so the generated SQL is
            // unchanged for ordinary field names. An unquoted `NULL` would be
            // parsed as a null array element, so it must be quoted.
            let is_plain = !segment.is_empty()
                && !segment.eq_ignore_ascii_case("null")
                && segment
                    .chars()
                    .all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-');
            if is_plain {
                literal.push_str(segment);
                continue;
            }

            literal.push('"');
            for c in segment.chars() {
                match c {
                    // Array-literal level escapes.
                    '"' | '\\' => {
                        literal.push('\\');
                        literal.push(c);
                    }
                    // SQL string-literal level escape.
                    '\'' => literal.push_str("''"),
                    other => literal.push(other),
                }
            }
            literal.push('"');
        }
        literal
    }

    /// Build a text comparison (`=` / `!=`) against the extracted JSON text.
    fn text_comparison(
        field: &str,
        op: &str,
        value: &serde_json::Value,
    ) -> (String, Vec<serde_json::Value>) {
        (
            format!("value #>> '{{{}}}' {op} ?", path_literal(field)),
            vec![value.clone()],
        )
    }

    /// Build a numeric comparison against the extracted JSON value.
    fn numeric_comparison(
        field: &str,
        op: &str,
        value: &serde_json::Value,
    ) -> (String, Vec<serde_json::Value>) {
        (
            format!(
                "(value #> '{{{}}}')::numeric {op} CAST(? AS numeric)",
                path_literal(field)
            ),
            vec![value.clone()],
        )
    }

    /// Join parenthesized sub-clauses with `joiner`, concatenating their
    /// parameters in order; `when_empty` is used for an empty expression list.
    fn combine(
        expressions: &[FilterExpr],
        joiner: &str,
        when_empty: &str,
    ) -> (String, Vec<serde_json::Value>) {
        if expressions.is_empty() {
            return (when_empty.to_string(), Vec::new());
        }
        let mut clauses = Vec::with_capacity(expressions.len());
        let mut all_params = Vec::new();
        for expr in expressions {
            let (clause, params) = filter_to_sql_postgres(expr);
            clauses.push(format!("({clause})"));
            all_params.extend(params);
        }
        (clauses.join(joiner), all_params)
    }

    match filter {
        FilterExpr::Eq { field, value } => text_comparison(field, "=", value),
        FilterExpr::Ne { field, value } => text_comparison(field, "!=", value),
        FilterExpr::Gt { field, value } => numeric_comparison(field, ">", value),
        FilterExpr::Gte { field, value } => numeric_comparison(field, ">=", value),
        FilterExpr::Lt { field, value } => numeric_comparison(field, "<", value),
        FilterExpr::Lte { field, value } => numeric_comparison(field, "<=", value),
        FilterExpr::And { expressions } => combine(expressions, " AND ", "TRUE"),
        FilterExpr::Or { expressions } => combine(expressions, " OR ", "FALSE"),
        FilterExpr::Not { expr } => {
            let (clause, params) = filter_to_sql_postgres(expr);
            (format!("NOT ({clause})"), params)
        }
    }
}
1857
1858#[cfg(feature = "postgres")]
1859#[async_trait]
1860impl Store for PostgresStore {
1861    async fn get(&self, namespace: &str, key: &str) -> Result<Option<Item>, StoreError> {
1862        let pool = self
1863            .pool
1864            .as_ref()
1865            .ok_or_else(|| StoreError::Storage("Store not initialized".to_string()))?;
1866
1867        let result = sqlx::query(
1868            "SELECT value, created_at, updated_at FROM store_items WHERE namespace = $1 AND key = $2"
1869        )
1870        .bind(namespace)
1871        .bind(key)
1872        .fetch_optional(pool)
1873        .await
1874        .map_err(|e| StoreError::Storage(format!("Failed to get item: {e}")))?;
1875
1876        if let Some(row) = result {
1877            let value: serde_json::Value = row
1878                .try_get("value")
1879                .map_err(|e| StoreError::Storage(e.to_string()))?;
1880            let created_at: chrono::DateTime<chrono::Utc> = row
1881                .try_get("created_at")
1882                .map_err(|e| StoreError::Storage(e.to_string()))?;
1883            let updated_at: chrono::DateTime<chrono::Utc> = row
1884                .try_get("updated_at")
1885                .map_err(|e| StoreError::Storage(e.to_string()))?;
1886
1887            // Load embedding if exists
1888            let embedding = if self.index_config.is_some() {
1889                let vector_row = sqlx::query(
1890                    "SELECT vector FROM store_vectors WHERE namespace = $1 AND key = $2 LIMIT 1",
1891                )
1892                .bind(namespace)
1893                .bind(key)
1894                .fetch_optional(pool)
1895                .await
1896                .map_err(|e| StoreError::Storage(format!("Failed to load embedding: {e}")))?;
1897
1898                if let Some(vrow) = vector_row {
1899                    let bytes: Vec<u8> = vrow
1900                        .try_get("vector")
1901                        .map_err(|e| StoreError::Storage(e.to_string()))?;
1902                    Some(bytea_to_vector(&bytes)?)
1903                } else {
1904                    None
1905                }
1906            } else {
1907                None
1908            };
1909
1910            Ok(Some(Item {
1911                namespace: namespace.to_string(),
1912                key: key.to_string(),
1913                value,
1914                created_at,
1915                updated_at,
1916                expires_at: None,
1917                embedding,
1918            }))
1919        } else {
1920            Ok(None)
1921        }
1922    }
1923
1924    async fn put(
1925        &self,
1926        namespace: &str,
1927        key: &str,
1928        value: serde_json::Value,
1929        index: Option<Vec<String>>,
1930    ) -> Result<(), StoreError> {
1931        let pool = self
1932            .pool
1933            .as_ref()
1934            .ok_or_else(|| StoreError::Storage("Store not initialized".to_string()))?;
1935
1936        let now = Utc::now();
1937
1938        // Compute embedding before the database transaction
1939        let embedding = if let Some(ref index_config) = self.index_config {
1940            if let Some(index_fields) = &index {
1941                if index_fields.is_empty() {
1942                    None
1943                } else {
1944                    let text = extract_index_text(&value, index_fields);
1945                    if text.is_empty() {
1946                        None
1947                    } else {
1948                        let mut embeddings = index_config.embed.embed(vec![text]).await?;
1949                        embeddings.pop()
1950                    }
1951                }
1952            } else {
1953                None
1954            }
1955        } else {
1956            None
1957        };
1958
1959        sqlx::query(
1960            r"
1961            INSERT INTO store_items (namespace, key, value, created_at, updated_at)
1962            VALUES ($1, $2, $3, $4, $5)
1963            ON CONFLICT (namespace, key) DO UPDATE SET
1964                value = EXCLUDED.value,
1965                updated_at = EXCLUDED.updated_at
1966            ",
1967        )
1968        .bind(namespace)
1969        .bind(key)
1970        .bind(&value)
1971        .bind(now)
1972        .bind(now)
1973        .execute(pool)
1974        .await
1975        .map_err(|e| StoreError::Storage(format!("Failed to put item: {e}")))?;
1976
1977        // Store embedding if configured
1978        if let Some(vec) = embedding {
1979            let bytes = vector_to_bytea(&vec);
1980            sqlx::query(
1981                r"
1982                INSERT INTO store_vectors (namespace, key, field, vector)
1983                VALUES ($1, $2, 'default', $3)
1984                ON CONFLICT (namespace, key, field) DO UPDATE SET
1985                    vector = EXCLUDED.vector
1986                ",
1987            )
1988            .bind(namespace)
1989            .bind(key)
1990            .bind(&bytes)
1991            .execute(pool)
1992            .await
1993            .map_err(|e| StoreError::Storage(format!("Failed to store embedding: {e}")))?;
1994        }
1995
1996        Ok(())
1997    }
1998
1999    async fn delete(&self, namespace: &str, key: &str) -> Result<(), StoreError> {
2000        let pool = self
2001            .pool
2002            .as_ref()
2003            .ok_or_else(|| StoreError::Storage("Store not initialized".to_string()))?;
2004
2005        sqlx::query("DELETE FROM store_items WHERE namespace = $1 AND key = $2")
2006            .bind(namespace)
2007            .bind(key)
2008            .execute(pool)
2009            .await
2010            .map_err(|e| StoreError::Storage(format!("Failed to delete item: {e}")))?;
2011
2012        Ok(())
2013    }
2014
2015    #[allow(
2016        clippy::cast_possible_truncation,
2017        clippy::cast_possible_wrap,
2018        clippy::cast_sign_loss,
2019        clippy::as_conversions,
2020        clippy::similar_names,
2021        clippy::too_many_lines,
2022        reason = "SQL binding requires i64 for LIMIT/OFFSET; COUNT returns i64; names 'nlike' and 'nprefix' are adequately descriptive; vector search logic cannot be further decomposed without extracting trivial helpers"
2023    )]
2024    async fn search(&self, query: SearchQuery) -> Result<SearchResult, StoreError> {
2025        let pool = self
2026            .pool
2027            .as_ref()
2028            .ok_or_else(|| StoreError::Storage("Store not initialized".to_string()))?;
2029
2030        // Compute query embedding if vector search is enabled and query text is provided
2031        let query_embedding: Option<Vec<f32>> = if let Some(ref index_config) = self.index_config {
2032            if let Some(query_text) = &query.query {
2033                if query_text.is_empty() {
2034                    None
2035                } else {
2036                    let mut embeddings = index_config.embed.embed(vec![query_text.clone()]).await?;
2037                    embeddings.pop()
2038                }
2039            } else {
2040                None
2041            }
2042        } else {
2043            None
2044        };
2045
2046        let namespace_pattern = format!("{}%", query.namespace_prefix);
2047        let mut conditions = vec!["namespace LIKE $1".to_string()];
2048        let mut bind_params: Vec<String> = vec![namespace_pattern];
2049        let mut param_idx = 2;
2050
2051        if let Some(ref filter) = query.filter {
2052            let (clause, filter_params) = filter_to_sql_postgres(filter);
2053            // Renumber ? placeholders to $2, $3, ... for Postgres
2054            let mut numbered_clause = String::with_capacity(clause.len());
2055            for c in clause.chars() {
2056                if c == '?' {
2057                    let _ = write!(numbered_clause, "${param_idx}");
2058                    param_idx += 1;
2059                } else {
2060                    numbered_clause.push(c);
2061                }
2062            }
2063            conditions.push(format!("({numbered_clause})"));
2064            for p in &filter_params {
2065                bind_params.push(p.to_string());
2066            }
2067        }
2068
2069        let where_clause = conditions.join(" AND ");
2070
2071        // Fetch all matching items for vector similarity computation
2072        let data_sql = format!(
2073            "SELECT namespace, key, value, created_at, updated_at \
2074             FROM store_items WHERE {where_clause} \
2075             ORDER BY namespace, key"
2076        );
2077        let mut data_query = sqlx::query(&data_sql);
2078        for p in &bind_params {
2079            data_query = data_query.bind(p.as_str());
2080        }
2081
2082        let rows = data_query
2083            .fetch_all(pool)
2084            .await
2085            .map_err(|e| StoreError::Storage(format!("Search query failed: {e}")))?;
2086
2087        let mut items: Vec<SearchItem> = Vec::with_capacity(rows.len());
2088
2089        // For items with embeddings, compute similarity scores
2090        for row in rows {
2091            let namespace: String = row
2092                .try_get("namespace")
2093                .map_err(|e| StoreError::Storage(e.to_string()))?;
2094            let key: String = row
2095                .try_get("key")
2096                .map_err(|e| StoreError::Storage(e.to_string()))?;
2097            let value: serde_json::Value = row
2098                .try_get("value")
2099                .map_err(|e| StoreError::Storage(e.to_string()))?;
2100            let created_at: chrono::DateTime<chrono::Utc> = row
2101                .try_get("created_at")
2102                .map_err(|e| StoreError::Storage(e.to_string()))?;
2103            let updated_at: chrono::DateTime<chrono::Utc> = row
2104                .try_get("updated_at")
2105                .map_err(|e| StoreError::Storage(e.to_string()))?;
2106
2107            // Load embedding for this item
2108            let embedding = if query_embedding.is_some() {
2109                let vector_row = sqlx::query(
2110                    "SELECT vector FROM store_vectors WHERE namespace = $1 AND key = $2 LIMIT 1",
2111                )
2112                .bind(&namespace)
2113                .bind(&key)
2114                .fetch_optional(pool)
2115                .await
2116                .map_err(|e| StoreError::Storage(format!("Failed to load embedding: {e}")))?;
2117
2118                if let Some(vrow) = vector_row {
2119                    let bytes: Vec<u8> = vrow
2120                        .try_get("vector")
2121                        .map_err(|e| StoreError::Storage(e.to_string()))?;
2122                    Some(bytea_to_vector(&bytes)?)
2123                } else {
2124                    None
2125                }
2126            } else {
2127                None
2128            };
2129
2130            // Compute similarity score if both query and item embeddings exist
2131            let score = query_embedding.as_ref().and_then(|q_emb| {
2132                embedding
2133                    .as_ref()
2134                    .map(|i_emb| f64::from(cosine_similarity(q_emb, i_emb)))
2135            });
2136
2137            items.push(SearchItem {
2138                item: Item {
2139                    namespace,
2140                    key,
2141                    value,
2142                    created_at,
2143                    updated_at,
2144                    expires_at: None,
2145                    embedding,
2146                },
2147                score,
2148            });
2149        }
2150
2151        let total_count = items.len();
2152
2153        // Sort by similarity when vector search is active
2154        if query_embedding.is_some() {
2155            items.sort_by(|a, b| {
2156                b.score
2157                    .partial_cmp(&a.score)
2158                    .unwrap_or(std::cmp::Ordering::Equal)
2159            });
2160        }
2161
2162        // Apply pagination
2163        let start = query.offset.min(items.len());
2164        let end = (start + query.limit).min(items.len());
2165        let page = items.drain(start..end).collect();
2166
2167        Ok(SearchResult {
2168            items: page,
2169            total_count,
2170        })
2171    }
2172
2173    async fn list_namespaces(
2174        &self,
2175        prefix: Option<&str>,
2176        suffix: Option<&str>,
2177        max_depth: Option<usize>,
2178        limit: Option<usize>,
2179        offset: Option<usize>,
2180    ) -> Result<Vec<String>, StoreError> {
2181        let pool = self
2182            .pool
2183            .as_ref()
2184            .ok_or_else(|| StoreError::Storage("Store not initialized".to_string()))?;
2185
2186        let mut query_str = "SELECT DISTINCT namespace FROM store_items WHERE 1=1".to_string();
2187        let mut param_idx = 1;
2188        let mut params = Vec::new();
2189
2190        if let Some(prefix_filter) = prefix {
2191            param_idx += 1;
2192            let _ = write!(query_str, " AND namespace LIKE ${param_idx}");
2193            params.push(format!("{prefix_filter}%"));
2194        }
2195        if let Some(suffix_filter) = suffix {
2196            param_idx += 1;
2197            let _ = write!(query_str, " AND namespace LIKE ${param_idx}");
2198            params.push(format!("%{suffix_filter}"));
2199        }
2200        if let Some(limit_value) = limit {
2201            let _ = write!(query_str, " LIMIT {limit_value}");
2202        }
2203        if let Some(offset_value) = offset {
2204            let _ = write!(query_str, " OFFSET {offset_value}");
2205        }
2206
2207        let mut query = sqlx::query(&query_str);
2208        for param in params {
2209            query = query.bind(param);
2210        }
2211
2212        let rows = query
2213            .fetch_all(pool)
2214            .await
2215            .map_err(|e| StoreError::Storage(format!("Failed to list namespaces: {e}")))?;
2216
2217        let mut namespaces = Vec::new();
2218        for row in rows {
2219            let ns: String = row
2220                .try_get("namespace")
2221                .map_err(|e| StoreError::Storage(e.to_string()))?;
2222            namespaces.push(ns);
2223        }
2224
2225        // Apply max_depth: truncate namespace paths to the first N segments
2226        if let Some(depth) = max_depth {
2227            namespaces = namespaces
2228                .into_iter()
2229                .map(|ns| {
2230                    let parts: Vec<&str> = ns.split('/').take(depth).collect();
2231                    parts.join("/")
2232                })
2233                .collect();
2234            namespaces.sort();
2235            namespaces.dedup();
2236        }
2237
2238        Ok(namespaces)
2239    }
2240
2241    async fn batch(&self, ops: Vec<StoreOp>) -> Result<Vec<StoreResult>, StoreError> {
2242        let mut results = Vec::with_capacity(ops.len());
2243
2244        for op in ops {
2245            let result = match op {
2246                StoreOp::Get { namespace, key } => {
2247                    let item = self.get(&namespace, &key).await?;
2248                    StoreResult::Item(item)
2249                }
2250                StoreOp::Put {
2251                    namespace,
2252                    key,
2253                    value,
2254                    index,
2255                } => {
2256                    self.put(&namespace, &key, value, index).await?;
2257                    StoreResult::None
2258                }
2259                StoreOp::Delete { namespace, key } => {
2260                    self.delete(&namespace, &key).await?;
2261                    StoreResult::None
2262                }
2263                StoreOp::Search(query) => {
2264                    let result = self.search(query).await?;
2265                    StoreResult::Items(result)
2266                }
2267                StoreOp::ListNamespaces {
2268                    prefix,
2269                    suffix,
2270                    max_depth,
2271                    limit,
2272                } => {
2273                    let namespaces = self
2274                        .list_namespaces(
2275                            prefix.as_deref(),
2276                            suffix.as_deref(),
2277                            max_depth,
2278                            limit,
2279                            None,
2280                        )
2281                        .await?;
2282                    StoreResult::Namespaces(namespaces)
2283                }
2284            };
2285            results.push(result);
2286        }
2287
2288        Ok(results)
2289    }
2290}
2291
2292// Rust guideline compliant 2026-05-22
2293
2294#[cfg(test)]
2295mod tests {
2296    use super::*;
2297    use serde_json::json;
2298
2299    fn active_value() -> serde_json::Value {
2300        json!({ "status": "active" })
2301    }
2302
2303    fn inactive_value() -> serde_json::Value {
2304        json!({ "status": "inactive" })
2305    }
2306
2307    #[test]
2308    fn test_filter_not_negates_match() {
2309        // Not(Eq{status=active}) on {status=inactive} => true
2310        let filter = FilterExpr::Not {
2311            expr: Box::new(FilterExpr::Eq {
2312                field: "status".to_string(),
2313                value: json!("active"),
2314            }),
2315        };
2316        assert!(evaluate_filter(&filter, &inactive_value()));
2317    }
2318
2319    #[test]
2320    fn test_filter_not_inverts_true_to_false() {
2321        // Not(Eq{status=active}) on {status=active} => false
2322        let filter = FilterExpr::Not {
2323            expr: Box::new(FilterExpr::Eq {
2324                field: "status".to_string(),
2325                value: json!("active"),
2326            }),
2327        };
2328        assert!(!evaluate_filter(&filter, &active_value()));
2329    }
2330
2331    #[test]
2332    fn test_filter_not_combined_with_and() {
2333        // And([Gte{age>=18}, Not(Eq{status=banned})]) on {age:25, status:active} => true
2334        let value = json!({ "age": 25, "status": "active" });
2335        let filter = FilterExpr::And {
2336            expressions: vec![
2337                FilterExpr::Gte {
2338                    field: "age".to_string(),
2339                    value: json!(18),
2340                },
2341                FilterExpr::Not {
2342                    expr: Box::new(FilterExpr::Eq {
2343                        field: "status".to_string(),
2344                        value: json!("banned"),
2345                    }),
2346                },
2347            ],
2348        };
2349        assert!(evaluate_filter(&filter, &value));
2350
2351        // And([Gte{age>=18}, Not(Eq{status=banned})]) on {age:25, status:banned} => false
2352        let banned_value = json!({ "age": 25, "status": "banned" });
2353        assert!(!evaluate_filter(&filter, &banned_value));
2354
2355        // And([Gte{age>=18}, Not(Eq{status=banned})]) on {age:17, status:active} => false
2356        let young_value = json!({ "age": 17, "status": "active" });
2357        assert!(!evaluate_filter(&filter, &young_value));
2358    }
2359
2360    #[test]
2361    fn test_filter_not_serialization_roundtrip() {
2362        let filter = FilterExpr::Not {
2363            expr: Box::new(FilterExpr::Eq {
2364                field: "status".to_string(),
2365                value: json!("active"),
2366            }),
2367        };
2368
2369        let serialized = serde_json::to_string(&filter).expect("serialization failed");
2370        assert!(
2371            serialized.contains("\"$not\""),
2372            "serialized form must contain $not tag"
2373        );
2374
2375        let deserialized: FilterExpr =
2376            serde_json::from_str(&serialized).expect("deserialization failed");
2377
2378        // Verify roundtrip correctness by evaluating both against the same value
2379        let value = active_value();
2380        assert_eq!(
2381            evaluate_filter(&filter, &value),
2382            evaluate_filter(&deserialized, &value),
2383            "roundtrip filter must produce the same result"
2384        );
2385    }
2386
2387    #[test]
2388    fn test_filter_nested_not() {
2389        // Not(Not(Eq{status=active})) is equivalent to Eq{status=active}
2390        let filter = FilterExpr::Not {
2391            expr: Box::new(FilterExpr::Not {
2392                expr: Box::new(FilterExpr::Eq {
2393                    field: "status".to_string(),
2394                    value: json!("active"),
2395                }),
2396            }),
2397        };
2398        assert!(evaluate_filter(&filter, &active_value()));
2399        assert!(!evaluate_filter(&filter, &inactive_value()));
2400    }
2401
2402    // --- TTL tests ---
2403
2404    #[tokio::test]
2405    async fn test_ttl_expiration_on_get() {
2406        let store = MemoryStore::new().with_ttl_config(TTLConfig {
2407            default_ttl: Some(std::time::Duration::from_millis(50)),
2408            refresh_on_read: false,
2409            ..Default::default()
2410        });
2411
2412        store
2413            .put("ns", "key1", json!({"v": 1}), None)
2414            .await
2415            .expect("put failed");
2416
2417        // Item should be visible immediately
2418        let item = store
2419            .get("ns", "key1")
2420            .await
2421            .expect("get failed")
2422            .expect("item should exist");
2423        assert_eq!(item.key, "key1");
2424
2425        // Wait for TTL to expire
2426        tokio::time::sleep(std::time::Duration::from_millis(80)).await;
2427
2428        // Item should be expired and lazily removed
2429        let result = store.get("ns", "key1").await.expect("get failed");
2430        assert!(result.is_none(), "item should have expired");
2431    }
2432
2433    #[tokio::test]
2434    async fn test_ttl_refresh_on_read() {
2435        let store = MemoryStore::new().with_ttl_config(TTLConfig {
2436            default_ttl: Some(std::time::Duration::from_millis(100)),
2437            refresh_on_read: true,
2438            ..Default::default()
2439        });
2440
2441        store
2442            .put("ns", "key1", json!({"v": 1}), None)
2443            .await
2444            .expect("put failed");
2445
2446        // Read the item multiple times, each read should refresh TTL
2447        for _ in 0..3 {
2448            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2449            let item = store
2450                .get("ns", "key1")
2451                .await
2452                .expect("get failed")
2453                .expect("item should still exist after refresh");
2454            assert_eq!(item.key, "key1");
2455        }
2456
2457        // After 3 reads at 50ms each (~150ms total), the item should still be alive
2458        // because each read reset the TTL (100ms).
2459        let result = store.get("ns", "key1").await.expect("get failed");
2460        assert!(
2461            result.is_some(),
2462            "item should still exist after TTL refreshes"
2463        );
2464
2465        // Now wait longer than the TTL without reading -- it should expire
2466        tokio::time::sleep(std::time::Duration::from_millis(120)).await;
2467        let result = store.get("ns", "key1").await.expect("get failed");
2468        assert!(result.is_none(), "item should have expired after no reads");
2469    }
2470
2471    #[tokio::test]
2472    async fn test_ttl_search_filters_expired() {
2473        let store = MemoryStore::new().with_ttl_config(TTLConfig {
2474            default_ttl: Some(std::time::Duration::from_millis(50)),
2475            refresh_on_read: false,
2476            ..Default::default()
2477        });
2478
2479        store
2480            .put("ns", "key1", json!({"v": 1}), None)
2481            .await
2482            .expect("put failed");
2483        store
2484            .put("ns", "key2", json!({"v": 2}), None)
2485            .await
2486            .expect("put failed");
2487
2488        // Both items should appear in search
2489        let query = SearchQuery {
2490            namespace_prefix: "ns".to_string(),
2491            filter: None,
2492            query: None,
2493            limit: 10,
2494            offset: 0,
2495        };
2496        let result = store.search(query).await.expect("search failed");
2497        assert_eq!(result.total_count, 2);
2498
2499        // Wait for items to expire
2500        tokio::time::sleep(std::time::Duration::from_millis(80)).await;
2501
2502        // Search should return zero items
2503        let query = SearchQuery {
2504            namespace_prefix: "ns".to_string(),
2505            filter: None,
2506            query: None,
2507            limit: 10,
2508            offset: 0,
2509        };
2510        let result = store.search(query).await.expect("search failed");
2511        assert_eq!(
2512            result.total_count, 0,
2513            "expired items should be filtered from search"
2514        );
2515    }
2516
2517    #[tokio::test]
2518    async fn test_no_ttl_items_never_expire() {
2519        let store = MemoryStore::new();
2520
2521        store
2522            .put("ns", "key1", json!({"v": 1}), None)
2523            .await
2524            .expect("put failed");
2525
2526        // Item should have no expiration -- read internal data to verify
2527        let has_no_expiry = {
2528            let data = store.data.read().await;
2529            data.get("ns")
2530                .and_then(|ns| ns.get("key1"))
2531                .is_some_and(|item| item.expires_at.is_none())
2532        };
2533        assert!(has_no_expiry, "item should have no expiration set");
2534
2535        // Even after a delay, item should remain
2536        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2537        let result = store.get("ns", "key1").await.expect("get failed");
2538        assert!(result.is_some(), "item without TTL should never expire");
2539    }
2540
2541    #[tokio::test]
2542    async fn test_ttl_lazy_cleanup_removes_from_underlying_storage() {
2543        let store = MemoryStore::new().with_ttl_config(TTLConfig {
2544            default_ttl: Some(std::time::Duration::from_millis(30)),
2545            refresh_on_read: false,
2546            ..Default::default()
2547        });
2548
2549        store
2550            .put("ns", "key1", json!({"v": 1}), None)
2551            .await
2552            .expect("put failed");
2553
2554        // Verify item is in storage
2555        let exists_before = {
2556            let data = store.data.read().await;
2557            data.get("ns").is_some_and(|ns| ns.contains_key("key1"))
2558        };
2559        assert!(exists_before, "item should exist in storage initially");
2560
2561        // Wait for expiration
2562        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
2563
2564        // Trigger lazy cleanup via get
2565        let _ = store.get("ns", "key1").await;
2566
2567        // Verify item was removed from underlying storage
2568        let exists_after = {
2569            let data = store.data.read().await;
2570            data.get("ns").is_some_and(|ns| ns.contains_key("key1"))
2571        };
2572        assert!(!exists_after, "expired item should be removed from storage");
2573    }
2574
2575    #[tokio::test]
2576    async fn test_ttl_refresh_updates_expires_at() {
2577        let store = MemoryStore::new().with_ttl_config(TTLConfig {
2578            default_ttl: Some(std::time::Duration::from_millis(200)),
2579            refresh_on_read: true,
2580            ..Default::default()
2581        });
2582
2583        store
2584            .put("ns", "key1", json!({"v": 1}), None)
2585            .await
2586            .expect("put failed");
2587
2588        let original_expires = {
2589            let data = store.data.read().await;
2590            data.get("ns")
2591                .and_then(|ns| ns.get("key1"))
2592                .expect("item")
2593                .expires_at
2594                .expect("should have expires_at")
2595        };
2596
2597        // Small delay so updated_at differs
2598        tokio::time::sleep(std::time::Duration::from_millis(20)).await;
2599
2600        let _ = store.get("ns", "key1").await;
2601
2602        let refreshed_expires = {
2603            let data = store.data.read().await;
2604            data.get("ns")
2605                .and_then(|ns| ns.get("key1"))
2606                .expect("item")
2607                .expires_at
2608                .expect("should have expires_at")
2609        };
2610
2611        assert!(
2612            refreshed_expires > original_expires,
2613            "refresh_on_read should advance the expiration time: {refreshed_expires} should be > {original_expires}"
2614        );
2615    }
2616
2617    // --- Vector search tests ---
2618
2619    /// Test embedding function for deterministic vector search testing.
2620    ///
2621    /// Produces an 8-dimensional normalized embedding from text using
2622    /// a polynomial hash. Same text always produces the same embedding;
2623    /// different texts produce (with high probability) different embeddings.
2624    struct TestEmbeddingFunc;
2625
2626    #[async_trait::async_trait]
2627    impl EmbeddingFunc for TestEmbeddingFunc {
2628        async fn embed(&self, texts: Vec<String>) -> Result<Vec<Vec<f32>>, StoreError> {
2629            Ok(texts
2630                .iter()
2631                .map(|text| {
2632                    // FNV-1a-like hash for deterministic embedding
2633                    let hash: u64 = text.bytes().fold(0xcbf2_9ce4_8422_2325u64, |h, b| {
2634                        (h ^ u64::from(b)).wrapping_mul(0x0100_0000_01b3)
2635                    });
2636                    let mut vec: Vec<f32> = (0..8)
2637                        .map(|i| f32::from(((hash >> (i * 8)) & 0xFF) as u8) / 255.0)
2638                        .collect();
2639                    let norm = vec.iter().map(|x| x * x).sum::<f32>().sqrt();
2640                    if norm > 0.0 {
2641                        for v in &mut vec {
2642                            *v /= norm;
2643                        }
2644                    }
2645                    vec
2646                })
2647                .collect())
2648        }
2649    }
2650
2651    #[test]
2652    fn test_cosine_similarity_identical_vectors() {
2653        let v = vec![1.0, 0.0, 0.0];
2654        let sim = cosine_similarity(&v, &v);
2655        let expected = 1.0;
2656        assert!(
2657            (sim - expected).abs() < f32::EPSILON,
2658            "identical vectors should have similarity 1.0, got {sim}"
2659        );
2660    }
2661
2662    #[test]
2663    fn test_cosine_similarity_orthogonal_vectors() {
2664        let a = vec![1.0, 0.0, 0.0];
2665        let b = vec![0.0, 1.0, 0.0];
2666        let sim = cosine_similarity(&a, &b);
2667        let expected = 0.0;
2668        assert!(
2669            (sim - expected).abs() < f32::EPSILON,
2670            "orthogonal vectors should have similarity 0.0, got {sim}"
2671        );
2672    }
2673
2674    #[test]
2675    fn test_cosine_similarity_opposite_vectors() {
2676        let a = vec![1.0, 0.0];
2677        let b = vec![-1.0, 0.0];
2678        let sim = cosine_similarity(&a, &b);
2679        let expected = -1.0;
2680        assert!(
2681            (sim - expected).abs() < f32::EPSILON,
2682            "opposite vectors should have similarity -1.0, got {sim}"
2683        );
2684    }
2685
2686    #[test]
2687    fn test_cosine_similarity_zero_norm() {
2688        let a = vec![0.0, 0.0, 0.0];
2689        let b = vec![1.0, 0.0, 0.0];
2690        let sim = cosine_similarity(&a, &b);
2691        let expected = 0.0;
2692        assert!(
2693            (sim - expected).abs() < f32::EPSILON,
2694            "zero-norm vector should give similarity 0.0, got {sim}"
2695        );
2696    }
2697
2698    #[tokio::test]
2699    async fn test_search_with_embeddings_returns_scored_results() {
2700        let index_config = IndexConfig {
2701            dims: 8,
2702            embed: Box::new(TestEmbeddingFunc),
2703            fields: Some(vec!["text".to_string()]),
2704        };
2705        let store = MemoryStore::new().with_vector_search(index_config);
2706
2707        // Put items with index fields
2708        store
2709            .put(
2710                "docs",
2711                "item1",
2712                json!({"text": "hello world"}),
2713                Some(vec!["text".to_string()]),
2714            )
2715            .await
2716            .expect("put failed");
2717        store
2718            .put(
2719                "docs",
2720                "item2",
2721                json!({"text": "quantum physics"}),
2722                Some(vec!["text".to_string()]),
2723            )
2724            .await
2725            .expect("put failed");
2726
2727        // Search with a query text
2728        let query = SearchQuery {
2729            namespace_prefix: "docs".to_string(),
2730            filter: None,
2731            query: Some("hello world".to_string()),
2732            limit: 10,
2733            offset: 0,
2734        };
2735        let result = store.search(query).await.expect("search failed");
2736
2737        assert!(
2738            !result.items.is_empty(),
2739            "search should return matching items"
2740        );
2741        // All returned items should have scores since they have embeddings
2742        for item in &result.items {
2743            assert!(
2744                item.score.is_some(),
2745                "items with embeddings should have similarity scores"
2746            );
2747        }
2748
2749        // The most relevant result should have a high score (> 0.9)
2750        if let Some(score) = result.items.first().and_then(|i| i.score) {
2751            assert!(
2752                score > 0.9,
2753                "top result should have high similarity score, got {score}"
2754            );
2755        }
2756    }
2757
2758    #[tokio::test]
2759    async fn test_search_ordering_respects_similarity() {
2760        let index_config = IndexConfig {
2761            dims: 8,
2762            embed: Box::new(TestEmbeddingFunc),
2763            fields: Some(vec!["text".to_string()]),
2764        };
2765        let store = MemoryStore::new().with_vector_search(index_config);
2766
2767        // Put items with clearly different content
2768        store
2769            .put(
2770                "docs",
2771                "hello-world",
2772                json!({"text": "hello world"}),
2773                Some(vec!["text".to_string()]),
2774            )
2775            .await
2776            .expect("put failed");
2777        store
2778            .put(
2779                "docs",
2780                "hello-there",
2781                json!({"text": "hello there"}),
2782                Some(vec!["text".to_string()]),
2783            )
2784            .await
2785            .expect("put failed");
2786        store
2787            .put(
2788                "docs",
2789                "quantum-physics",
2790                json!({"text": "quantum physics"}),
2791                Some(vec!["text".to_string()]),
2792            )
2793            .await
2794            .expect("put failed");
2795
2796        // Search with query matching the first item
2797        let query = SearchQuery {
2798            namespace_prefix: "docs".to_string(),
2799            filter: None,
2800            query: Some("hello world".to_string()),
2801            limit: 10,
2802            offset: 0,
2803        };
2804        let result = store.search(query).await.expect("search failed");
2805
2806        assert_eq!(
2807            result.items.len(),
2808            3,
2809            "should return all 3 items in the namespace"
2810        );
2811
2812        // The most similar item should be first
2813        let first = result
2814            .items
2815            .first()
2816            .expect("should have at least one result");
2817        assert_eq!(
2818            first.item.key, "hello-world",
2819            "the most similar item should be ranked first"
2820        );
2821
2822        // Scores should be in descending order (best match first)
2823        for pair in result.items.windows(2) {
2824            if let (Some(a), Some(b)) = (pair[0].score, pair[1].score) {
2825                assert!(
2826                    a >= b,
2827                    "scores should be in descending order: {a} should be >= {b}"
2828                );
2829            }
2830        }
2831    }
2832
2833    #[tokio::test]
2834    async fn test_search_without_index_returns_no_scores() {
2835        // Store with no vector search configured
2836        let store = MemoryStore::new();
2837
2838        store
2839            .put("docs", "item1", json!({"text": "hello"}), None)
2840            .await
2841            .expect("put failed");
2842        store
2843            .put("docs", "item2", json!({"text": "world"}), None)
2844            .await
2845            .expect("put failed");
2846
2847        // Search with a query should still work but without scores
2848        let query = SearchQuery {
2849            namespace_prefix: "docs".to_string(),
2850            filter: None,
2851            query: Some("hello".to_string()),
2852            limit: 10,
2853            offset: 0,
2854        };
2855        let result = store.search(query).await.expect("search failed");
2856
2857        assert_eq!(result.items.len(), 2, "should return all items");
2858        // All scores should be None since no index is configured
2859        for item in &result.items {
2860            assert!(
2861                item.score.is_none(),
2862                "items without index should have no score"
2863            );
2864        }
2865    }
2866
2867    #[tokio::test]
2868    async fn test_list_namespaces_offset_skips_first_n() {
2869        let store = MemoryStore::new();
2870
2871        // Insert items across multiple namespaces
2872        for i in 0..5 {
2873            store
2874                .put(&format!("ns-{i}"), "key", json!({"v": i}), None)
2875                .await
2876                .expect("put failed");
2877        }
2878
2879        // Without offset, all 5 namespaces are returned
2880        let all_ns = store
2881            .list_namespaces(None, None, None, None, None)
2882            .await
2883            .expect("list_namespaces failed");
2884        assert_eq!(all_ns.len(), 5, "expected all 5 namespaces");
2885
2886        // With offset=2, skip first 2 => 3 remaining
2887        let offset_ns = store
2888            .list_namespaces(None, None, None, None, Some(2))
2889            .await
2890            .expect("list_namespaces with offset failed");
2891        assert_eq!(
2892            offset_ns.len(),
2893            3,
2894            "offset=2 should skip 2 namespaces, leaving 3"
2895        );
2896    }
2897
2898    #[tokio::test]
2899    async fn test_list_namespaces_offset_and_limit_together() {
2900        let store = MemoryStore::new();
2901
2902        for i in 0..10 {
2903            store
2904                .put(&format!("ns-{i:02}"), "key", json!({"v": i}), None)
2905                .await
2906                .expect("put failed");
2907        }
2908
2909        // Offset=3, limit=4 => skip first 3, take next 4 => 4 results
2910        let page = store
2911            .list_namespaces(None, None, None, Some(4), Some(3))
2912            .await
2913            .expect("list_namespaces failed");
2914        assert_eq!(page.len(), 4, "offset=3 + limit=4 should yield 4 results");
2915    }
2916
2917    #[tokio::test]
2918    async fn test_list_namespaces_offset_larger_than_results() {
2919        let store = MemoryStore::new();
2920
2921        store
2922            .put("only-ns", "key", json!({"v": 1}), None)
2923            .await
2924            .expect("put failed");
2925
2926        // offset=100 but only 1 namespace exists => empty result
2927        let result = store
2928            .list_namespaces(None, None, None, None, Some(100))
2929            .await
2930            .expect("list_namespaces failed");
2931        assert!(
2932            result.is_empty(),
2933            "offset larger than result set should return empty"
2934        );
2935    }
2936
2937    #[tokio::test]
2938    async fn test_list_namespaces_offset_with_prefix_filter() {
2939        let store = MemoryStore::new();
2940
2941        for i in 0..6 {
2942            let ns = if i < 3 {
2943                format!("alpha-{i}")
2944            } else {
2945                format!("beta-{i}")
2946            };
2947            store
2948                .put(&ns, "key", json!({"v": i}), None)
2949                .await
2950                .expect("put failed");
2951        }
2952
2953        // Filter to "alpha-" namespaces only, then offset=1 => skip 1 of 3 => 2 remaining
2954        let result = store
2955            .list_namespaces(Some("alpha-"), None, None, None, Some(1))
2956            .await
2957            .expect("list_namespaces failed");
2958        assert_eq!(
2959            result.len(),
2960            2,
2961            "prefix filter + offset=1 should leave 2 namespaces"
2962        );
2963        assert!(
2964            result.iter().all(|ns| ns.starts_with("alpha-")),
2965            "all results must match prefix filter"
2966        );
2967    }
2968
2969    // --- SQLite filter_to_sql tests ---
2970
2971    #[cfg(feature = "sqlite")]
2972    #[test]
2973    fn test_filter_to_sql_eq() {
2974        let filter = FilterExpr::Eq {
2975            field: "status".to_string(),
2976            value: json!("active"),
2977        };
2978        let (sql, params) = filter_to_sql_sqlite(&filter);
2979        assert_eq!(sql, "json_extract(value, '$.status') = ?");
2980        assert_eq!(params.len(), 1);
2981        assert_eq!(sqlite_param_from_value(&params[0]), "active");
2982    }
2983
2984    #[cfg(feature = "sqlite")]
2985    #[test]
2986    fn test_filter_to_sql_gt() {
2987        let filter = FilterExpr::Gt {
2988            field: "age".to_string(),
2989            value: json!(18),
2990        };
2991        let (sql, params) = filter_to_sql_sqlite(&filter);
2992        assert_eq!(
2993            sql,
2994            "CAST(json_extract(value, '$.age') AS REAL) > CAST(? AS REAL)"
2995        );
2996        assert_eq!(params.len(), 1);
2997        assert_eq!(sqlite_param_from_value(&params[0]), "18");
2998    }
2999
3000    #[cfg(feature = "sqlite")]
3001    #[test]
3002    fn test_filter_to_sql_and_or_combination() {
3003        let filter = FilterExpr::And {
3004            expressions: vec![
3005                FilterExpr::Eq {
3006                    field: "status".to_string(),
3007                    value: json!("active"),
3008                },
3009                FilterExpr::Or {
3010                    expressions: vec![
3011                        FilterExpr::Gte {
3012                            field: "age".to_string(),
3013                            value: json!(18),
3014                        },
3015                        FilterExpr::Eq {
3016                            field: "role".to_string(),
3017                            value: json!("admin"),
3018                        },
3019                    ],
3020                },
3021            ],
3022        };
3023        let (sql, params) = filter_to_sql_sqlite(&filter);
3024        assert_eq!(
3025            sql,
3026            "(json_extract(value, '$.status') = ?) AND \
3027             ((CAST(json_extract(value, '$.age') AS REAL) >= CAST(? AS REAL)) OR \
3028             (json_extract(value, '$.role') = ?))"
3029        );
3030        assert_eq!(params.len(), 3);
3031    }
3032
3033    #[cfg(feature = "sqlite")]
3034    #[test]
3035    fn test_filter_to_sql_not() {
3036        let filter = FilterExpr::Not {
3037            expr: Box::new(FilterExpr::Eq {
3038                field: "status".to_string(),
3039                value: json!("banned"),
3040            }),
3041        };
3042        let (sql, params) = filter_to_sql_sqlite(&filter);
3043        assert!(sql.starts_with("NOT ("));
3044        assert!(sql.contains("json_extract(value, '$.status') = ?"));
3045        assert_eq!(params.len(), 1);
3046    }
3047
3048    #[cfg(feature = "sqlite")]
3049    #[test]
3050    fn test_sqlite_param_bool_true() {
3051        assert_eq!(sqlite_param_from_value(&json!(true)), "1");
3052    }
3053
3054    #[cfg(feature = "sqlite")]
3055    #[test]
3056    fn test_sqlite_param_bool_false() {
3057        assert_eq!(sqlite_param_from_value(&json!(false)), "0");
3058    }
3059
3060    #[cfg(feature = "sqlite")]
3061    #[test]
3062    fn test_sqlite_param_string() {
3063        assert_eq!(sqlite_param_from_value(&json!("hello")), "hello");
3064    }
3065
3066    #[cfg(feature = "sqlite")]
3067    #[test]
3068    fn test_sqlite_param_number() {
3069        assert_eq!(sqlite_param_from_value(&json!(42)), "42");
3070        assert_eq!(sqlite_param_from_value(&json!(42.5)), "42.5");
3071    }
3072
3073    // --- Postgres filter_to_sql tests ---
3074
3075    #[cfg(feature = "postgres")]
3076    #[test]
3077    fn test_filter_to_sql_postgres_eq() {
3078        let filter = FilterExpr::Eq {
3079            field: "status".to_string(),
3080            value: json!("active"),
3081        };
3082        let (sql, params) = filter_to_sql_postgres(&filter);
3083        assert_eq!(sql, "value #>> '{status}' = ?");
3084        assert_eq!(params.len(), 1);
3085    }
3086
3087    #[cfg(feature = "postgres")]
3088    #[test]
3089    fn test_filter_to_sql_postgres_nested_field() {
3090        let filter = FilterExpr::Eq {
3091            field: "user.address.city".to_string(),
3092            value: json!("NYC"),
3093        };
3094        let (sql, _params) = filter_to_sql_postgres(&filter);
3095        assert_eq!(sql, "value #>> '{user,address,city}' = ?");
3096    }
3097
3098    #[cfg(feature = "postgres")]
3099    #[test]
3100    fn test_filter_to_sql_postgres_numeric_compare() {
3101        let filter = FilterExpr::Lt {
3102            field: "price".to_string(),
3103            value: json!(100.0),
3104        };
3105        let (sql, _params) = filter_to_sql_postgres(&filter);
3106        assert_eq!(sql, "(value #> '{price}')::numeric < CAST(? AS numeric)");
3107    }
3108
3109    #[cfg(feature = "postgres")]
3110    #[test]
3111    fn test_filter_to_sql_postgres_and_or_not() {
3112        let filter = FilterExpr::And {
3113            expressions: vec![
3114                FilterExpr::Eq {
3115                    field: "a".to_string(),
3116                    value: json!(1),
3117                },
3118                FilterExpr::Not {
3119                    expr: Box::new(FilterExpr::Or {
3120                        expressions: vec![
3121                            FilterExpr::Eq {
3122                                field: "b".to_string(),
3123                                value: json!(2),
3124                            },
3125                            FilterExpr::Eq {
3126                                field: "c".to_string(),
3127                                value: json!(3),
3128                            },
3129                        ],
3130                    }),
3131                },
3132            ],
3133        };
3134        let (sql, params) = filter_to_sql_postgres(&filter);
3135        assert_eq!(params.len(), 3);
3136        assert!(sql.contains("AND"));
3137        assert!(sql.contains("NOT ("));
3138        assert!(sql.contains("OR"));
3139    }
3140
3141    // --- Sweep tests ---
3142
3143    #[tokio::test]
3144    async fn test_sweep_expired_items_removes_expired() {
3145        let store = MemoryStore::new().with_ttl_config(TTLConfig {
3146            default_ttl: Some(std::time::Duration::from_millis(50)),
3147            refresh_on_read: false,
3148            ..Default::default()
3149        });
3150
3151        store
3152            .put("ns", "key1", json!({"v": 1}), None)
3153            .await
3154            .expect("put failed");
3155        store
3156            .put("ns", "key2", json!({"v": 2}), None)
3157            .await
3158            .expect("put failed");
3159
3160        // Wait for items to expire
3161        tokio::time::sleep(std::time::Duration::from_millis(80)).await;
3162
3163        // Sweep should remove both expired items
3164        let count = store.sweep_expired_items().await.expect("sweep failed");
3165        assert_eq!(count, 2, "sweep should remove 2 expired items");
3166
3167        // Verify items are gone from storage
3168        let exists = {
3169            let data = store.data.read().await;
3170            data.get("ns").is_some_and(|ns| ns.contains_key("key1"))
3171        };
3172        assert!(!exists, "expired item should be removed from storage");
3173
3174        let exists = {
3175            let data = store.data.read().await;
3176            data.get("ns").is_some_and(|ns| ns.contains_key("key2"))
3177        };
3178        assert!(!exists, "expired item should be removed from storage");
3179    }
3180
3181    #[tokio::test]
3182    async fn test_sweep_expired_items_respects_max_items_limit() {
3183        let store = MemoryStore::new().with_ttl_config(TTLConfig {
3184            default_ttl: Some(std::time::Duration::from_millis(50)),
3185            refresh_on_read: false,
3186            sweep_max_items: 2,
3187            ..Default::default()
3188        });
3189
3190        // Insert 5 items
3191        for i in 1..=5 {
3192            store
3193                .put("ns", &format!("key{i}"), json!({"v": i}), None)
3194                .await
3195                .expect("put failed");
3196        }
3197
3198        // Wait for items to expire
3199        tokio::time::sleep(std::time::Duration::from_millis(80)).await;
3200
3201        // First sweep should remove at most 2 items
3202        let count1 = store.sweep_expired_items().await.expect("sweep failed");
3203        assert_eq!(
3204            count1, 2,
3205            "first sweep should respect sweep_max_items limit"
3206        );
3207
3208        // Second sweep should remove 2 more
3209        let count2 = store.sweep_expired_items().await.expect("sweep failed");
3210        assert_eq!(count2, 2, "second sweep should remove 2 more items");
3211
3212        // Third sweep should remove the last item
3213        let count3 = store.sweep_expired_items().await.expect("sweep failed");
3214        assert_eq!(count3, 1, "third sweep should remove last item");
3215
3216        // Fourth sweep should remove nothing
3217        let count4 = store.sweep_expired_items().await.expect("sweep_failed");
3218        assert_eq!(count4, 0, "fourth sweep should find no expired items");
3219    }
3220
3221    #[tokio::test]
3222    async fn test_sweep_expired_items_across_multiple_namespaces() {
3223        let store = MemoryStore::new().with_ttl_config(TTLConfig {
3224            default_ttl: Some(std::time::Duration::from_millis(50)),
3225            refresh_on_read: false,
3226            ..Default::default()
3227        });
3228
3229        // Insert items across multiple namespaces
3230        store
3231            .put("ns1", "key1", json!({"v": 1}), None)
3232            .await
3233            .expect("put failed");
3234        store
3235            .put("ns1", "key2", json!({"v": 2}), None)
3236            .await
3237            .expect("put failed");
3238        store
3239            .put("ns2", "key1", json!({"v": 3}), None)
3240            .await
3241            .expect("put failed");
3242        store
3243            .put("ns2", "key2", json!({"v": 4}), None)
3244            .await
3245            .expect("put failed");
3246
3247        // Wait for items to expire
3248        tokio::time::sleep(std::time::Duration::from_millis(80)).await;
3249
3250        // Sweep should remove all expired items across all namespaces
3251        let count = store.sweep_expired_items().await.expect("sweep failed");
3252        assert_eq!(count, 4, "sweep should remove all 4 expired items");
3253
3254        // Verify all items are gone
3255        let total_items = {
3256            let data = store.data.read().await;
3257            data.values()
3258                .map(std::collections::HashMap::len)
3259                .sum::<usize>()
3260        };
3261        assert_eq!(total_items, 0, "all items should be removed");
3262    }
3263
3264    #[tokio::test]
3265    async fn test_sweep_expired_items_does_not_remove_non_expired() {
3266        let store = MemoryStore::new().with_ttl_config(TTLConfig {
3267            default_ttl: Some(std::time::Duration::from_secs(10)),
3268            refresh_on_read: false,
3269            ..Default::default()
3270        });
3271
3272        store
3273            .put("ns", "key1", json!({"v": 1}), None)
3274            .await
3275            .expect("put failed");
3276
3277        // Items should not be expired yet
3278        let count = store.sweep_expired_items().await.expect("sweep failed");
3279        assert_eq!(count, 0, "sweep should not remove non-expired items");
3280
3281        // Verify item still exists
3282        let item = store
3283            .get("ns", "key1")
3284            .await
3285            .expect("get failed")
3286            .expect("item should still exist");
3287        assert_eq!(item.key, "key1");
3288    }
3289
3290    #[tokio::test]
3291    async fn test_sweep_expired_items_with_no_ttl_items() {
3292        let store = MemoryStore::new();
3293
3294        store
3295            .put("ns", "key1", json!({"v": 1}), None)
3296            .await
3297            .expect("put failed");
3298
3299        // Items without TTL should never be swept
3300        let count = store.sweep_expired_items().await.expect("sweep failed");
3301        assert_eq!(count, 0, "sweep should not remove items without expiration");
3302
3303        // Verify item still exists
3304        let item = store
3305            .get("ns", "key1")
3306            .await
3307            .expect("get failed")
3308            .expect("item should still exist");
3309        assert_eq!(item.key, "key1");
3310    }
3311
3312    #[tokio::test]
3313    async fn test_start_sweep_task_runs_periodically() {
3314        let store = Arc::new(MemoryStore::new().with_ttl_config(TTLConfig {
3315            default_ttl: Some(std::time::Duration::from_millis(50)),
3316            refresh_on_read: false,
3317            sweep_interval: std::time::Duration::from_millis(100),
3318            ..Default::default()
3319        }));
3320
3321        store
3322            .put("ns", "key1", json!({"v": 1}), None)
3323            .await
3324            .expect("put failed");
3325
3326        // Start sweep task
3327        let store_clone = Arc::clone(&store);
3328        let handle = store_clone.start_sweep_task();
3329
3330        // Wait for item to expire and sweep task to run
3331        tokio::time::sleep(std::time::Duration::from_millis(250)).await;
3332
3333        // Verify item was removed by sweep task
3334        let exists = {
3335            let data = store.data.read().await;
3336            data.get("ns").is_some_and(|ns| ns.contains_key("key1"))
3337        };
3338        assert!(!exists, "sweep task should have removed expired item");
3339
3340        // Abort the task to clean up
3341        handle.abort();
3342    }
3343
3344    #[tokio::test]
3345    async fn test_sweep_and_lazy_cleanup_work_together() {
3346        let store = MemoryStore::new().with_ttl_config(TTLConfig {
3347            default_ttl: Some(std::time::Duration::from_millis(50)),
3348            refresh_on_read: false,
3349            ..Default::default()
3350        });
3351
3352        // Insert multiple items
3353        for i in 1..=5 {
3354            store
3355                .put("ns", &format!("key{i}"), json!({"v": i}), None)
3356                .await
3357                .expect("put failed");
3358        }
3359
3360        // Wait for items to expire
3361        tokio::time::sleep(std::time::Duration::from_millis(80)).await;
3362
3363        // Trigger lazy cleanup via get for key1
3364        let _ = store.get("ns", "key1").await;
3365
3366        // Verify key1 was removed by lazy cleanup
3367        let exists1 = {
3368            let data = store.data.read().await;
3369            data.get("ns").is_some_and(|ns| ns.contains_key("key1"))
3370        };
3371        assert!(!exists1, "lazy cleanup should remove key1");
3372
3373        // Now sweep should remove the remaining 4 items
3374        let count = store.sweep_expired_items().await.expect("sweep failed");
3375        assert_eq!(count, 4, "sweep should remove remaining 4 items");
3376
3377        // Verify all items are gone
3378        let total_items = {
3379            let data = store.data.read().await;
3380            data.values()
3381                .map(std::collections::HashMap::len)
3382                .sum::<usize>()
3383        };
3384        assert_eq!(total_items, 0, "all items should be removed");
3385    }
3386}