sync_engine/cuckoo/
filter_manager.rs

1// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
2// See LICENSE file in the project root for full license text.
3
4//! Cuckoo filter manager for L3 existence checks.
5//!
6//! Uses our expandable-cuckoo-filter for memory-efficient probabilistic
7//! membership testing with persistence support. This prevents unnecessary 
8//! L3 queries for keys that definitely don't exist.
9
10use expandable_cuckoo_filter::ExpandableCuckooFilter;
11use std::sync::atomic::{AtomicBool, Ordering};
12use tracing::{debug, info, warn};
13
/// Whether negative answers from the cuckoo filter may be acted upon.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum FilterTrust {
    /// The filter is empty or still warming up, so every lookup must go to L3.
    Untrusted,
    /// The filter has been warmed and verified, so a "definitely not"
    /// response can be relied on.
    Trusted,
}
22
23/// Cuckoo filter manager for L3 existence checks.
24///
25/// The filter tracks which keys exist in L3 (SQL). On cache miss:
26/// - If filter says "definitely not" AND trusted → return 404 immediately
27/// - If filter says "maybe" → check L3
28/// - If untrusted → always check L3 (degraded mode)
29///
30/// The filter must be warmed from L3 on startup and verified via merkle root
31/// comparison before being marked trusted.
32///
33/// ## Persistence
34/// 
35/// The filter supports export/import for fast startup:
36/// - On shutdown: `export()` → save bytes to SQLite
37/// - On startup: load bytes → `import()` → `mark_trusted()` → ready instantly
38pub struct FilterManager {
39    /// Expandable cuckoo filter - auto-grows, supports export/import
40    filter: ExpandableCuckooFilter,
41    /// Trust state - only trust "not found" if true
42    trust_state: AtomicBool,
43}
44
45impl FilterManager {
46    /// Create a new filter manager (starts untrusted).
47    ///
48    /// # Arguments
49    /// * `node_id` - Unique identifier for deterministic seeding (e.g., hostname)
50    /// * `initial_capacity` - Expected number of items (filter will grow if exceeded)
51    #[must_use]
52    pub fn new(node_id: impl Into<String>, initial_capacity: usize) -> Self {
53        let node_id = node_id.into();
54        info!(
55            node_id = %node_id,
56            initial_capacity,
57            "Creating expandable cuckoo filter manager"
58        );
59
60        let filter = ExpandableCuckooFilter::new(node_id, initial_capacity);
61
62        Self {
63            filter,
64            trust_state: AtomicBool::new(false),
65        }
66    }
67
68    /// Get current trust state.
69    #[must_use]
70    #[inline]
71    pub fn trust_state(&self) -> FilterTrust {
72        if self.trust_state.load(Ordering::Acquire) {
73            FilterTrust::Trusted
74        } else {
75            FilterTrust::Untrusted
76        }
77    }
78
79    /// Check if filter is trusted (convenience method).
80    #[must_use]
81    #[inline]
82    pub fn is_trusted(&self) -> bool {
83        self.trust_state.load(Ordering::Acquire)
84    }
85
86    /// Mark filter as trusted (after warmup + verification).
87    pub fn mark_trusted(&self) {
88        info!(
89            entries = self.filter.len(),
90            "Marking cuckoo filter as trusted"
91        );
92        self.trust_state.store(true, Ordering::Release);
93    }
94
95    /// Mark filter as untrusted (e.g., after detecting inconsistency).
96    pub fn mark_untrusted(&self) {
97        warn!("Marking cuckoo filter as untrusted");
98        self.trust_state.store(false, Ordering::Release);
99    }
100
101    /// Check if a key might exist in L3.
102    ///
103    /// Returns:
104    /// - `Some(true)` → key might exist, check L3
105    /// - `Some(false)` → key definitely doesn't exist (trusted mode only)
106    /// - `None` → filter is untrusted, must check L3
107    #[must_use]
108    pub fn might_exist(&self, key: &str) -> Option<bool> {
109        if !self.trust_state.load(Ordering::Acquire) {
110            // Untrusted - caller must check L3
111            return None;
112        }
113
114        Some(self.filter.contains(key))
115    }
116
117    /// Check if we should query L3 for this key.
118    #[must_use]
119    #[inline]
120    pub fn should_check_l3(&self, key: &str) -> bool {
121        match self.might_exist(key) {
122            None => true,         // Untrusted, must check
123            Some(true) => true,   // Might exist, check
124            Some(false) => false, // Definitely not, skip L3
125        }
126    }
127
128    /// Insert a key into the filter (call when item is persisted to L3).
129    ///
130    /// This is treated as an idempotent operation: if the key already exists
131    /// (or a false positive match exists), we do nothing. This prevents
132    /// filter explosion on repeated updates to the same key.
133    pub fn insert(&self, key: &str) {
134        if !self.filter.contains(key) {
135            self.filter.insert(key);
136        }
137    }
138
139    /// Remove a key from the filter (call when item is deleted from L3).
140    pub fn remove(&self, key: &str) -> bool {
141        self.filter.remove(key)
142    }
143
144    /// Bulk insert keys during warmup.
145    ///
146    /// More efficient than individual inserts for large batches.
147    pub fn bulk_insert(&self, keys: &[String]) {
148        for key in keys {
149            if !self.filter.contains(key) {
150                self.filter.insert(key.as_str());
151            }
152        }
153        debug!(
154            added = keys.len(),
155            total = self.filter.len(),
156            "Bulk inserted keys into cuckoo filter"
157        );
158    }
159
160    /// Export filter state to bytes for persistence.
161    ///
162    /// Save these bytes to SQLite on shutdown for fast startup.
163    #[must_use]
164    pub fn export(&self) -> Option<Vec<u8>> {
165        self.filter.export().ok()
166    }
167
168    /// Import filter state from bytes.
169    ///
170    /// Load bytes from SQLite on startup, then call `mark_trusted()`.
171    pub fn import(&self, data: &[u8]) -> Result<(), String> {
172        self.filter.import(data).map_err(|e| e.to_string())
173    }
174
175    /// Get current entry count.
176    #[must_use]
177    pub fn len(&self) -> usize {
178        self.filter.len()
179    }
180
181    /// Check if filter is empty.
182    #[must_use]
183    pub fn is_empty(&self) -> bool {
184        self.filter.is_empty()
185    }
186
187    /// Get filter stats: (entry_count, capacity, trust_state).
188    #[must_use]
189    pub fn stats(&self) -> (usize, usize, FilterTrust) {
190        (
191            self.filter.len(),
192            self.filter.capacity(),
193            self.trust_state(),
194        )
195    }
196
197    /// Estimate memory usage in bytes.
198    #[must_use]
199    pub fn memory_usage_bytes(&self) -> usize {
200        // ~8 bytes per fingerprint slot + overhead
201        self.filter.capacity().saturating_mul(8).max(1024)
202    }
203
204    /// Get the node ID used for deterministic seeding.
205    #[must_use]
206    pub fn node_id(&self) -> &str {
207        self.filter.node_id()
208    }
209
210    /// Get the deterministic seed.
211    #[must_use]
212    pub fn seed(&self) -> u64 {
213        self.filter.seed()
214    }
215}
216
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a manager with the node id shared by most tests.
    fn manager(initial_capacity: usize) -> FilterManager {
        FilterManager::new("test-node", initial_capacity)
    }

    #[test]
    fn test_new_filter_is_untrusted() {
        let fm = manager(1000);
        assert_eq!(fm.trust_state(), FilterTrust::Untrusted);
        assert!(fm.is_empty());
        assert_eq!(fm.len(), 0);
    }

    #[test]
    fn test_mark_trusted_and_untrusted() {
        let fm = manager(1000);

        assert_eq!(fm.trust_state(), FilterTrust::Untrusted);

        fm.mark_trusted();
        assert_eq!(fm.trust_state(), FilterTrust::Trusted);

        fm.mark_untrusted();
        assert_eq!(fm.trust_state(), FilterTrust::Untrusted);
    }

    #[test]
    fn test_insert_and_contains() {
        let fm = manager(1000);
        fm.mark_trusted();

        fm.insert("key-1");
        assert_eq!(fm.len(), 1);

        // The inserted key is reported as possibly present...
        assert_eq!(fm.might_exist("key-1"), Some(true));

        // ...while a key that was never inserted is reported absent.
        assert_eq!(fm.might_exist("key-2"), Some(false));
    }

    #[test]
    fn test_might_exist_returns_none_when_untrusted() {
        let fm = manager(1000);

        fm.insert("key-1");

        // Untrusted: the answer is `None` whether or not the key was inserted.
        assert_eq!(fm.might_exist("key-1"), None);
        assert_eq!(fm.might_exist("key-2"), None);
    }

    #[test]
    fn test_should_check_l3_when_untrusted() {
        let fm = manager(1000);

        // Untrusted = always check L3
        assert!(fm.should_check_l3("any-key"));
        assert!(fm.should_check_l3("another-key"));
    }

    #[test]
    fn test_should_check_l3_when_trusted() {
        let fm = manager(1000);
        fm.mark_trusted();

        fm.insert("exists");

        // Key that might exist = check L3
        assert!(fm.should_check_l3("exists"));

        // Key that definitely doesn't = don't check L3
        assert!(!fm.should_check_l3("nonexistent"));
    }

    #[test]
    fn test_remove() {
        let fm = manager(1000);
        fm.mark_trusted();

        fm.insert("to-remove");
        assert_eq!(fm.might_exist("to-remove"), Some(true));

        let removed = fm.remove("to-remove");
        // Note: cuckoo filter remove may return false if fingerprint collision
        // but the key should no longer be in the filter
        assert!(removed || fm.might_exist("to-remove") == Some(false));
    }

    #[test]
    fn test_bulk_insert() {
        let fm = manager(1000);
        fm.mark_trusted();

        let keys: Vec<String> = (0..100).map(|i| format!("bulk-key-{i}")).collect();
        fm.bulk_insert(&keys);

        assert_eq!(fm.len(), 100);

        // Spot check some keys
        for present in ["bulk-key-0", "bulk-key-50", "bulk-key-99"] {
            assert_eq!(fm.might_exist(present), Some(true));
        }
        assert_eq!(fm.might_exist("not-inserted"), Some(false));
    }

    #[test]
    fn test_bulk_insert_skips_existing_keys() {
        let fm = manager(1000);

        let keys: Vec<String> = (0..10).map(|i| format!("dup-key-{i}")).collect();
        fm.bulk_insert(&keys);
        let after_first = fm.len();

        // Every key is already matched by the filter, so nothing is added.
        fm.bulk_insert(&keys);
        assert_eq!(fm.len(), after_first);
    }

    #[test]
    fn test_export_import() {
        let source = manager(1000);
        for key in ["key-a", "key-b", "key-c"] {
            source.insert(key);
        }

        // Export
        let exported = source.export();
        assert!(exported.is_some());

        // Create new filter and import
        let restored = manager(1000);
        let import_result = restored.import(&exported.unwrap());
        assert!(import_result.is_ok());

        // Should have same entries
        assert_eq!(restored.len(), 3);

        // Mark trusted and verify
        restored.mark_trusted();
        for key in ["key-a", "key-b", "key-c"] {
            assert_eq!(restored.might_exist(key), Some(true));
        }
    }

    #[test]
    fn test_stats() {
        let fm = manager(1000);

        let (count, capacity, trust) = fm.stats();
        assert_eq!(count, 0);
        assert!(capacity > 0);
        assert_eq!(trust, FilterTrust::Untrusted);

        fm.insert("key");
        fm.mark_trusted();

        let (count, _, trust) = fm.stats();
        assert_eq!(count, 1);
        assert_eq!(trust, FilterTrust::Trusted);
    }

    #[test]
    fn test_memory_usage_bytes() {
        let fm = manager(10000);

        let mem = fm.memory_usage_bytes();
        assert!(mem >= 1024, "Should report at least 1KB");
        assert!(mem < 1024 * 1024, "Should be less than 1MB for 10k capacity");
    }

    #[test]
    fn test_node_id_and_seed() {
        let fm = FilterManager::new("my-unique-node", 1000);

        assert_eq!(fm.node_id(), "my-unique-node");
        assert!(fm.seed() > 0);
    }

    #[test]
    fn test_deterministic_seed_same_node() {
        let first = FilterManager::new("same-node", 1000);
        let second = FilterManager::new("same-node", 1000);

        assert_eq!(first.seed(), second.seed());
    }

    #[test]
    fn test_different_seeds_different_nodes() {
        let node_a = FilterManager::new("node-a", 1000);
        let node_b = FilterManager::new("node-b", 1000);

        assert_ne!(node_a.seed(), node_b.seed());
    }

    #[test]
    fn test_filter_grows_beyond_initial_capacity() {
        let fm = manager(10); // Very small

        // Insert more than initial capacity
        for i in 0..100 {
            fm.insert(&format!("key-{i}"));
        }

        // Should have grown (some may be skipped due to false positives in contains check)
        // Cuckoo filters have a small false positive rate (~3%), so we expect at least 95
        assert!(fm.len() >= 95, "Expected at least 95 entries, got {}", fm.len());

        // Capacity should be >= entries
        let (count, capacity, _) = fm.stats();
        assert!(capacity >= count);
    }

    #[test]
    fn test_is_empty() {
        let fm = manager(1000);

        assert!(fm.is_empty());

        fm.insert("key");
        assert!(!fm.is_empty());
    }

    #[test]
    fn test_filter_trust_display() {
        // `FilterTrust` only derives `Debug`, so this pins the `{:?}` output.
        assert_eq!(format!("{:?}", FilterTrust::Untrusted), "Untrusted");
        assert_eq!(format!("{:?}", FilterTrust::Trusted), "Trusted");
    }

    #[test]
    fn test_multiple_inserts_same_key() {
        let fm = manager(1000);

        for _ in 0..3 {
            fm.insert("same-key");
        }

        // `FilterManager::insert` skips keys the filter already matches, so
        // repeated inserts of one key must leave exactly one entry.
        assert_eq!(fm.len(), 1);

        fm.mark_trusted();
        assert_eq!(fm.might_exist("same-key"), Some(true));
    }

    #[test]
    fn test_false_positive_rate_reasonable() {
        let fm = manager(10000);
        fm.mark_trusted();

        // Insert 1000 keys
        for i in 0..1000 {
            fm.insert(&format!("inserted-{i}"));
        }

        // Check 1000 keys that were NOT inserted
        let false_positives = (0..1000)
            .filter(|i| fm.might_exist(&format!("not-inserted-{i}")) == Some(true))
            .count();

        // False positive rate should be < 5% for cuckoo filters
        let fp_rate = false_positives as f64 / 1000.0;
        assert!(fp_rate < 0.05, "False positive rate {} is too high", fp_rate);
    }
}