sync_engine/cuckoo/
filter_manager.rs

// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
// See LICENSE file in the project root for full license text.
3
//! Cuckoo filter manager for L3 existence checks.
//!
//! Uses our expandable-cuckoo-filter for memory-efficient probabilistic
//! membership testing with persistence support. This prevents unnecessary
//! L3 queries for keys that definitely don't exist.
9
10use expandable_cuckoo_filter::ExpandableCuckooFilter;
11use std::sync::atomic::{AtomicBool, Ordering};
12use tracing::{debug, info, warn};
13
/// Trust state of the cuckoo filter.
///
/// Decides whether a negative ("definitely not present") answer from the
/// filter may be acted upon without consulting L3.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FilterTrust {
    /// Filter is empty or still warming up - always check L3.
    Untrusted,
    /// Filter is warmed and verified - trust "definitely not" responses.
    Trusted,
}
22
23/// Cuckoo filter manager for L3 existence checks.
24///
25/// The filter tracks which keys exist in L3 (SQL). On cache miss:
26/// - If filter says "definitely not" AND trusted → return 404 immediately
27/// - If filter says "maybe" → check L3
28/// - If untrusted → always check L3 (degraded mode)
29///
30/// The filter must be warmed from L3 on startup and verified via merkle root
31/// comparison before being marked trusted.
32///
33/// ## Persistence
34/// 
35/// The filter supports export/import for fast startup:
36/// - On shutdown: `export()` → save bytes to SQLite
37/// - On startup: load bytes → `import()` → `mark_trusted()` → ready instantly
38pub struct FilterManager {
39    /// Expandable cuckoo filter - auto-grows, supports export/import
40    filter: ExpandableCuckooFilter,
41    /// Trust state - only trust "not found" if true
42    trust_state: AtomicBool,
43}
44
45impl FilterManager {
46    /// Create a new filter manager (starts untrusted).
47    ///
48    /// # Arguments
49    /// * `node_id` - Unique identifier for deterministic seeding (e.g., hostname)
50    /// * `initial_capacity` - Expected number of items (filter will grow if exceeded)
51    #[must_use]
52    pub fn new(node_id: impl Into<String>, initial_capacity: usize) -> Self {
53        let node_id = node_id.into();
54        info!(
55            node_id = %node_id,
56            initial_capacity,
57            "Creating expandable cuckoo filter manager"
58        );
59
60        let filter = ExpandableCuckooFilter::new(node_id, initial_capacity);
61
62        Self {
63            filter,
64            trust_state: AtomicBool::new(false),
65        }
66    }
67
68    /// Get current trust state.
69    #[must_use]
70    #[inline]
71    pub fn trust_state(&self) -> FilterTrust {
72        if self.trust_state.load(Ordering::Acquire) {
73            FilterTrust::Trusted
74        } else {
75            FilterTrust::Untrusted
76        }
77    }
78
79    /// Check if filter is trusted (convenience method).
80    #[must_use]
81    #[inline]
82    pub fn is_trusted(&self) -> bool {
83        self.trust_state.load(Ordering::Acquire)
84    }
85
86    /// Mark filter as trusted (after warmup + verification).
87    pub fn mark_trusted(&self) {
88        info!(
89            entries = self.filter.len(),
90            "Marking cuckoo filter as trusted"
91        );
92        self.trust_state.store(true, Ordering::Release);
93    }
94
95    /// Mark filter as untrusted (e.g., after detecting inconsistency).
96    pub fn mark_untrusted(&self) {
97        warn!("Marking cuckoo filter as untrusted");
98        self.trust_state.store(false, Ordering::Release);
99    }
100
101    /// Check if a key might exist in L3.
102    ///
103    /// Returns:
104    /// - `Some(true)` → key might exist, check L3
105    /// - `Some(false)` → key definitely doesn't exist (trusted mode only)
106    /// - `None` → filter is untrusted, must check L3
107    #[must_use]
108    pub fn might_exist(&self, key: &str) -> Option<bool> {
109        if !self.trust_state.load(Ordering::Acquire) {
110            // Untrusted - caller must check L3
111            return None;
112        }
113
114        Some(self.filter.contains(key))
115    }
116
117    /// Check if we should query L3 for this key.
118    #[must_use]
119    #[inline]
120    pub fn should_check_l3(&self, key: &str) -> bool {
121        match self.might_exist(key) {
122            None => true,         // Untrusted, must check
123            Some(true) => true,   // Might exist, check
124            Some(false) => false, // Definitely not, skip L3
125        }
126    }
127
128    /// Insert a key into the filter (call when item is persisted to L3).
129    pub fn insert(&self, key: &str) {
130        self.filter.insert(key);
131    }
132
133    /// Remove a key from the filter (call when item is deleted from L3).
134    pub fn remove(&self, key: &str) -> bool {
135        self.filter.remove(key)
136    }
137
138    /// Bulk insert keys during warmup.
139    ///
140    /// More efficient than individual inserts for large batches.
141    pub fn bulk_insert(&self, keys: &[String]) {
142        for key in keys {
143            self.filter.insert(key.as_str());
144        }
145        debug!(
146            added = keys.len(),
147            total = self.filter.len(),
148            "Bulk inserted keys into cuckoo filter"
149        );
150    }
151
152    /// Export filter state to bytes for persistence.
153    ///
154    /// Save these bytes to SQLite on shutdown for fast startup.
155    #[must_use]
156    pub fn export(&self) -> Option<Vec<u8>> {
157        self.filter.export().ok()
158    }
159
160    /// Import filter state from bytes.
161    ///
162    /// Load bytes from SQLite on startup, then call `mark_trusted()`.
163    pub fn import(&self, data: &[u8]) -> Result<(), String> {
164        self.filter.import(data).map_err(|e| e.to_string())
165    }
166
167    /// Get current entry count.
168    #[must_use]
169    pub fn len(&self) -> usize {
170        self.filter.len()
171    }
172
173    /// Check if filter is empty.
174    #[must_use]
175    pub fn is_empty(&self) -> bool {
176        self.filter.is_empty()
177    }
178
179    /// Get filter stats: (entry_count, capacity, trust_state).
180    #[must_use]
181    pub fn stats(&self) -> (usize, usize, FilterTrust) {
182        (
183            self.filter.len(),
184            self.filter.capacity(),
185            self.trust_state(),
186        )
187    }
188
189    /// Estimate memory usage in bytes.
190    #[must_use]
191    pub fn memory_usage_bytes(&self) -> usize {
192        // ~8 bytes per fingerprint slot + overhead
193        self.filter.capacity().saturating_mul(8).max(1024)
194    }
195
196    /// Get the node ID used for deterministic seeding.
197    #[must_use]
198    pub fn node_id(&self) -> &str {
199        self.filter.node_id()
200    }
201
202    /// Get the deterministic seed.
203    #[must_use]
204    pub fn seed(&self) -> u64 {
205        self.filter.seed()
206    }
207}
208
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created manager is untrusted and holds no entries.
    #[test]
    fn test_new_filter_is_untrusted() {
        let fm = FilterManager::new("test-node", 1000);
        assert_eq!(fm.trust_state(), FilterTrust::Untrusted);
        assert!(fm.is_empty());
        assert_eq!(fm.len(), 0);
    }

    /// Trust can be toggled on and back off.
    #[test]
    fn test_mark_trusted_and_untrusted() {
        let fm = FilterManager::new("test-node", 1000);

        assert_eq!(fm.trust_state(), FilterTrust::Untrusted);

        fm.mark_trusted();
        assert_eq!(fm.trust_state(), FilterTrust::Trusted);

        fm.mark_untrusted();
        assert_eq!(fm.trust_state(), FilterTrust::Untrusted);
    }

    /// In trusted mode an inserted key is reported, an unknown key is not.
    #[test]
    fn test_insert_and_contains() {
        let fm = FilterManager::new("test-node", 1000);
        fm.mark_trusted();

        // Insert key
        fm.insert("key-1");
        assert_eq!(fm.len(), 1);

        // Should contain it
        assert_eq!(fm.might_exist("key-1"), Some(true));

        // Should not contain other key
        assert_eq!(fm.might_exist("key-2"), Some(false));
    }

    /// While untrusted, `might_exist` gives no answer regardless of content.
    #[test]
    fn test_might_exist_returns_none_when_untrusted() {
        let fm = FilterManager::new("test-node", 1000);

        fm.insert("key-1");

        // Untrusted, should return None
        assert_eq!(fm.might_exist("key-1"), None);
        assert_eq!(fm.might_exist("key-2"), None);
    }

    #[test]
    fn test_should_check_l3_when_untrusted() {
        let fm = FilterManager::new("test-node", 1000);

        // Untrusted = always check L3
        assert!(fm.should_check_l3("any-key"));
        assert!(fm.should_check_l3("another-key"));
    }

    #[test]
    fn test_should_check_l3_when_trusted() {
        let fm = FilterManager::new("test-node", 1000);
        fm.mark_trusted();

        fm.insert("exists");

        // Key that might exist = check L3
        assert!(fm.should_check_l3("exists"));

        // Key that definitely doesn't = don't check L3
        assert!(!fm.should_check_l3("nonexistent"));
    }

    #[test]
    fn test_remove() {
        let fm = FilterManager::new("test-node", 1000);
        fm.mark_trusted();

        fm.insert("to-remove");
        assert_eq!(fm.might_exist("to-remove"), Some(true));

        let removed = fm.remove("to-remove");
        // Note: cuckoo filter remove may return false if fingerprint collision
        // but the key should no longer be in the filter
        assert!(removed || fm.might_exist("to-remove") == Some(false));
    }

    #[test]
    fn test_bulk_insert() {
        let fm = FilterManager::new("test-node", 1000);
        fm.mark_trusted();

        let keys: Vec<String> = (0..100).map(|i| format!("bulk-key-{}", i)).collect();
        fm.bulk_insert(&keys);

        assert_eq!(fm.len(), 100);

        // Spot check some keys
        assert_eq!(fm.might_exist("bulk-key-0"), Some(true));
        assert_eq!(fm.might_exist("bulk-key-50"), Some(true));
        assert_eq!(fm.might_exist("bulk-key-99"), Some(true));
        assert_eq!(fm.might_exist("not-inserted"), Some(false));
    }

    /// Exported bytes can be imported into a second manager with the same node ID.
    #[test]
    fn test_export_import() {
        let fm1 = FilterManager::new("test-node", 1000);

        // Insert some keys
        fm1.insert("key-a");
        fm1.insert("key-b");
        fm1.insert("key-c");

        // Export - a failure here is a test failure, so state the expectation.
        let exported = fm1.export().expect("export of a small filter should succeed");

        // Create new filter and import
        let fm2 = FilterManager::new("test-node", 1000);
        let import_result = fm2.import(&exported);
        assert!(import_result.is_ok());

        // Should have same entries
        assert_eq!(fm2.len(), 3);

        // Mark trusted and verify
        fm2.mark_trusted();
        assert_eq!(fm2.might_exist("key-a"), Some(true));
        assert_eq!(fm2.might_exist("key-b"), Some(true));
        assert_eq!(fm2.might_exist("key-c"), Some(true));
    }

    #[test]
    fn test_stats() {
        let fm = FilterManager::new("test-node", 1000);

        let (count, capacity, trust) = fm.stats();
        assert_eq!(count, 0);
        assert!(capacity > 0);
        assert_eq!(trust, FilterTrust::Untrusted);

        fm.insert("key");
        fm.mark_trusted();

        let (count, _, trust) = fm.stats();
        assert_eq!(count, 1);
        assert_eq!(trust, FilterTrust::Trusted);
    }

    #[test]
    fn test_memory_usage_bytes() {
        let fm = FilterManager::new("test-node", 10000);

        let mem = fm.memory_usage_bytes();
        assert!(mem >= 1024, "Should report at least 1KB");
        assert!(mem < 1024 * 1024, "Should be less than 1MB for 10k capacity");
    }

    #[test]
    fn test_node_id_and_seed() {
        let fm = FilterManager::new("my-unique-node", 1000);

        assert_eq!(fm.node_id(), "my-unique-node");
        assert!(fm.seed() > 0);
    }

    #[test]
    fn test_deterministic_seed_same_node() {
        let fm1 = FilterManager::new("same-node", 1000);
        let fm2 = FilterManager::new("same-node", 1000);

        assert_eq!(fm1.seed(), fm2.seed());
    }

    #[test]
    fn test_different_seeds_different_nodes() {
        let fm1 = FilterManager::new("node-a", 1000);
        let fm2 = FilterManager::new("node-b", 1000);

        assert_ne!(fm1.seed(), fm2.seed());
    }

    #[test]
    fn test_filter_grows_beyond_initial_capacity() {
        let fm = FilterManager::new("test-node", 10); // Very small

        // Insert more than initial capacity
        for i in 0..100 {
            fm.insert(&format!("key-{}", i));
        }

        // Should have grown
        assert_eq!(fm.len(), 100);

        // Capacity should be >= entries
        let (count, capacity, _) = fm.stats();
        assert!(capacity >= count);
    }

    #[test]
    fn test_is_empty() {
        let fm = FilterManager::new("test-node", 1000);

        assert!(fm.is_empty());

        fm.insert("key");
        assert!(!fm.is_empty());
    }

    /// Despite the name, this checks the derived `Debug` output of `FilterTrust`.
    #[test]
    fn test_filter_trust_display() {
        assert_eq!(format!("{:?}", FilterTrust::Untrusted), "Untrusted");
        assert_eq!(format!("{:?}", FilterTrust::Trusted), "Trusted");
    }

    #[test]
    fn test_multiple_inserts_same_key() {
        let fm = FilterManager::new("test-node", 1000);

        fm.insert("same-key");
        fm.insert("same-key");
        fm.insert("same-key");

        // Cuckoo filters may or may not deduplicate,
        // but should at least contain the key
        fm.mark_trusted();
        assert_eq!(fm.might_exist("same-key"), Some(true));
    }

    #[test]
    fn test_false_positive_rate_reasonable() {
        let fm = FilterManager::new("test-node", 10000);
        fm.mark_trusted();

        // Insert 1000 keys
        for i in 0..1000 {
            fm.insert(&format!("inserted-{}", i));
        }

        // Count how many of 1000 never-inserted keys the filter claims to hold
        let false_positives = (0..1000)
            .filter(|i| fm.might_exist(&format!("not-inserted-{}", i)) == Some(true))
            .count();

        // False positive rate should be < 5% for cuckoo filters
        let fp_rate = false_positives as f64 / 1000.0;
        assert!(fp_rate < 0.05, "False positive rate {} is too high", fp_rate);
    }
}