sync_engine/cuckoo/
filter_persistence.rs1use crate::StorageError;
25use sqlx::{AnyPool, Row, any::AnyPoolOptions};
26use std::time::Duration;
27use tracing::{debug, info, warn};
28
/// Value passed as `filter_id` to select the row holding the "l2" filter's
/// saved state.
// NOTE(review): presumably "l2" names a cache/storage tier — confirm with the caller.
pub const L2_FILTER_ID: &str = "l2";

/// Value passed as `filter_id` to select the row holding the "l3" filter's
/// saved state.
// NOTE(review): presumably "l3" names a cache/storage tier — confirm with the caller.
pub const L3_FILTER_ID: &str = "l3";
32
/// One filter snapshot as read back from the `cf_state` table.
#[derive(Clone, Debug)]
pub struct SavedFilterState {
    /// Serialized filter contents, exactly as they were handed to `save`.
    pub filter_bytes: Vec<u8>,
    /// 32-byte merkle root that was stored together with the filter bytes.
    pub merkle_root: [u8; 32],
    /// Entry count that was recorded when the snapshot was saved.
    pub entry_count: usize,
    /// Unix timestamp, in seconds, at which the snapshot was saved.
    pub saved_at: u64,
}
45
46pub struct FilterPersistence {
50 pool: AnyPool,
51}
52
53impl FilterPersistence {
54 pub async fn new(sqlite_path: &str) -> Result<Self, StorageError> {
58 sqlx::any::install_default_drivers();
59
60 let url = format!("sqlite://{}?mode=rwc", sqlite_path);
61
62 let pool = AnyPoolOptions::new()
63 .max_connections(2)
64 .acquire_timeout(Duration::from_secs(5))
65 .connect(&url)
66 .await
67 .map_err(|e| StorageError::Backend(format!(
68 "Failed to connect to filter persistence DB: {}", e
69 )))?;
70
71 let persistence = Self { pool };
72 persistence.init_schema().await?;
73
74 Ok(persistence)
75 }
76
77 pub fn from_pool(pool: AnyPool) -> Self {
79 Self { pool }
80 }
81
82 async fn init_schema(&self) -> Result<(), StorageError> {
83 sqlx::query(
84 r#"
85 CREATE TABLE IF NOT EXISTS cf_state (
86 filter_id TEXT PRIMARY KEY,
87 filter_bytes BLOB NOT NULL,
88 merkle_root BLOB NOT NULL,
89 entry_count INTEGER NOT NULL,
90 saved_at INTEGER NOT NULL
91 )
92 "#
93 )
94 .execute(&self.pool)
95 .await
96 .map_err(|e| StorageError::Backend(format!(
97 "Failed to create cf_state table: {}", e
98 )))?;
99
100 Ok(())
101 }
102
103 pub async fn save(
105 &self,
106 filter_id: &str,
107 filter_bytes: &[u8],
108 merkle_root: &[u8; 32],
109 entry_count: usize,
110 ) -> Result<(), StorageError> {
111 let now = std::time::SystemTime::now()
112 .duration_since(std::time::UNIX_EPOCH)
113 .unwrap_or_default()
114 .as_secs() as i64;
115
116 sqlx::query(
117 r#"
118 INSERT INTO cf_state (filter_id, filter_bytes, merkle_root, entry_count, saved_at)
119 VALUES (?, ?, ?, ?, ?)
120 ON CONFLICT(filter_id) DO UPDATE SET
121 filter_bytes = excluded.filter_bytes,
122 merkle_root = excluded.merkle_root,
123 entry_count = excluded.entry_count,
124 saved_at = excluded.saved_at
125 "#
126 )
127 .bind(filter_id)
128 .bind(filter_bytes)
129 .bind(merkle_root.as_slice())
130 .bind(entry_count as i64)
131 .bind(now)
132 .execute(&self.pool)
133 .await
134 .map_err(|e| StorageError::Backend(format!(
135 "Failed to save filter state: {}", e
136 )))?;
137
138 info!(
139 filter_id,
140 entry_count,
141 bytes = filter_bytes.len(),
142 "Filter state saved"
143 );
144 Ok(())
145 }
146
147 pub async fn load(&self, filter_id: &str) -> Result<Option<SavedFilterState>, StorageError> {
149 let result = sqlx::query(
150 "SELECT filter_bytes, merkle_root, entry_count, saved_at FROM cf_state WHERE filter_id = ?"
151 )
152 .bind(filter_id)
153 .fetch_optional(&self.pool)
154 .await
155 .map_err(|e| StorageError::Backend(format!(
156 "Failed to load filter state: {}", e
157 )))?;
158
159 match result {
160 Some(row) => {
161 let filter_bytes: Vec<u8> = row.try_get("filter_bytes")
162 .map_err(|e| StorageError::Backend(e.to_string()))?;
163 let root_bytes: Vec<u8> = row.try_get("merkle_root")
164 .map_err(|e| StorageError::Backend(e.to_string()))?;
165 let entry_count: i64 = row.try_get("entry_count")
166 .map_err(|e| StorageError::Backend(e.to_string()))?;
167 let saved_at: i64 = row.try_get("saved_at")
168 .map_err(|e| StorageError::Backend(e.to_string()))?;
169
170 if root_bytes.len() != 32 {
171 warn!(
172 filter_id,
173 len = root_bytes.len(),
174 "Invalid merkle root length in saved filter state"
175 );
176 return Ok(None);
177 }
178
179 let mut merkle_root = [0u8; 32];
180 merkle_root.copy_from_slice(&root_bytes);
181
182 debug!(
183 filter_id,
184 entry_count,
185 bytes = filter_bytes.len(),
186 "Loaded saved filter state"
187 );
188
189 Ok(Some(SavedFilterState {
190 filter_bytes,
191 merkle_root,
192 entry_count: entry_count as usize,
193 saved_at: saved_at as u64,
194 }))
195 }
196 None => Ok(None),
197 }
198 }
199
200 pub async fn delete(&self, filter_id: &str) -> Result<(), StorageError> {
202 sqlx::query("DELETE FROM cf_state WHERE filter_id = ?")
203 .bind(filter_id)
204 .execute(&self.pool)
205 .await
206 .map_err(|e| StorageError::Backend(format!(
207 "Failed to delete filter state: {}", e
208 )))?;
209
210 Ok(())
211 }
212
213 pub async fn is_filter_current(
217 &self,
218 filter_id: &str,
219 current_root: &[u8; 32],
220 ) -> Result<bool, StorageError> {
221 match self.load(filter_id).await? {
222 Some(saved) => Ok(&saved.merkle_root == current_root),
223 None => Ok(false),
224 }
225 }
226}
227
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Builds a database path in the system temp directory.
    ///
    /// The process id is part of the file name so that two test processes
    /// running at the same time do not open or delete each other's database.
    fn temp_db_path(name: &str) -> PathBuf {
        std::env::temp_dir().join(format!("cf_test_{}_{}.db", name, std::process::id()))
    }

    /// Temporary database file that is removed when the guard is dropped,
    /// which also happens when a test assertion panics.
    struct TempDb {
        path: PathBuf,
    }

    impl TempDb {
        /// Reserves the path for `name`, clearing any stale file left behind.
        fn new(name: &str) -> Self {
            let path = temp_db_path(name);
            let _ = std::fs::remove_file(&path);
            Self { path }
        }

        /// Returns the path as the `&str` expected by `FilterPersistence::new`.
        fn path_str(&self) -> &str {
            self.path
                .to_str()
                .expect("temp database path should be valid UTF-8")
        }
    }

    impl Drop for TempDb {
        fn drop(&mut self) {
            // Best effort: a failed cleanup must not mask the test result.
            let _ = std::fs::remove_file(&self.path);
        }
    }

    /// Saving a state and loading it back returns the same values.
    #[tokio::test]
    async fn test_filter_persistence_roundtrip() {
        // Declared first so it is dropped after `persistence`.
        let db = TempDb::new("roundtrip");
        let persistence = FilterPersistence::new(db.path_str()).await.unwrap();

        let filter_bytes = vec![1, 2, 3, 4, 5];
        let merkle_root = [42u8; 32];
        let entry_count = 100;

        persistence
            .save(L3_FILTER_ID, &filter_bytes, &merkle_root, entry_count)
            .await
            .unwrap();

        let loaded = persistence.load(L3_FILTER_ID).await.unwrap();
        assert!(loaded.is_some());

        let state = loaded.unwrap();
        assert_eq!(state.filter_bytes, filter_bytes);
        assert_eq!(state.merkle_root, merkle_root);
        assert_eq!(state.entry_count, entry_count);

        // A filter id that was never saved has no state.
        assert!(persistence.load(L2_FILTER_ID).await.unwrap().is_none());
    }

    /// The currency check is true only for the root that was saved.
    #[tokio::test]
    async fn test_filter_current_check() {
        // Declared first so it is dropped after `persistence`.
        let db = TempDb::new("current_check");
        let persistence = FilterPersistence::new(db.path_str()).await.unwrap();

        let merkle_root = [42u8; 32];
        persistence
            .save(L3_FILTER_ID, &[1, 2, 3], &merkle_root, 10)
            .await
            .unwrap();

        assert!(persistence.is_filter_current(L3_FILTER_ID, &merkle_root).await.unwrap());

        let different_root = [99u8; 32];
        assert!(!persistence.is_filter_current(L3_FILTER_ID, &different_root).await.unwrap());
    }
}