sync_engine/search/search_cache.rs

//! Search Result Cache
//!
//! Caches SQL search results with merkle-based invalidation.
//! When the merkle root for a prefix changes, cached results become stale automatically.
//!
//! # Why This Works
//!
//! - Merkle root = content hash of all data under a prefix
//! - Any write → new merkle root → cache auto-invalidates
//! - Repeated identical searches → in-memory hit
//! - Bounded by a maximum entry count with oldest-first eviction
//!
//! # Flow
//!
//! ```text
//! Search query arrives
//!       │
//!       ▼
//! ┌─────────────────────────────┐
//! │  Cache lookup               │
//! │  key = hash(query + prefix) │
//! │  check: cached_merkle_root  │
//! │         == current_root?    │
//! └─────────────────────────────┘
//!       │
//!       ├─→ Hit + root matches → return cached keys
//!       │
//!       └─→ Miss OR stale → run SQL, cache results
//! ```
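//!
//! # Usage (sketch)
//!
//! A minimal sketch of the intended caller pattern, not a doctest: `query`,
//! `current_root`, and `run_sql_search` stand in for whatever the surrounding
//! engine provides, so the block is marked `ignore`.
//!
//! ```ignore
//! let cache = SearchCache::new(1000);
//! let root: Vec<u8> = current_root("crdt:users:"); // hypothetical helper
//!
//! let keys = match cache.get("crdt:users:", &query, &root) {
//!     // Fresh hit: merkle root unchanged since the result was cached
//!     Some(keys) => keys,
//!     // Miss or stale: run the real SQL search, then cache under the new root
//!     None => {
//!         let keys = run_sql_search("crdt:users:", &query); // hypothetical SQL path
//!         cache.insert("crdt:users:", &query, root.clone(), keys.clone());
//!         keys
//!     }
//! };
//! ```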

use dashmap::DashMap;
use parking_lot::Mutex;
use std::collections::hash_map::DefaultHasher;
use std::collections::VecDeque;
use std::hash::{Hash, Hasher};
use std::sync::atomic::{AtomicU64, Ordering};

use super::Query;

/// Cache key: (prefix, query_hash)
type CacheKey = (String, u64);

/// Cached search result entry
#[derive(Clone, Debug)]
struct CacheEntry {
    /// Merkle root at time of caching
    merkle_root: Vec<u8>,
    /// Cached result keys
    keys: Vec<String>,
}

/// Search result cache with merkle-based invalidation
pub struct SearchCache {
    /// Cache: (prefix, query_hash) → (merkle_root, result_keys)
    cache: DashMap<CacheKey, CacheEntry>,
    /// Insertion order for eviction (oldest first)
    order: Mutex<VecDeque<CacheKey>>,
    /// Maximum number of entries
    max_entries: usize,
    /// Cache hits counter
    hits: AtomicU64,
    /// Cache misses counter
    misses: AtomicU64,
    /// Stale invalidations counter
    stale: AtomicU64,
}

/// Cache statistics
#[derive(Debug, Clone)]
pub struct SearchCacheStats {
    /// Number of cache hits
    pub hits: u64,
    /// Number of cache misses
    pub misses: u64,
    /// Number of stale entries (merkle root changed)
    pub stale: u64,
    /// Current number of entries
    pub entry_count: usize,
    /// Hit rate (0.0 - 1.0)
    pub hit_rate: f64,
}

impl SearchCache {
    /// Create a new search cache with the given max entries
    pub fn new(max_entries: usize) -> Self {
        Self {
            cache: DashMap::new(),
            order: Mutex::new(VecDeque::new()),
            max_entries,
            hits: AtomicU64::new(0),
            misses: AtomicU64::new(0),
            stale: AtomicU64::new(0),
        }
    }

    /// Try to get cached results for a query
    ///
    /// Returns `Some(keys)` if cache hit and merkle root matches,
    /// `None` if miss or stale.
    pub fn get(&self, prefix: &str, query: &Query, current_merkle_root: &[u8]) -> Option<Vec<String>> {
        let key = self.cache_key(prefix, query);

        if let Some(entry) = self.cache.get(&key) {
            if entry.merkle_root == current_merkle_root {
                self.hits.fetch_add(1, Ordering::Relaxed);
                return Some(entry.keys.clone());
            }
            // Merkle root changed - data is stale
            self.stale.fetch_add(1, Ordering::Relaxed);
            drop(entry); // Release read lock before removing
            self.cache.remove(&key);
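            // The stale key stays in `order`; when eviction later pops it,
            // the corresponding `cache.remove` is simply a no-op.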
        }

        self.misses.fetch_add(1, Ordering::Relaxed);
        None
    }

    /// Cache search results
    pub fn insert(&self, prefix: &str, query: &Query, merkle_root: Vec<u8>, keys: Vec<String>) {
        let key = self.cache_key(prefix, query);

        // Evict oldest if at capacity
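        // `order` may still hold keys that `get` already removed as stale, so a
        // single pop is not guaranteed to free a slot; keep popping until the
        // cache is below capacity (or `order` is drained).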
        if self.cache.len() >= self.max_entries {
            let mut order = self.order.lock();
            while self.cache.len() >= self.max_entries {
                if let Some(old_key) = order.pop_front() {
                    self.cache.remove(&old_key);
                } else {
                    break;
                }
            }
        }

        // Insert new entry
        let is_new = !self.cache.contains_key(&key);
        self.cache.insert(key.clone(), CacheEntry { merkle_root, keys });

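        // Only genuinely new keys are appended to `order`; overwriting an
        // existing entry keeps its original eviction slot.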
        if is_new {
            let mut order = self.order.lock();
            order.push_back(key);
        }
    }

    /// Get cache statistics
    pub fn stats(&self) -> SearchCacheStats {
        let hits = self.hits.load(Ordering::Relaxed);
        let misses = self.misses.load(Ordering::Relaxed);
        let total = hits + misses;

        SearchCacheStats {
            hits,
            misses,
            stale: self.stale.load(Ordering::Relaxed),
            entry_count: self.cache.len(),
            hit_rate: if total > 0 {
                hits as f64 / total as f64
            } else {
                0.0
            },
        }
    }

    /// Clear all cached entries
    pub fn clear(&self) {
        self.cache.clear();
        self.order.lock().clear();
    }

    fn cache_key(&self, prefix: &str, query: &Query) -> CacheKey {
        (prefix.to_string(), Self::hash_query(query))
    }

    fn hash_query(query: &Query) -> u64 {
        // Hash the `Debug` representation of the query tree: a simple approach
        // that assumes the `Debug` output uniquely identifies a query.
        // A custom `Hash` impl on `Query` would be more robust.
        let mut hasher = DefaultHasher::new();
        format!("{:?}", query.root).hash(&mut hasher);
        hasher.finish()
    }
}

impl Default for SearchCache {
    fn default() -> Self {
        // Default to 1000 cached queries
        Self::new(1000)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_cache_hit() {
        let cache = SearchCache::new(100);
        let query = Query::field_eq("name", "Alice");
        let merkle_root = vec![1, 2, 3, 4];
        let keys = vec!["crdt:users:1".to_string(), "crdt:users:2".to_string()];

        // Insert
        cache.insert("crdt:users:", &query, merkle_root.clone(), keys.clone());

        // Hit with same merkle root
        let result = cache.get("crdt:users:", &query, &merkle_root);
        assert_eq!(result, Some(keys));

        let stats = cache.stats();
        assert_eq!(stats.hits, 1);
        assert_eq!(stats.misses, 0);
    }

    #[test]
    fn test_cache_miss() {
        let cache = SearchCache::new(100);
        let query = Query::field_eq("name", "Alice");
        let merkle_root = vec![1, 2, 3, 4];

        // Miss - not cached
        let result = cache.get("crdt:users:", &query, &merkle_root);
        assert_eq!(result, None);

        let stats = cache.stats();
        assert_eq!(stats.hits, 0);
        assert_eq!(stats.misses, 1);
    }

    #[test]
    fn test_cache_stale_merkle() {
        let cache = SearchCache::new(100);
        let query = Query::field_eq("name", "Alice");
        let old_root = vec![1, 2, 3, 4];
        let new_root = vec![5, 6, 7, 8];
        let keys = vec!["crdt:users:1".to_string()];

        // Insert with old root
        cache.insert("crdt:users:", &query, old_root, keys);

        // Query with new root - should be stale
        let result = cache.get("crdt:users:", &query, &new_root);
        assert_eq!(result, None);

        let stats = cache.stats();
        assert_eq!(stats.hits, 0);
        assert_eq!(stats.misses, 1);
        assert_eq!(stats.stale, 1);
    }

    #[test]
    fn test_different_queries() {
        let cache = SearchCache::new(100);
        let query1 = Query::field_eq("name", "Alice");
        let query2 = Query::field_eq("name", "Bob");
        let merkle_root = vec![1, 2, 3, 4];

        cache.insert(
            "crdt:users:",
            &query1,
            merkle_root.clone(),
            vec!["alice".to_string()],
        );
        cache.insert(
            "crdt:users:",
            &query2,
            merkle_root.clone(),
            vec!["bob".to_string()],
        );

        // Both should hit
        assert_eq!(
            cache.get("crdt:users:", &query1, &merkle_root),
            Some(vec!["alice".to_string()])
        );
        assert_eq!(
            cache.get("crdt:users:", &query2, &merkle_root),
            Some(vec!["bob".to_string()])
        );
    }

    #[test]
    fn test_different_prefixes() {
        let cache = SearchCache::new(100);
        let query = Query::field_eq("name", "Alice");
        let root1 = vec![1, 2, 3];
        let root2 = vec![4, 5, 6];

        cache.insert("crdt:users:", &query, root1.clone(), vec!["u1".to_string()]);
        cache.insert("crdt:posts:", &query, root2.clone(), vec!["p1".to_string()]);

        // Each prefix has its own cache entry
        assert_eq!(
            cache.get("crdt:users:", &query, &root1),
            Some(vec!["u1".to_string()])
        );
        assert_eq!(
            cache.get("crdt:posts:", &query, &root2),
            Some(vec!["p1".to_string()])
        );
    }

    #[test]
    fn test_hit_rate() {
        let cache = SearchCache::new(100);
        let query = Query::field_eq("name", "Alice");
        let merkle_root = vec![1, 2, 3, 4];
        let keys = vec!["crdt:users:1".to_string()];

        cache.insert("crdt:users:", &query, merkle_root.clone(), keys);

        // 3 hits, 1 miss
        cache.get("crdt:users:", &query, &merkle_root);
        cache.get("crdt:users:", &query, &merkle_root);
        cache.get("crdt:users:", &query, &merkle_root);
        cache.get("crdt:users:", &Query::field_eq("x", "y"), &merkle_root);

        let stats = cache.stats();
        assert_eq!(stats.hits, 3);
        assert_eq!(stats.misses, 1);
        assert!((stats.hit_rate - 0.75).abs() < 0.01);
    }

    #[test]
    fn test_eviction_oldest() {
        let cache = SearchCache::new(3); // Max 3 entries
        let root = vec![1, 2, 3];

        // Insert 3 entries
        cache.insert("p:", &Query::field_eq("n", "1"), root.clone(), vec!["k1".into()]);
        cache.insert("p:", &Query::field_eq("n", "2"), root.clone(), vec!["k2".into()]);
        cache.insert("p:", &Query::field_eq("n", "3"), root.clone(), vec!["k3".into()]);

        assert_eq!(cache.stats().entry_count, 3);

        // Insert 4th - should evict oldest (query 1)
        cache.insert("p:", &Query::field_eq("n", "4"), root.clone(), vec!["k4".into()]);

        assert_eq!(cache.stats().entry_count, 3);

        // Query 1 should be evicted
        assert!(cache.get("p:", &Query::field_eq("n", "1"), &root).is_none());
        // Query 4 should exist
        assert!(cache.get("p:", &Query::field_eq("n", "4"), &root).is_some());
    }
}