sync_engine/cuckoo/
filter_persistence.rs

1// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
2// See LICENSE file in the project root for full license text.
3
4//! Cuckoo filter persistence for fast startup.
5//!
6//! Stores filter state alongside the merkle root that was current when
7//! the filter was saved. On startup, if the current merkle root matches
8//! the saved root, the filter is immediately trusted.
9//!
10//! # Storage
11//!
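//! Uses the WAL SQLite database for storage. `FilterPersistence::from_pool`
//! reuses an existing connection pool; `FilterPersistence::new` opens its own
//! small pool on the given SQLite file.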
13//!
14//! ```sql
15//! CREATE TABLE cf_state (
16//!     filter_id TEXT PRIMARY KEY,      -- "l2" or "l3"
17//!     filter_bytes BLOB NOT NULL,      -- Exported filter data
18//!     merkle_root BLOB NOT NULL,       -- 32-byte root hash when saved
19//!     entry_count INTEGER NOT NULL,    -- Number of entries at save time
20//!     saved_at INTEGER NOT NULL        -- Unix timestamp
21//! );
22//! ```
23
24use crate::StorageError;
25use sqlx::{AnyPool, Row, any::AnyPoolOptions};
26use std::time::Duration;
27use tracing::{debug, info, warn};
28
/// Filter IDs for persistence.
///
/// These strings are the values stored in the `filter_id` primary-key column
/// of the `cf_state` table.
///
/// Row key for the L2 filter.
pub const L2_FILTER_ID: &str = "l2";
/// Row key for the L3 filter.
pub const L3_FILTER_ID: &str = "l3";
32
/// Saved filter state, as read back from one row of the `cf_state` table.
///
/// Derives `PartialEq`/`Eq` so whole snapshots can be compared directly
/// (for example in tests) instead of field by field.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SavedFilterState {
    /// Exported filter bytes, exactly as they were handed to `save`.
    pub filter_bytes: Vec<u8>,
    /// Merkle root (32 bytes) that was current when the filter was saved.
    pub merkle_root: [u8; 32],
    /// Number of entries in the filter at save time.
    pub entry_count: usize,
    /// Unix timestamp (seconds) at which the state was saved.
    pub saved_at: u64,
}
45
/// Cuckoo filter persistence manager.
///
/// Stores and retrieves filter state from SQLite for fast startup. Each
/// filter occupies a single row of the `cf_state` table, keyed by its
/// filter ID.
pub struct FilterPersistence {
    /// Connection pool every `cf_state` query is executed against.
    pool: AnyPool,
}
52
53impl FilterPersistence {
54    /// Create a new filter persistence manager.
55    ///
56    /// Uses the same SQLite file as the WAL for simplicity.
57    pub async fn new(sqlite_path: &str) -> Result<Self, StorageError> {
58        sqlx::any::install_default_drivers();
59        
60        let url = format!("sqlite://{}?mode=rwc", sqlite_path);
61        
62        let pool = AnyPoolOptions::new()
63            .max_connections(2)
64            .acquire_timeout(Duration::from_secs(5))
65            .connect(&url)
66            .await
67            .map_err(|e| StorageError::Backend(format!(
68                "Failed to connect to filter persistence DB: {}", e
69            )))?;
70
71        let persistence = Self { pool };
72        persistence.init_schema().await?;
73        
74        Ok(persistence)
75    }
76
77    /// Create from existing pool (share with WAL).
78    pub fn from_pool(pool: AnyPool) -> Self {
79        Self { pool }
80    }
81
82    async fn init_schema(&self) -> Result<(), StorageError> {
83        sqlx::query(
84            r#"
85            CREATE TABLE IF NOT EXISTS cf_state (
86                filter_id TEXT PRIMARY KEY,
87                filter_bytes BLOB NOT NULL,
88                merkle_root BLOB NOT NULL,
89                entry_count INTEGER NOT NULL,
90                saved_at INTEGER NOT NULL
91            )
92            "#
93        )
94            .execute(&self.pool)
95            .await
96            .map_err(|e| StorageError::Backend(format!(
97                "Failed to create cf_state table: {}", e
98            )))?;
99
100        Ok(())
101    }
102
103    /// Save filter state with current merkle root.
104    pub async fn save(
105        &self,
106        filter_id: &str,
107        filter_bytes: &[u8],
108        merkle_root: &[u8; 32],
109        entry_count: usize,
110    ) -> Result<(), StorageError> {
111        let now = std::time::SystemTime::now()
112            .duration_since(std::time::UNIX_EPOCH)
113            .unwrap_or_default()
114            .as_secs() as i64;
115
116        sqlx::query(
117            r#"
118            INSERT INTO cf_state (filter_id, filter_bytes, merkle_root, entry_count, saved_at)
119            VALUES (?, ?, ?, ?, ?)
120            ON CONFLICT(filter_id) DO UPDATE SET
121                filter_bytes = excluded.filter_bytes,
122                merkle_root = excluded.merkle_root,
123                entry_count = excluded.entry_count,
124                saved_at = excluded.saved_at
125            "#
126        )
127            .bind(filter_id)
128            .bind(filter_bytes)
129            .bind(merkle_root.as_slice())
130            .bind(entry_count as i64)
131            .bind(now)
132            .execute(&self.pool)
133            .await
134            .map_err(|e| StorageError::Backend(format!(
135                "Failed to save filter state: {}", e
136            )))?;
137
138        info!(
139            filter_id,
140            entry_count,
141            bytes = filter_bytes.len(),
142            "Filter state saved"
143        );
144        Ok(())
145    }
146
147    /// Load filter state if it exists.
148    pub async fn load(&self, filter_id: &str) -> Result<Option<SavedFilterState>, StorageError> {
149        let result = sqlx::query(
150            "SELECT filter_bytes, merkle_root, entry_count, saved_at FROM cf_state WHERE filter_id = ?"
151        )
152            .bind(filter_id)
153            .fetch_optional(&self.pool)
154            .await
155            .map_err(|e| StorageError::Backend(format!(
156                "Failed to load filter state: {}", e
157            )))?;
158
159        match result {
160            Some(row) => {
161                let filter_bytes: Vec<u8> = row.try_get("filter_bytes")
162                    .map_err(|e| StorageError::Backend(e.to_string()))?;
163                let root_bytes: Vec<u8> = row.try_get("merkle_root")
164                    .map_err(|e| StorageError::Backend(e.to_string()))?;
165                let entry_count: i64 = row.try_get("entry_count")
166                    .map_err(|e| StorageError::Backend(e.to_string()))?;
167                let saved_at: i64 = row.try_get("saved_at")
168                    .map_err(|e| StorageError::Backend(e.to_string()))?;
169
170                if root_bytes.len() != 32 {
171                    warn!(
172                        filter_id,
173                        len = root_bytes.len(),
174                        "Invalid merkle root length in saved filter state"
175                    );
176                    return Ok(None);
177                }
178
179                let mut merkle_root = [0u8; 32];
180                merkle_root.copy_from_slice(&root_bytes);
181
182                debug!(
183                    filter_id,
184                    entry_count,
185                    bytes = filter_bytes.len(),
186                    "Loaded saved filter state"
187                );
188
189                Ok(Some(SavedFilterState {
190                    filter_bytes,
191                    merkle_root,
192                    entry_count: entry_count as usize,
193                    saved_at: saved_at as u64,
194                }))
195            }
196            None => Ok(None),
197        }
198    }
199
200    /// Delete saved filter state.
201    pub async fn delete(&self, filter_id: &str) -> Result<(), StorageError> {
202        sqlx::query("DELETE FROM cf_state WHERE filter_id = ?")
203            .bind(filter_id)
204            .execute(&self.pool)
205            .await
206            .map_err(|e| StorageError::Backend(format!(
207                "Failed to delete filter state: {}", e
208            )))?;
209
210        Ok(())
211    }
212
213    /// Check if a saved filter matches the current merkle root.
214    ///
215    /// If it matches, the filter can be trusted immediately.
216    pub async fn is_filter_current(
217        &self,
218        filter_id: &str,
219        current_root: &[u8; 32],
220    ) -> Result<bool, StorageError> {
221        match self.load(filter_id).await? {
222            Some(saved) => Ok(&saved.merkle_root == current_root),
223            None => Ok(false),
224        }
225    }
226}
227
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::{Path, PathBuf};

    /// Build a temp DB path unique to this test name and this process, so
    /// concurrently running test binaries do not share a database file.
    fn temp_db_path(name: &str) -> PathBuf {
        std::env::temp_dir().join(format!("cf_test_{}_{}.db", name, std::process::id()))
    }

    /// Open a fresh database for the named test, removing any stale file first.
    async fn open_fresh(name: &str) -> (FilterPersistence, PathBuf) {
        let db_path = temp_db_path(name);
        let _ = std::fs::remove_file(&db_path); // Clean up any old test

        let persistence = FilterPersistence::new(
            db_path.to_str().expect("temp dir path should be valid UTF-8"),
        )
            .await
            .unwrap();

        (persistence, db_path)
    }

    /// Close the pool before deleting the file so no connection still holds it.
    async fn cleanup(persistence: FilterPersistence, db_path: &Path) {
        persistence.pool.close().await;
        let _ = std::fs::remove_file(db_path);
    }

    #[tokio::test]
    async fn test_filter_persistence_roundtrip() {
        let (persistence, db_path) = open_fresh("roundtrip").await;

        let filter_bytes = vec![1, 2, 3, 4, 5];
        let merkle_root = [42u8; 32];
        let entry_count = 100;

        // Save
        persistence.save(L3_FILTER_ID, &filter_bytes, &merkle_root, entry_count)
            .await.unwrap();

        // Load
        let state = persistence.load(L3_FILTER_ID).await.unwrap()
            .expect("saved state should be loadable");
        assert_eq!(state.filter_bytes, filter_bytes);
        assert_eq!(state.merkle_root, merkle_root);
        assert_eq!(state.entry_count, entry_count);

        cleanup(persistence, &db_path).await;
    }

    #[tokio::test]
    async fn test_filter_current_check() {
        let (persistence, db_path) = open_fresh("current_check").await;

        let merkle_root = [42u8; 32];
        persistence.save(L3_FILTER_ID, &[1, 2, 3], &merkle_root, 10)
            .await.unwrap();

        // Same root = current
        assert!(persistence.is_filter_current(L3_FILTER_ID, &merkle_root).await.unwrap());

        // Different root = not current
        let different_root = [99u8; 32];
        assert!(!persistence.is_filter_current(L3_FILTER_ID, &different_root).await.unwrap());

        cleanup(persistence, &db_path).await;
    }

    #[tokio::test]
    async fn test_missing_filter_is_none_and_not_current() {
        let (persistence, db_path) = open_fresh("missing").await;

        // Nothing was saved for this ID.
        assert!(persistence.load(L2_FILTER_ID).await.unwrap().is_none());
        assert!(!persistence.is_filter_current(L2_FILTER_ID, &[0u8; 32]).await.unwrap());

        cleanup(persistence, &db_path).await;
    }

    #[tokio::test]
    async fn test_save_overwrites_existing_row() {
        let (persistence, db_path) = open_fresh("overwrite").await;

        persistence.save(L2_FILTER_ID, &[1], &[1u8; 32], 1).await.unwrap();
        persistence.save(L2_FILTER_ID, &[2, 2], &[2u8; 32], 2).await.unwrap();

        // The second save replaces every column of the first.
        let state = persistence.load(L2_FILTER_ID).await.unwrap()
            .expect("saved state should be loadable");
        assert_eq!(state.filter_bytes, vec![2, 2]);
        assert_eq!(state.merkle_root, [2u8; 32]);
        assert_eq!(state.entry_count, 2);

        cleanup(persistence, &db_path).await;
    }

    #[tokio::test]
    async fn test_delete_removes_state() {
        let (persistence, db_path) = open_fresh("delete").await;

        persistence.save(L3_FILTER_ID, &[9, 9], &[5u8; 32], 2).await.unwrap();
        persistence.delete(L3_FILTER_ID).await.unwrap();
        assert!(persistence.load(L3_FILTER_ID).await.unwrap().is_none());

        // Deleting an ID with no row is still a success.
        persistence.delete(L3_FILTER_ID).await.unwrap();

        cleanup(persistence, &db_path).await;
    }
}
284}