sync_engine/search/
search_cache.rs

// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
// See LICENSE file in the project root for full license text.

//! Search Result Cache
//!
//! Caches SQL search results with merkle-based invalidation.
//! When the merkle root for a prefix changes, cached results are automatically stale.
//!
//! # Why This Works
//!
//! - Merkle root = content hash of all data under prefix
//! - Any write → new merkle root → cache auto-invalidates
//! - Repeated identical searches → memory hit
//! - Bounded by max entries with oldest-eviction
//!
//! # Flow
//!
//! ```text
//! Search query arrives
//!       │
//!       ▼
//! ┌─────────────────────────────┐
//! │  Cache lookup               │
//! │  key = hash(query + prefix) │
//! │  check: cached_merkle_root  │
//! │         == current_root?    │
//! └─────────────────────────────┘
//!       │
//!       ├─→ Hit + root matches → return cached keys
//!       │
//!       └─→ Miss OR stale → run SQL, cache results
//! ```
33
34use dashmap::DashMap;
35use parking_lot::Mutex;
36use std::collections::hash_map::DefaultHasher;
37use std::collections::VecDeque;
38use std::hash::{Hash, Hasher};
39use std::sync::atomic::{AtomicU64, Ordering};
40
41use super::Query;
42
/// Identifies one cached search: the prefix string paired with a 64-bit hash of the query.
type CacheKey = (String, u64);
45
/// One cached search result, stored together with the merkle root it was cached under.
#[derive(Debug, Clone)]
struct CacheEntry {
    /// Merkle root supplied when the result was stored; lookups compare it
    /// against the caller's current root to detect staleness.
    merkle_root: Vec<u8>,
    /// Result keys that were stored for the query.
    keys: Vec<String>,
}
54
55/// Search result cache with merkle-based invalidation
56pub struct SearchCache {
57    /// Cache: (prefix, query_hash) → (merkle_root, result_keys)
58    cache: DashMap<CacheKey, CacheEntry>,
59    /// Insertion order for eviction (oldest first)
60    order: Mutex<VecDeque<CacheKey>>,
61    /// Maximum number of entries
62    max_entries: usize,
63    /// Cache hits counter
64    hits: AtomicU64,
65    /// Cache misses counter
66    misses: AtomicU64,
67    /// Stale invalidations counter
68    stale: AtomicU64,
69}
70
/// Point-in-time snapshot of the cache counters.
#[derive(Clone, Debug)]
pub struct SearchCacheStats {
    /// Lookups that were answered from the cache
    pub hits: u64,
    /// Lookups that returned nothing
    pub misses: u64,
    /// Lookups that found an entry whose merkle root had changed
    pub stale: u64,
    /// Entries held in the cache when the snapshot was taken
    pub entry_count: usize,
    /// `hits / (hits + misses)`, in the range 0.0 - 1.0
    pub hit_rate: f64,
}
85
86impl SearchCache {
87    /// Create a new search cache with the given max entries
88    pub fn new(max_entries: usize) -> Self {
89        Self {
90            cache: DashMap::new(),
91            order: Mutex::new(VecDeque::new()),
92            max_entries,
93            hits: AtomicU64::new(0),
94            misses: AtomicU64::new(0),
95            stale: AtomicU64::new(0),
96        }
97    }
98
99    /// Try to get cached results for a query
100    ///
101    /// Returns `Some(keys)` if cache hit and merkle root matches,
102    /// `None` if miss or stale.
103    pub fn get(&self, prefix: &str, query: &Query, current_merkle_root: &[u8]) -> Option<Vec<String>> {
104        let key = self.cache_key(prefix, query);
105
106        if let Some(entry) = self.cache.get(&key) {
107            if entry.merkle_root == current_merkle_root {
108                self.hits.fetch_add(1, Ordering::Relaxed);
109                return Some(entry.keys.clone());
110            }
111            // Merkle root changed - data is stale
112            self.stale.fetch_add(1, Ordering::Relaxed);
113            drop(entry); // Release read lock before removing
114            self.cache.remove(&key);
115        }
116
117        self.misses.fetch_add(1, Ordering::Relaxed);
118        None
119    }
120
121    /// Cache search results
122    pub fn insert(&self, prefix: &str, query: &Query, merkle_root: Vec<u8>, keys: Vec<String>) {
123        let key = self.cache_key(prefix, query);
124
125        // Evict oldest if at capacity
126        if self.cache.len() >= self.max_entries {
127            let mut order = self.order.lock();
128            while self.cache.len() >= self.max_entries {
129                if let Some(old_key) = order.pop_front() {
130                    self.cache.remove(&old_key);
131                } else {
132                    break;
133                }
134            }
135        }
136
137        // Insert new entry
138        let is_new = !self.cache.contains_key(&key);
139        self.cache.insert(key.clone(), CacheEntry { merkle_root, keys });
140
141        if is_new {
142            let mut order = self.order.lock();
143            order.push_back(key);
144        }
145    }
146
147    /// Get cache statistics
148    pub fn stats(&self) -> SearchCacheStats {
149        let hits = self.hits.load(Ordering::Relaxed);
150        let misses = self.misses.load(Ordering::Relaxed);
151        let total = hits + misses;
152
153        SearchCacheStats {
154            hits,
155            misses,
156            stale: self.stale.load(Ordering::Relaxed),
157            entry_count: self.cache.len(),
158            hit_rate: if total > 0 {
159                hits as f64 / total as f64
160            } else {
161                0.0
162            },
163        }
164    }
165
166    /// Clear all cached entries
167    pub fn clear(&self) {
168        self.cache.clear();
169        self.order.lock().clear();
170    }
171
172    fn cache_key(&self, prefix: &str, query: &Query) -> CacheKey {
173        (prefix.to_string(), Self::hash_query(query))
174    }
175
176    fn hash_query(query: &Query) -> u64 {
177        // Hash the debug representation as a simple approach
178        // Could be more sophisticated with a custom Hash impl
179        let mut hasher = DefaultHasher::new();
180        format!("{:?}", query.root).hash(&mut hasher);
181        hasher.finish()
182    }
183}
184
185impl Default for SearchCache {
186    fn default() -> Self {
187        // Default to 1000 cached queries
188        Self::new(1000)
189    }
190}
191
#[cfg(test)]
mod tests {
    use super::*;

    /// Prefix used by most tests.
    const USERS: &str = "crdt:users:";
    /// Second prefix, used to check that prefixes are cached independently.
    const POSTS: &str = "crdt:posts:";

    /// Shorthand for a `field == value` query.
    fn eq(field: &'static str, value: &'static str) -> Query {
        Query::field_eq(field, value)
    }

    /// Turns string literals into an owned key list.
    fn owned(keys: &[&str]) -> Vec<String> {
        keys.iter().map(|k| k.to_string()).collect()
    }

    #[test]
    fn test_cache_hit() {
        let cache = SearchCache::new(100);
        let alice = eq("name", "Alice");
        let root = vec![1, 2, 3, 4];
        let expected = owned(&["crdt:users:1", "crdt:users:2"]);

        cache.insert(USERS, &alice, root.clone(), expected.clone());

        // Same root as at insertion time, so the stored keys come back.
        assert_eq!(cache.get(USERS, &alice, &root), Some(expected));

        let SearchCacheStats { hits, misses, .. } = cache.stats();
        assert_eq!(hits, 1);
        assert_eq!(misses, 0);
    }

    #[test]
    fn test_cache_miss() {
        let cache = SearchCache::new(100);
        let alice = eq("name", "Alice");
        let root = vec![1, 2, 3, 4];

        // Nothing has been inserted, so the lookup comes back empty.
        assert_eq!(cache.get(USERS, &alice, &root), None);

        let SearchCacheStats { hits, misses, .. } = cache.stats();
        assert_eq!(hits, 0);
        assert_eq!(misses, 1);
    }

    #[test]
    fn test_cache_stale_merkle() {
        let cache = SearchCache::new(100);
        let alice = eq("name", "Alice");
        let root_at_insert = vec![1, 2, 3, 4];
        let root_at_lookup = vec![5, 6, 7, 8];

        cache.insert(USERS, &alice, root_at_insert, owned(&["crdt:users:1"]));

        // The root moved on between insert and lookup, so the entry is stale.
        assert_eq!(cache.get(USERS, &alice, &root_at_lookup), None);

        let snapshot = cache.stats();
        assert_eq!(snapshot.hits, 0);
        assert_eq!(snapshot.misses, 1);
        assert_eq!(snapshot.stale, 1);
    }

    #[test]
    fn test_different_queries() {
        let cache = SearchCache::new(100);
        let alice = eq("name", "Alice");
        let bob = eq("name", "Bob");
        let root = vec![1, 2, 3, 4];

        cache.insert(USERS, &alice, root.clone(), owned(&["alice"]));
        cache.insert(USERS, &bob, root.clone(), owned(&["bob"]));

        // Each query resolves to its own entry.
        assert_eq!(cache.get(USERS, &alice, &root), Some(owned(&["alice"])));
        assert_eq!(cache.get(USERS, &bob, &root), Some(owned(&["bob"])));
    }

    #[test]
    fn test_different_prefixes() {
        let cache = SearchCache::new(100);
        let alice = eq("name", "Alice");
        let users_root = vec![1, 2, 3];
        let posts_root = vec![4, 5, 6];

        cache.insert(USERS, &alice, users_root.clone(), owned(&["u1"]));
        cache.insert(POSTS, &alice, posts_root.clone(), owned(&["p1"]));

        // The same query under two prefixes yields two independent entries.
        assert_eq!(cache.get(USERS, &alice, &users_root), Some(owned(&["u1"])));
        assert_eq!(cache.get(POSTS, &alice, &posts_root), Some(owned(&["p1"])));
    }

    #[test]
    fn test_hit_rate() {
        let cache = SearchCache::new(100);
        let alice = eq("name", "Alice");
        let root = vec![1, 2, 3, 4];

        cache.insert(USERS, &alice, root.clone(), owned(&["crdt:users:1"]));

        // Three hits on the cached query...
        for _ in 0..3 {
            cache.get(USERS, &alice, &root);
        }
        // ...and one miss on a query that was never stored.
        cache.get(USERS, &eq("x", "y"), &root);

        let snapshot = cache.stats();
        assert_eq!(snapshot.hits, 3);
        assert_eq!(snapshot.misses, 1);
        assert!((snapshot.hit_rate - 0.75).abs() < 0.01);
    }

    #[test]
    fn test_eviction_oldest() {
        let cache = SearchCache::new(3); // Max 3 entries
        let root = vec![1, 2, 3];

        // Fill the cache to capacity.
        for (value, stored) in [("1", "k1"), ("2", "k2"), ("3", "k3")] {
            cache.insert("p:", &eq("n", value), root.clone(), vec![stored.into()]);
        }
        assert_eq!(cache.stats().entry_count, 3);

        // A fourth entry pushes out the oldest one (query 1).
        cache.insert("p:", &eq("n", "4"), root.clone(), vec!["k4".into()]);
        assert_eq!(cache.stats().entry_count, 3);

        // Query 1 is gone, query 4 is present.
        assert!(cache.get("p:", &eq("n", "1"), &root).is_none());
        assert!(cache.get("p:", &eq("n", "4"), &root).is_some());
    }
}