sync_engine/cuckoo/
filter_manager.rs

1//! Cuckoo filter manager for L3 existence checks.
2//!
3//! Uses our expandable-cuckoo-filter for memory-efficient probabilistic
4//! membership testing with persistence support. This prevents unnecessary 
5//! L3 queries for keys that definitely don't exist.
6
7use expandable_cuckoo_filter::ExpandableCuckooFilter;
8use std::sync::atomic::{AtomicBool, Ordering};
9use tracing::{debug, info, warn};
10
11/// Trust state of the cuckoo filter
12#[derive(Debug, Clone, Copy, PartialEq, Eq)]
13pub enum FilterTrust {
14    /// Filter is empty/warming up - always check L3
15    Untrusted,
16    /// Filter is warmed and verified - trust "definitely not" responses
17    Trusted,
18}
19
20/// Cuckoo filter manager for L3 existence checks.
21///
22/// The filter tracks which keys exist in L3 (SQL). On cache miss:
23/// - If filter says "definitely not" AND trusted → return 404 immediately
24/// - If filter says "maybe" → check L3
25/// - If untrusted → always check L3 (degraded mode)
26///
27/// The filter must be warmed from L3 on startup and verified via merkle root
28/// comparison before being marked trusted.
29///
30/// ## Persistence
31/// 
32/// The filter supports export/import for fast startup:
33/// - On shutdown: `export()` → save bytes to SQLite
34/// - On startup: load bytes → `import()` → `mark_trusted()` → ready instantly
35pub struct FilterManager {
36    /// Expandable cuckoo filter - auto-grows, supports export/import
37    filter: ExpandableCuckooFilter,
38    /// Trust state - only trust "not found" if true
39    trust_state: AtomicBool,
40}
41
42impl FilterManager {
43    /// Create a new filter manager (starts untrusted).
44    ///
45    /// # Arguments
46    /// * `node_id` - Unique identifier for deterministic seeding (e.g., hostname)
47    /// * `initial_capacity` - Expected number of items (filter will grow if exceeded)
48    #[must_use]
49    pub fn new(node_id: impl Into<String>, initial_capacity: usize) -> Self {
50        let node_id = node_id.into();
51        info!(
52            node_id = %node_id,
53            initial_capacity,
54            "Creating expandable cuckoo filter manager"
55        );
56
57        let filter = ExpandableCuckooFilter::new(node_id, initial_capacity);
58
59        Self {
60            filter,
61            trust_state: AtomicBool::new(false),
62        }
63    }
64
65    /// Get current trust state.
66    #[must_use]
67    #[inline]
68    pub fn trust_state(&self) -> FilterTrust {
69        if self.trust_state.load(Ordering::Acquire) {
70            FilterTrust::Trusted
71        } else {
72            FilterTrust::Untrusted
73        }
74    }
75
76    /// Check if filter is trusted (convenience method).
77    #[must_use]
78    #[inline]
79    pub fn is_trusted(&self) -> bool {
80        self.trust_state.load(Ordering::Acquire)
81    }
82
83    /// Mark filter as trusted (after warmup + verification).
84    pub fn mark_trusted(&self) {
85        info!(
86            entries = self.filter.len(),
87            "Marking cuckoo filter as trusted"
88        );
89        self.trust_state.store(true, Ordering::Release);
90    }
91
92    /// Mark filter as untrusted (e.g., after detecting inconsistency).
93    pub fn mark_untrusted(&self) {
94        warn!("Marking cuckoo filter as untrusted");
95        self.trust_state.store(false, Ordering::Release);
96    }
97
98    /// Check if a key might exist in L3.
99    ///
100    /// Returns:
101    /// - `Some(true)` → key might exist, check L3
102    /// - `Some(false)` → key definitely doesn't exist (trusted mode only)
103    /// - `None` → filter is untrusted, must check L3
104    #[must_use]
105    pub fn might_exist(&self, key: &str) -> Option<bool> {
106        if !self.trust_state.load(Ordering::Acquire) {
107            // Untrusted - caller must check L3
108            return None;
109        }
110
111        Some(self.filter.contains(key))
112    }
113
114    /// Check if we should query L3 for this key.
115    #[must_use]
116    #[inline]
117    pub fn should_check_l3(&self, key: &str) -> bool {
118        match self.might_exist(key) {
119            None => true,         // Untrusted, must check
120            Some(true) => true,   // Might exist, check
121            Some(false) => false, // Definitely not, skip L3
122        }
123    }
124
125    /// Insert a key into the filter (call when item is persisted to L3).
126    pub fn insert(&self, key: &str) {
127        self.filter.insert(key);
128    }
129
130    /// Remove a key from the filter (call when item is deleted from L3).
131    pub fn remove(&self, key: &str) -> bool {
132        self.filter.remove(key)
133    }
134
135    /// Bulk insert keys during warmup.
136    ///
137    /// More efficient than individual inserts for large batches.
138    pub fn bulk_insert(&self, keys: &[String]) {
139        for key in keys {
140            self.filter.insert(key.as_str());
141        }
142        debug!(
143            added = keys.len(),
144            total = self.filter.len(),
145            "Bulk inserted keys into cuckoo filter"
146        );
147    }
148
149    /// Export filter state to bytes for persistence.
150    ///
151    /// Save these bytes to SQLite on shutdown for fast startup.
152    #[must_use]
153    pub fn export(&self) -> Option<Vec<u8>> {
154        self.filter.export().ok()
155    }
156
157    /// Import filter state from bytes.
158    ///
159    /// Load bytes from SQLite on startup, then call `mark_trusted()`.
160    pub fn import(&self, data: &[u8]) -> Result<(), String> {
161        self.filter.import(data).map_err(|e| e.to_string())
162    }
163
164    /// Get current entry count.
165    #[must_use]
166    pub fn len(&self) -> usize {
167        self.filter.len()
168    }
169
170    /// Check if filter is empty.
171    #[must_use]
172    pub fn is_empty(&self) -> bool {
173        self.filter.is_empty()
174    }
175
176    /// Get filter stats: (entry_count, capacity, trust_state).
177    #[must_use]
178    pub fn stats(&self) -> (usize, usize, FilterTrust) {
179        (
180            self.filter.len(),
181            self.filter.capacity(),
182            self.trust_state(),
183        )
184    }
185
186    /// Estimate memory usage in bytes.
187    #[must_use]
188    pub fn memory_usage_bytes(&self) -> usize {
189        // ~8 bytes per fingerprint slot + overhead
190        self.filter.capacity().saturating_mul(8).max(1024)
191    }
192
193    /// Get the node ID used for deterministic seeding.
194    #[must_use]
195    pub fn node_id(&self) -> &str {
196        self.filter.node_id()
197    }
198
199    /// Get the deterministic seed.
200    #[must_use]
201    pub fn seed(&self) -> u64 {
202        self.filter.seed()
203    }
204}
205
206#[cfg(test)]
207mod tests {
208    use super::*;
209
210    #[test]
211    fn test_new_filter_is_untrusted() {
212        let fm = FilterManager::new("test-node", 1000);
213        assert_eq!(fm.trust_state(), FilterTrust::Untrusted);
214        assert!(fm.is_empty());
215        assert_eq!(fm.len(), 0);
216    }
217
218    #[test]
219    fn test_mark_trusted_and_untrusted() {
220        let fm = FilterManager::new("test-node", 1000);
221        
222        assert_eq!(fm.trust_state(), FilterTrust::Untrusted);
223        
224        fm.mark_trusted();
225        assert_eq!(fm.trust_state(), FilterTrust::Trusted);
226        
227        fm.mark_untrusted();
228        assert_eq!(fm.trust_state(), FilterTrust::Untrusted);
229    }
230
231    #[test]
232    fn test_insert_and_contains() {
233        let fm = FilterManager::new("test-node", 1000);
234        fm.mark_trusted();
235        
236        // Insert key
237        fm.insert("key-1");
238        assert_eq!(fm.len(), 1);
239        
240        // Should contain it
241        assert_eq!(fm.might_exist("key-1"), Some(true));
242        
243        // Should not contain other key
244        assert_eq!(fm.might_exist("key-2"), Some(false));
245    }
246
247    #[test]
248    fn test_might_exist_returns_none_when_untrusted() {
249        let fm = FilterManager::new("test-node", 1000);
250        
251        fm.insert("key-1");
252        
253        // Untrusted, should return None
254        assert_eq!(fm.might_exist("key-1"), None);
255        assert_eq!(fm.might_exist("key-2"), None);
256    }
257
258    #[test]
259    fn test_should_check_l3_when_untrusted() {
260        let fm = FilterManager::new("test-node", 1000);
261        
262        // Untrusted = always check L3
263        assert!(fm.should_check_l3("any-key"));
264        assert!(fm.should_check_l3("another-key"));
265    }
266
267    #[test]
268    fn test_should_check_l3_when_trusted() {
269        let fm = FilterManager::new("test-node", 1000);
270        fm.mark_trusted();
271        
272        fm.insert("exists");
273        
274        // Key that might exist = check L3
275        assert!(fm.should_check_l3("exists"));
276        
277        // Key that definitely doesn't = don't check L3
278        assert!(!fm.should_check_l3("nonexistent"));
279    }
280
281    #[test]
282    fn test_remove() {
283        let fm = FilterManager::new("test-node", 1000);
284        fm.mark_trusted();
285        
286        fm.insert("to-remove");
287        assert_eq!(fm.might_exist("to-remove"), Some(true));
288        
289        let removed = fm.remove("to-remove");
290        // Note: cuckoo filter remove may return false if fingerprint collision
291        // but the key should no longer be in the filter
292        assert!(removed || fm.might_exist("to-remove") == Some(false));
293    }
294
295    #[test]
296    fn test_bulk_insert() {
297        let fm = FilterManager::new("test-node", 1000);
298        fm.mark_trusted();
299        
300        let keys: Vec<String> = (0..100).map(|i| format!("bulk-key-{}", i)).collect();
301        fm.bulk_insert(&keys);
302        
303        assert_eq!(fm.len(), 100);
304        
305        // Spot check some keys
306        assert_eq!(fm.might_exist("bulk-key-0"), Some(true));
307        assert_eq!(fm.might_exist("bulk-key-50"), Some(true));
308        assert_eq!(fm.might_exist("bulk-key-99"), Some(true));
309        assert_eq!(fm.might_exist("not-inserted"), Some(false));
310    }
311
312    #[test]
313    fn test_export_import() {
314        let fm1 = FilterManager::new("test-node", 1000);
315        
316        // Insert some keys
317        fm1.insert("key-a");
318        fm1.insert("key-b");
319        fm1.insert("key-c");
320        
321        // Export
322        let exported = fm1.export();
323        assert!(exported.is_some());
324        
325        // Create new filter and import
326        let fm2 = FilterManager::new("test-node", 1000);
327        let import_result = fm2.import(&exported.unwrap());
328        assert!(import_result.is_ok());
329        
330        // Should have same entries
331        assert_eq!(fm2.len(), 3);
332        
333        // Mark trusted and verify
334        fm2.mark_trusted();
335        assert_eq!(fm2.might_exist("key-a"), Some(true));
336        assert_eq!(fm2.might_exist("key-b"), Some(true));
337        assert_eq!(fm2.might_exist("key-c"), Some(true));
338    }
339
340    #[test]
341    fn test_stats() {
342        let fm = FilterManager::new("test-node", 1000);
343        
344        let (count, capacity, trust) = fm.stats();
345        assert_eq!(count, 0);
346        assert!(capacity > 0);
347        assert_eq!(trust, FilterTrust::Untrusted);
348        
349        fm.insert("key");
350        fm.mark_trusted();
351        
352        let (count, _, trust) = fm.stats();
353        assert_eq!(count, 1);
354        assert_eq!(trust, FilterTrust::Trusted);
355    }
356
357    #[test]
358    fn test_memory_usage_bytes() {
359        let fm = FilterManager::new("test-node", 10000);
360        
361        let mem = fm.memory_usage_bytes();
362        assert!(mem >= 1024, "Should report at least 1KB");
363        assert!(mem < 1024 * 1024, "Should be less than 1MB for 10k capacity");
364    }
365
366    #[test]
367    fn test_node_id_and_seed() {
368        let fm = FilterManager::new("my-unique-node", 1000);
369        
370        assert_eq!(fm.node_id(), "my-unique-node");
371        assert!(fm.seed() > 0);
372    }
373
374    #[test]
375    fn test_deterministic_seed_same_node() {
376        let fm1 = FilterManager::new("same-node", 1000);
377        let fm2 = FilterManager::new("same-node", 1000);
378        
379        assert_eq!(fm1.seed(), fm2.seed());
380    }
381
382    #[test]
383    fn test_different_seeds_different_nodes() {
384        let fm1 = FilterManager::new("node-a", 1000);
385        let fm2 = FilterManager::new("node-b", 1000);
386        
387        assert_ne!(fm1.seed(), fm2.seed());
388    }
389
390    #[test]
391    fn test_filter_grows_beyond_initial_capacity() {
392        let fm = FilterManager::new("test-node", 10); // Very small
393        
394        // Insert more than initial capacity
395        for i in 0..100 {
396            fm.insert(&format!("key-{}", i));
397        }
398        
399        // Should have grown
400        assert_eq!(fm.len(), 100);
401        
402        // Capacity should be >= entries
403        let (count, capacity, _) = fm.stats();
404        assert!(capacity >= count);
405    }
406
407    #[test]
408    fn test_is_empty() {
409        let fm = FilterManager::new("test-node", 1000);
410        
411        assert!(fm.is_empty());
412        
413        fm.insert("key");
414        assert!(!fm.is_empty());
415    }
416
417    #[test]
418    fn test_filter_trust_display() {
419        assert_eq!(format!("{:?}", FilterTrust::Untrusted), "Untrusted");
420        assert_eq!(format!("{:?}", FilterTrust::Trusted), "Trusted");
421    }
422
423    #[test]
424    fn test_multiple_inserts_same_key() {
425        let fm = FilterManager::new("test-node", 1000);
426        
427        fm.insert("same-key");
428        fm.insert("same-key");
429        fm.insert("same-key");
430        
431        // Cuckoo filters may or may not deduplicate, 
432        // but should at least contain the key
433        fm.mark_trusted();
434        assert_eq!(fm.might_exist("same-key"), Some(true));
435    }
436
437    #[test]
438    fn test_false_positive_rate_reasonable() {
439        let fm = FilterManager::new("test-node", 10000);
440        fm.mark_trusted();
441        
442        // Insert 1000 keys
443        for i in 0..1000 {
444            fm.insert(&format!("inserted-{}", i));
445        }
446        
447        // Check 1000 keys that were NOT inserted
448        let mut false_positives = 0;
449        for i in 0..1000 {
450            if fm.might_exist(&format!("not-inserted-{}", i)) == Some(true) {
451                false_positives += 1;
452            }
453        }
454        
455        // False positive rate should be < 5% for cuckoo filters
456        let fp_rate = false_positives as f64 / 1000.0;
457        assert!(fp_rate < 0.05, "False positive rate {} is too high", fp_rate);
458    }
459}