sync_engine/cuckoo/
filter_persistence.rs

1//! Cuckoo filter persistence for fast startup.
2//!
3//! Stores filter state alongside the merkle root that was current when
4//! the filter was saved. On startup, if the current merkle root matches
5//! the saved root, the filter is immediately trusted.
6//!
7//! # Storage
8//!
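//! Intended to live in the WAL's SQLite database: [`FilterPersistence::new`] opens its own pool on a given file, while [`FilterPersistence::from_pool`] reuses an existing pool (e.g. the WAL's).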
10//!
11//! ```sql
12//! CREATE TABLE cf_state (
13//!     filter_id TEXT PRIMARY KEY,      -- "l2" or "l3"
14//!     filter_bytes BLOB NOT NULL,      -- Exported filter data
15//!     merkle_root BLOB NOT NULL,       -- 32-byte root hash when saved
16//!     entry_count INTEGER NOT NULL,    -- Number of entries at save time
17//!     saved_at INTEGER NOT NULL        -- Unix timestamp
18//! );
19//! ```
20
21use crate::StorageError;
22use sqlx::{AnyPool, Row, any::AnyPoolOptions};
23use std::time::Duration;
24use tracing::{debug, info, warn};
25
/// `filter_id` key of the L2 filter's row in the `cf_state` table.
pub const L2_FILTER_ID: &str = "l2";
/// `filter_id` key of the L3 filter's row in the `cf_state` table.
pub const L3_FILTER_ID: &str = "l3";
29
/// Filter state as read back from the `cf_state` table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SavedFilterState {
    /// Exported filter bytes, exactly as passed to `FilterPersistence::save`.
    pub filter_bytes: Vec<u8>,
    /// Merkle root that was current when the filter was saved.
    pub merkle_root: [u8; 32],
    /// Number of entries in the filter at save time.
    pub entry_count: usize,
    /// Unix timestamp (seconds) at which the state was saved.
    pub saved_at: u64,
}
42
43/// Cuckoo filter persistence manager.
44///
45/// Stores and retrieves filter state from SQLite for fast startup.
46pub struct FilterPersistence {
47    pool: AnyPool,
48}
49
50impl FilterPersistence {
51    /// Create a new filter persistence manager.
52    ///
53    /// Uses the same SQLite file as the WAL for simplicity.
54    pub async fn new(sqlite_path: &str) -> Result<Self, StorageError> {
55        sqlx::any::install_default_drivers();
56        
57        let url = format!("sqlite://{}?mode=rwc", sqlite_path);
58        
59        let pool = AnyPoolOptions::new()
60            .max_connections(2)
61            .acquire_timeout(Duration::from_secs(5))
62            .connect(&url)
63            .await
64            .map_err(|e| StorageError::Backend(format!(
65                "Failed to connect to filter persistence DB: {}", e
66            )))?;
67
68        let persistence = Self { pool };
69        persistence.init_schema().await?;
70        
71        Ok(persistence)
72    }
73
74    /// Create from existing pool (share with WAL).
75    pub fn from_pool(pool: AnyPool) -> Self {
76        Self { pool }
77    }
78
79    async fn init_schema(&self) -> Result<(), StorageError> {
80        sqlx::query(
81            r#"
82            CREATE TABLE IF NOT EXISTS cf_state (
83                filter_id TEXT PRIMARY KEY,
84                filter_bytes BLOB NOT NULL,
85                merkle_root BLOB NOT NULL,
86                entry_count INTEGER NOT NULL,
87                saved_at INTEGER NOT NULL
88            )
89            "#
90        )
91            .execute(&self.pool)
92            .await
93            .map_err(|e| StorageError::Backend(format!(
94                "Failed to create cf_state table: {}", e
95            )))?;
96
97        Ok(())
98    }
99
100    /// Save filter state with current merkle root.
101    pub async fn save(
102        &self,
103        filter_id: &str,
104        filter_bytes: &[u8],
105        merkle_root: &[u8; 32],
106        entry_count: usize,
107    ) -> Result<(), StorageError> {
108        let now = std::time::SystemTime::now()
109            .duration_since(std::time::UNIX_EPOCH)
110            .unwrap_or_default()
111            .as_secs() as i64;
112
113        sqlx::query(
114            r#"
115            INSERT INTO cf_state (filter_id, filter_bytes, merkle_root, entry_count, saved_at)
116            VALUES (?, ?, ?, ?, ?)
117            ON CONFLICT(filter_id) DO UPDATE SET
118                filter_bytes = excluded.filter_bytes,
119                merkle_root = excluded.merkle_root,
120                entry_count = excluded.entry_count,
121                saved_at = excluded.saved_at
122            "#
123        )
124            .bind(filter_id)
125            .bind(filter_bytes)
126            .bind(merkle_root.as_slice())
127            .bind(entry_count as i64)
128            .bind(now)
129            .execute(&self.pool)
130            .await
131            .map_err(|e| StorageError::Backend(format!(
132                "Failed to save filter state: {}", e
133            )))?;
134
135        info!(
136            filter_id,
137            entry_count,
138            bytes = filter_bytes.len(),
139            "Filter state saved"
140        );
141        Ok(())
142    }
143
144    /// Load filter state if it exists.
145    pub async fn load(&self, filter_id: &str) -> Result<Option<SavedFilterState>, StorageError> {
146        let result = sqlx::query(
147            "SELECT filter_bytes, merkle_root, entry_count, saved_at FROM cf_state WHERE filter_id = ?"
148        )
149            .bind(filter_id)
150            .fetch_optional(&self.pool)
151            .await
152            .map_err(|e| StorageError::Backend(format!(
153                "Failed to load filter state: {}", e
154            )))?;
155
156        match result {
157            Some(row) => {
158                let filter_bytes: Vec<u8> = row.try_get("filter_bytes")
159                    .map_err(|e| StorageError::Backend(e.to_string()))?;
160                let root_bytes: Vec<u8> = row.try_get("merkle_root")
161                    .map_err(|e| StorageError::Backend(e.to_string()))?;
162                let entry_count: i64 = row.try_get("entry_count")
163                    .map_err(|e| StorageError::Backend(e.to_string()))?;
164                let saved_at: i64 = row.try_get("saved_at")
165                    .map_err(|e| StorageError::Backend(e.to_string()))?;
166
167                if root_bytes.len() != 32 {
168                    warn!(
169                        filter_id,
170                        len = root_bytes.len(),
171                        "Invalid merkle root length in saved filter state"
172                    );
173                    return Ok(None);
174                }
175
176                let mut merkle_root = [0u8; 32];
177                merkle_root.copy_from_slice(&root_bytes);
178
179                debug!(
180                    filter_id,
181                    entry_count,
182                    bytes = filter_bytes.len(),
183                    "Loaded saved filter state"
184                );
185
186                Ok(Some(SavedFilterState {
187                    filter_bytes,
188                    merkle_root,
189                    entry_count: entry_count as usize,
190                    saved_at: saved_at as u64,
191                }))
192            }
193            None => Ok(None),
194        }
195    }
196
197    /// Delete saved filter state.
198    pub async fn delete(&self, filter_id: &str) -> Result<(), StorageError> {
199        sqlx::query("DELETE FROM cf_state WHERE filter_id = ?")
200            .bind(filter_id)
201            .execute(&self.pool)
202            .await
203            .map_err(|e| StorageError::Backend(format!(
204                "Failed to delete filter state: {}", e
205            )))?;
206
207        Ok(())
208    }
209
210    /// Check if a saved filter matches the current merkle root.
211    ///
212    /// If it matches, the filter can be trusted immediately.
213    pub async fn is_filter_current(
214        &self,
215        filter_id: &str,
216        current_root: &[u8; 32],
217    ) -> Result<bool, StorageError> {
218        match self.load(filter_id).await? {
219            Some(saved) => Ok(&saved.merkle_root == current_root),
220            None => Ok(false),
221        }
222    }
223}
224
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::{Path, PathBuf};

    /// Temp DB path unique to this test name *and* process, so concurrent
    /// `cargo test` invocations never share a database file.
    fn temp_db_path(name: &str) -> PathBuf {
        std::env::temp_dir().join(format!("cf_test_{}_{}.db", name, std::process::id()))
    }

    /// Remove any leftover DB from an earlier (possibly panicked) run and open a fresh one.
    async fn open_fresh(db_path: &Path) -> FilterPersistence {
        let _ = std::fs::remove_file(db_path);
        FilterPersistence::new(db_path.to_str().unwrap()).await.unwrap()
    }

    /// Close the pool before deleting the file; open handles can block removal on some platforms.
    async fn cleanup(persistence: FilterPersistence, db_path: &Path) {
        persistence.pool.close().await;
        let _ = std::fs::remove_file(db_path);
    }

    #[tokio::test]
    async fn test_filter_persistence_roundtrip() {
        let db_path = temp_db_path("roundtrip");
        let persistence = open_fresh(&db_path).await;

        let filter_bytes = vec![1, 2, 3, 4, 5];
        let merkle_root = [42u8; 32];
        let entry_count = 100;

        persistence
            .save(L3_FILTER_ID, &filter_bytes, &merkle_root, entry_count)
            .await
            .unwrap();

        let state = persistence
            .load(L3_FILTER_ID)
            .await
            .unwrap()
            .expect("saved state should load");
        assert_eq!(state.filter_bytes, filter_bytes);
        assert_eq!(state.merkle_root, merkle_root);
        assert_eq!(state.entry_count, entry_count);

        cleanup(persistence, &db_path).await;
    }

    #[tokio::test]
    async fn test_filter_current_check() {
        let db_path = temp_db_path("current_check");
        let persistence = open_fresh(&db_path).await;

        // Nothing saved yet = not current.
        let merkle_root = [42u8; 32];
        assert!(!persistence.is_filter_current(L3_FILTER_ID, &merkle_root).await.unwrap());

        persistence.save(L3_FILTER_ID, &[1, 2, 3], &merkle_root, 10).await.unwrap();

        // Same root = current.
        assert!(persistence.is_filter_current(L3_FILTER_ID, &merkle_root).await.unwrap());

        // Different root = not current.
        let different_root = [99u8; 32];
        assert!(!persistence.is_filter_current(L3_FILTER_ID, &different_root).await.unwrap());

        cleanup(persistence, &db_path).await;
    }

    #[tokio::test]
    async fn test_save_overwrites_existing_state() {
        let db_path = temp_db_path("overwrite");
        let persistence = open_fresh(&db_path).await;

        persistence.save(L3_FILTER_ID, &[1, 2, 3], &[1u8; 32], 3).await.unwrap();
        persistence.save(L3_FILTER_ID, &[9, 8], &[2u8; 32], 2).await.unwrap();

        let state = persistence
            .load(L3_FILTER_ID)
            .await
            .unwrap()
            .expect("overwritten state should load");
        assert_eq!(state.filter_bytes, vec![9, 8]);
        assert_eq!(state.merkle_root, [2u8; 32]);
        assert_eq!(state.entry_count, 2);

        cleanup(persistence, &db_path).await;
    }

    #[tokio::test]
    async fn test_delete_and_missing_state() {
        let db_path = temp_db_path("delete");
        let persistence = open_fresh(&db_path).await;

        assert!(persistence.load(L3_FILTER_ID).await.unwrap().is_none());

        persistence.save(L3_FILTER_ID, &[1], &[5u8; 32], 1).await.unwrap();
        persistence.delete(L3_FILTER_ID).await.unwrap();
        assert!(persistence.load(L3_FILTER_ID).await.unwrap().is_none());

        // Deleting an absent row succeeds.
        persistence.delete(L3_FILTER_ID).await.unwrap();

        cleanup(persistence, &db_path).await;
    }
}