sync_engine/cuckoo/
filter_persistence.rs1use crate::StorageError;
22use sqlx::{AnyPool, Row, any::AnyPoolOptions};
23use std::time::Duration;
24use tracing::{debug, info, warn};
25
26pub const L2_FILTER_ID: &str = "l2";
28pub const L3_FILTER_ID: &str = "l3";
29
30#[derive(Debug, Clone)]
32pub struct SavedFilterState {
33 pub filter_bytes: Vec<u8>,
35 pub merkle_root: [u8; 32],
37 pub entry_count: usize,
39 pub saved_at: u64,
41}
42
43pub struct FilterPersistence {
47 pool: AnyPool,
48}
49
50impl FilterPersistence {
51 pub async fn new(sqlite_path: &str) -> Result<Self, StorageError> {
55 sqlx::any::install_default_drivers();
56
57 let url = format!("sqlite://{}?mode=rwc", sqlite_path);
58
59 let pool = AnyPoolOptions::new()
60 .max_connections(2)
61 .acquire_timeout(Duration::from_secs(5))
62 .connect(&url)
63 .await
64 .map_err(|e| StorageError::Backend(format!(
65 "Failed to connect to filter persistence DB: {}", e
66 )))?;
67
68 let persistence = Self { pool };
69 persistence.init_schema().await?;
70
71 Ok(persistence)
72 }
73
74 pub fn from_pool(pool: AnyPool) -> Self {
76 Self { pool }
77 }
78
79 async fn init_schema(&self) -> Result<(), StorageError> {
80 sqlx::query(
81 r#"
82 CREATE TABLE IF NOT EXISTS cf_state (
83 filter_id TEXT PRIMARY KEY,
84 filter_bytes BLOB NOT NULL,
85 merkle_root BLOB NOT NULL,
86 entry_count INTEGER NOT NULL,
87 saved_at INTEGER NOT NULL
88 )
89 "#
90 )
91 .execute(&self.pool)
92 .await
93 .map_err(|e| StorageError::Backend(format!(
94 "Failed to create cf_state table: {}", e
95 )))?;
96
97 Ok(())
98 }
99
100 pub async fn save(
102 &self,
103 filter_id: &str,
104 filter_bytes: &[u8],
105 merkle_root: &[u8; 32],
106 entry_count: usize,
107 ) -> Result<(), StorageError> {
108 let now = std::time::SystemTime::now()
109 .duration_since(std::time::UNIX_EPOCH)
110 .unwrap_or_default()
111 .as_secs() as i64;
112
113 sqlx::query(
114 r#"
115 INSERT INTO cf_state (filter_id, filter_bytes, merkle_root, entry_count, saved_at)
116 VALUES (?, ?, ?, ?, ?)
117 ON CONFLICT(filter_id) DO UPDATE SET
118 filter_bytes = excluded.filter_bytes,
119 merkle_root = excluded.merkle_root,
120 entry_count = excluded.entry_count,
121 saved_at = excluded.saved_at
122 "#
123 )
124 .bind(filter_id)
125 .bind(filter_bytes)
126 .bind(merkle_root.as_slice())
127 .bind(entry_count as i64)
128 .bind(now)
129 .execute(&self.pool)
130 .await
131 .map_err(|e| StorageError::Backend(format!(
132 "Failed to save filter state: {}", e
133 )))?;
134
135 info!(
136 filter_id,
137 entry_count,
138 bytes = filter_bytes.len(),
139 "Filter state saved"
140 );
141 Ok(())
142 }
143
144 pub async fn load(&self, filter_id: &str) -> Result<Option<SavedFilterState>, StorageError> {
146 let result = sqlx::query(
147 "SELECT filter_bytes, merkle_root, entry_count, saved_at FROM cf_state WHERE filter_id = ?"
148 )
149 .bind(filter_id)
150 .fetch_optional(&self.pool)
151 .await
152 .map_err(|e| StorageError::Backend(format!(
153 "Failed to load filter state: {}", e
154 )))?;
155
156 match result {
157 Some(row) => {
158 let filter_bytes: Vec<u8> = row.try_get("filter_bytes")
159 .map_err(|e| StorageError::Backend(e.to_string()))?;
160 let root_bytes: Vec<u8> = row.try_get("merkle_root")
161 .map_err(|e| StorageError::Backend(e.to_string()))?;
162 let entry_count: i64 = row.try_get("entry_count")
163 .map_err(|e| StorageError::Backend(e.to_string()))?;
164 let saved_at: i64 = row.try_get("saved_at")
165 .map_err(|e| StorageError::Backend(e.to_string()))?;
166
167 if root_bytes.len() != 32 {
168 warn!(
169 filter_id,
170 len = root_bytes.len(),
171 "Invalid merkle root length in saved filter state"
172 );
173 return Ok(None);
174 }
175
176 let mut merkle_root = [0u8; 32];
177 merkle_root.copy_from_slice(&root_bytes);
178
179 debug!(
180 filter_id,
181 entry_count,
182 bytes = filter_bytes.len(),
183 "Loaded saved filter state"
184 );
185
186 Ok(Some(SavedFilterState {
187 filter_bytes,
188 merkle_root,
189 entry_count: entry_count as usize,
190 saved_at: saved_at as u64,
191 }))
192 }
193 None => Ok(None),
194 }
195 }
196
197 pub async fn delete(&self, filter_id: &str) -> Result<(), StorageError> {
199 sqlx::query("DELETE FROM cf_state WHERE filter_id = ?")
200 .bind(filter_id)
201 .execute(&self.pool)
202 .await
203 .map_err(|e| StorageError::Backend(format!(
204 "Failed to delete filter state: {}", e
205 )))?;
206
207 Ok(())
208 }
209
210 pub async fn is_filter_current(
214 &self,
215 filter_id: &str,
216 current_root: &[u8; 32],
217 ) -> Result<bool, StorageError> {
218 match self.load(filter_id).await? {
219 Some(saved) => Ok(&saved.merkle_root == current_root),
220 None => Ok(false),
221 }
222 }
223}
224
225#[cfg(test)]
226mod tests {
227 use super::*;
228 use std::path::PathBuf;
229
230 fn temp_db_path(name: &str) -> PathBuf {
231 std::env::temp_dir().join(format!("cf_test_{}.db", name))
232 }
233
234 #[tokio::test]
235 async fn test_filter_persistence_roundtrip() {
236 let db_path = temp_db_path("roundtrip");
237 let _ = std::fs::remove_file(&db_path); let persistence = FilterPersistence::new(db_path.to_str().unwrap()).await.unwrap();
240
241 let filter_bytes = vec![1, 2, 3, 4, 5];
242 let merkle_root = [42u8; 32];
243 let entry_count = 100;
244
245 persistence.save(L3_FILTER_ID, &filter_bytes, &merkle_root, entry_count)
247 .await.unwrap();
248
249 let loaded = persistence.load(L3_FILTER_ID).await.unwrap();
251 assert!(loaded.is_some());
252
253 let state = loaded.unwrap();
254 assert_eq!(state.filter_bytes, filter_bytes);
255 assert_eq!(state.merkle_root, merkle_root);
256 assert_eq!(state.entry_count, entry_count);
257
258 let _ = std::fs::remove_file(&db_path); }
260
261 #[tokio::test]
262 async fn test_filter_current_check() {
263 let db_path = temp_db_path("current_check");
264 let _ = std::fs::remove_file(&db_path); let persistence = FilterPersistence::new(db_path.to_str().unwrap()).await.unwrap();
267
268 let merkle_root = [42u8; 32];
269 persistence.save(L3_FILTER_ID, &[1, 2, 3], &merkle_root, 10)
270 .await.unwrap();
271
272 assert!(persistence.is_filter_current(L3_FILTER_ID, &merkle_root).await.unwrap());
274
275 let different_root = [99u8; 32];
277 assert!(!persistence.is_filter_current(L3_FILTER_ID, &different_root).await.unwrap());
278
279 let _ = std::fs::remove_file(&db_path); }
281}