Skip to main content

probabilistic_rs/ebloom/
storage.rs

1use crate::ebloom::config::{ExpiringFilterConfig, LevelMetadata};
2use crate::ebloom::error::EbloomError;
3use async_trait::async_trait;
4use std::sync::Arc;
5
/// Module-local shorthand: every fallible operation in this file reports
/// failures as an [`EbloomError`].
type Result<T> = std::result::Result<T, EbloomError>;
7
8/// Storage backend trait for expiring bloom filter persistence
9#[async_trait]
10pub trait ExpiringStorageBackend {
11    /// Save the expiring filter configuration
12    async fn save_config(&self, config: &ExpiringFilterConfig) -> Result<()>;
13
14    /// Load the expiring filter configuration
15    async fn load_config(&self) -> Result<ExpiringFilterConfig>;
16
17    /// Save metadata for all levels
18    async fn save_level_metadata(&self, metadata: &[LevelMetadata])
19    -> Result<()>;
20
21    /// Load metadata for all levels
22    async fn load_level_metadata(&self) -> Result<Vec<LevelMetadata>>;
23
24    /// Save current level index
25    async fn save_current_level(&self, current_level: usize) -> Result<()>;
26
27    /// Load current level index
28    async fn load_current_level(&self) -> Result<usize>;
29
30    /// Save chunks for a specific level
31    async fn save_level_chunks(
32        &self,
33        level: usize,
34        chunks: &[(usize, Vec<u8>)],
35    ) -> Result<()>;
36
37    /// Load chunks for a specific level
38    async fn load_level_chunks(
39        &self,
40        level: usize,
41    ) -> Result<Vec<(usize, Vec<u8>)>>;
42
43    /// Save dirty chunks for a specific level
44    async fn save_dirty_chunks(
45        &self,
46        level: usize,
47        dirty_chunks: &[(usize, Vec<u8>)],
48    ) -> Result<()>;
49
50    /// Load dirty chunks for a specific level
51    async fn load_dirty_chunks(
52        &self,
53        level: usize,
54    ) -> Result<Vec<(usize, Vec<u8>)>>;
55
56    /// Delete all data for a specific level (during rotation)
57    async fn delete_level(&self, level: usize) -> Result<()>;
58}
59
60/// In-memory storage backend for testing
61pub struct InMemoryExpiringStorage {
62    config: Option<ExpiringFilterConfig>,
63    metadata: Vec<LevelMetadata>,
64    current_level: usize,
65    level_chunks: std::collections::HashMap<usize, Vec<(usize, Vec<u8>)>>,
66    dirty_chunks: std::collections::HashMap<usize, Vec<(usize, Vec<u8>)>>,
67}
68
69impl Default for InMemoryExpiringStorage {
70    fn default() -> Self {
71        Self::new()
72    }
73}
74
75impl InMemoryExpiringStorage {
76    pub fn new() -> Self {
77        Self {
78            config: None,
79            metadata: Vec::new(),
80            current_level: 0,
81            level_chunks: std::collections::HashMap::new(),
82            dirty_chunks: std::collections::HashMap::new(),
83        }
84    }
85}
86
87#[async_trait]
88impl ExpiringStorageBackend for InMemoryExpiringStorage {
89    async fn save_config(&self, _config: &ExpiringFilterConfig) -> Result<()> {
90        Ok(())
91    }
92
93    async fn load_config(&self) -> Result<ExpiringFilterConfig> {
94        Ok(self
95            .config
96            .as_ref()
97            .ok_or_else(|| {
98                EbloomError::ConfigError("No config found".to_string())
99            })?
100            .clone())
101    }
102
103    async fn save_level_metadata(
104        &self,
105        _metadata: &[LevelMetadata],
106    ) -> Result<()> {
107        Ok(())
108    }
109
110    async fn load_level_metadata(&self) -> Result<Vec<LevelMetadata>> {
111        Ok(self.metadata.clone())
112    }
113
114    async fn save_current_level(&self, _current_level: usize) -> Result<()> {
115        Ok(())
116    }
117
118    async fn load_current_level(&self) -> Result<usize> {
119        Ok(self.current_level)
120    }
121
122    async fn save_level_chunks(
123        &self,
124        _level: usize,
125        _chunks: &[(usize, Vec<u8>)],
126    ) -> Result<()> {
127        Ok(())
128    }
129
130    async fn load_level_chunks(
131        &self,
132        level: usize,
133    ) -> Result<Vec<(usize, Vec<u8>)>> {
134        Ok(self.level_chunks.get(&level).cloned().unwrap_or_default())
135    }
136
137    async fn save_dirty_chunks(
138        &self,
139        _level: usize,
140        _dirty_chunks: &[(usize, Vec<u8>)],
141    ) -> Result<()> {
142        Ok(())
143    }
144
145    async fn load_dirty_chunks(
146        &self,
147        level: usize,
148    ) -> Result<Vec<(usize, Vec<u8>)>> {
149        Ok(self.dirty_chunks.get(&level).cloned().unwrap_or_default())
150    }
151
152    async fn delete_level(&self, _level: usize) -> Result<()> {
153        Ok(())
154    }
155}
156
/// Fjall storage backend for expiring bloom filters
#[cfg(feature = "fjall")]
pub struct FjallExpiringBackend {
    /// Database handle opened in `new`; used to persist after every write.
    db: Arc<fjall::Database>,
    /// Keyspace "expiring_config": holds the serialized config and the
    /// current level index.
    config_partition: Arc<fjall::Keyspace>,
    /// Keyspace "level_metadata": holds the serialized level metadata.
    metadata_partition: Arc<fjall::Keyspace>,
    /// One keyspace per level ("level_{level}_chunks"), indexed by level.
    chunks_partitions: Vec<Arc<fjall::Keyspace>>,
    /// One keyspace per level ("level_{level}_dirty"), indexed by level.
    dirty_partitions: Vec<Arc<fjall::Keyspace>>,
    /// Number of levels keyspaces were opened for; reported in
    /// `InvalidLevel` errors.
    max_levels: usize,
}
167
#[cfg(feature = "fjall")]
impl FjallExpiringBackend {
    /// Opens the Fjall database at `db_path` together with every keyspace
    /// the backend uses: one for config, one for level metadata, and a
    /// chunks/dirty pair for each level in `0..max_levels`.
    ///
    /// # Errors
    /// Returns `EbloomError::StorageError` if the database or any keyspace
    /// cannot be opened.
    pub async fn new(
        db_path: std::path::PathBuf,
        max_levels: usize,
    ) -> Result<Self> {
        let database =
            fjall::Database::builder(&db_path).open().map_err(|e| {
                EbloomError::StorageError(format!("Failed to open Fjall DB: {e}"))
            })?;
        let db = Arc::new(database);

        let config_keyspace = db
            .keyspace("expiring_config", fjall::KeyspaceCreateOptions::default)
            .map_err(|e| {
                EbloomError::StorageError(format!(
                    "Failed to open config partition: {e}",
                ))
            })?;
        let config_partition = Arc::new(config_keyspace);

        let metadata_keyspace = db
            .keyspace("level_metadata", fjall::KeyspaceCreateOptions::default)
            .map_err(|e| {
                EbloomError::StorageError(format!(
                    "Failed to open metadata partition: {e}"
                ))
            })?;
        let metadata_partition = Arc::new(metadata_keyspace);

        let mut chunks_partitions = Vec::with_capacity(max_levels);
        let mut dirty_partitions = Vec::with_capacity(max_levels);

        // Each level gets its own pair of keyspaces, opened chunks-first.
        for level in 0..max_levels {
            let chunks_name = format!("level_{level}_chunks");
            let chunks_keyspace = db
                .keyspace(&chunks_name, fjall::KeyspaceCreateOptions::default)
                .map_err(|e| {
                    EbloomError::StorageError(format!(
                        "Failed to open level {} chunks partition: {e}",
                        level
                    ))
                })?;
            chunks_partitions.push(Arc::new(chunks_keyspace));

            let dirty_name = format!("level_{level}_dirty");
            let dirty_keyspace = db
                .keyspace(&dirty_name, fjall::KeyspaceCreateOptions::default)
                .map_err(|e| {
                    EbloomError::StorageError(format!(
                        "Failed to open level {} dirty partition: {e}",
                        level
                    ))
                })?;
            dirty_partitions.push(Arc::new(dirty_keyspace));
        }

        let backend = Self {
            db,
            config_partition,
            metadata_partition,
            chunks_partitions,
            dirty_partitions,
            max_levels,
        };
        Ok(backend)
    }

    /// Looks up the chunks keyspace of `level`; `None` when `level` is out
    /// of range.
    fn get_chunks_partition(
        &self,
        level: usize,
    ) -> Option<&Arc<fjall::Keyspace>> {
        let partitions = &self.chunks_partitions;
        partitions.get(level)
    }

    /// Looks up the dirty-chunks keyspace of `level`; `None` when `level`
    /// is out of range.
    fn get_dirty_partition(&self, level: usize) -> Option<&Arc<fjall::Keyspace>> {
        let partitions = &self.dirty_partitions;
        partitions.get(level)
    }
}
251
252#[cfg(feature = "fjall")]
253#[async_trait]
254impl ExpiringStorageBackend for FjallExpiringBackend {
255    async fn save_config(&self, config: &ExpiringFilterConfig) -> Result<()> {
256        let config_bytes = config.to_bytes()?;
257
258        self.config_partition
259            .insert("expiring_bloom_config", config_bytes)
260            .map_err(|e| {
261                EbloomError::StorageError(format!("Failed to save config: {e}"))
262            })?;
263
264        self.db.persist(fjall::PersistMode::SyncAll).map_err(|e| {
265            EbloomError::StorageError(format!("Failed to persist config: {e}"))
266        })?;
267
268        Ok(())
269    }
270
271    async fn load_config(&self) -> Result<ExpiringFilterConfig> {
272        match self.config_partition.get("expiring_bloom_config") {
273            Ok(Some(config_bytes)) => {
274                let config = ExpiringFilterConfig::from_bytes(&config_bytes)?;
275                Ok(config)
276            }
277            Ok(None) => {
278                Err(EbloomError::ConfigError("Config not found".to_string()))
279            }
280            Err(e) => Err(EbloomError::StorageError(format!(
281                "Failed to load config: {e}"
282            ))),
283        }
284    }
285
286    async fn save_level_metadata(
287        &self,
288        metadata: &[LevelMetadata],
289    ) -> Result<()> {
290        let metadata_bytes = self.serialize_metadata(metadata)?;
291
292        self.metadata_partition
293            .insert("level_metadata", metadata_bytes)
294            .map_err(|e| {
295                EbloomError::StorageError(format!(
296                    "Failed to save level metadata: {e}"
297                ))
298            })?;
299
300        self.db.persist(fjall::PersistMode::SyncAll).map_err(|e| {
301            EbloomError::StorageError(format!(
302                "Failed to persist level metadata: {e}"
303            ))
304        })?;
305
306        Ok(())
307    }
308
309    async fn load_level_metadata(&self) -> Result<Vec<LevelMetadata>> {
310        match self.metadata_partition.get("level_metadata") {
311            Ok(Some(metadata_bytes)) => {
312                let metadata = self.deserialize_metadata(&metadata_bytes)?;
313                Ok(metadata)
314            }
315            Ok(None) => Ok(vec![]),
316            Err(e) => Err(EbloomError::StorageError(format!(
317                "Failed to load level metadata: {e}"
318            ))),
319        }
320    }
321
322    async fn save_current_level(&self, current_level: usize) -> Result<()> {
323        if current_level > 255 {
324            return Err(EbloomError::InvalidLevel {
325                level: current_level,
326                max_levels: 255,
327            });
328        }
329        let level_bytes = (current_level as u8).to_le_bytes();
330
331        self.config_partition
332            .insert("current_level", level_bytes)
333            .map_err(|e| {
334                EbloomError::StorageError(format!(
335                    "Failed to save current level: {e}"
336                ))
337            })?;
338
339        self.db.persist(fjall::PersistMode::SyncAll).map_err(|e| {
340            EbloomError::StorageError(format!(
341                "Failed to persist current level: {e}"
342            ))
343        })?;
344
345        Ok(())
346    }
347
348    async fn load_current_level(&self) -> Result<usize> {
349        match self.config_partition.get("current_level") {
350            Ok(Some(level_bytes)) => {
351                if !level_bytes.is_empty() {
352                    Ok(level_bytes[0] as usize)
353                } else {
354                    Err(EbloomError::StorageError(
355                        "Invalid current level data".to_string(),
356                    ))
357                }
358            }
359            Ok(None) => Ok(0),
360            Err(e) => Err(EbloomError::StorageError(format!(
361                "Failed to load current level: {e}"
362            ))),
363        }
364    }
365
366    async fn save_level_chunks(
367        &self,
368        level: usize,
369        chunks: &[(usize, Vec<u8>)],
370    ) -> Result<()> {
371        let Some(partition) = self.get_chunks_partition(level) else {
372            return Err(EbloomError::InvalidLevel {
373                level,
374                max_levels: self.max_levels,
375            });
376        };
377
378        for (chunk_id, chunk_data) in chunks {
379            let key = format!("chunk_{chunk_id}");
380            partition.insert(&key, chunk_data).map_err(|e| {
381                EbloomError::StorageError(format!(
382                    "Failed to save level {} chunk {}: {e}",
383                    level, chunk_id
384                ))
385            })?;
386        }
387
388        self.db.persist(fjall::PersistMode::SyncAll).map_err(|e| {
389            EbloomError::StorageError(format!(
390                "Failed to persist level {} chunks: {e}",
391                level
392            ))
393        })?;
394
395        Ok(())
396    }
397
398    async fn load_level_chunks(
399        &self,
400        level: usize,
401    ) -> Result<Vec<(usize, Vec<u8>)>> {
402        let Some(partition) = self.get_chunks_partition(level) else {
403            return Err(EbloomError::InvalidLevel {
404                level,
405                max_levels: self.max_levels,
406            });
407        };
408
409        let mut chunks = Vec::new();
410
411        for guard in partition.iter() {
412            let (key, value) = guard.into_inner().map_err(|e| {
413                EbloomError::StorageError(format!(
414                    "Failed to read level {} chunk: {e}",
415                    level
416                ))
417            })?;
418
419            if let Some(chunk_id_str) = key.strip_prefix(b"chunk_")
420                && let Ok(chunk_id_str) = std::str::from_utf8(chunk_id_str)
421                && let Ok(chunk_id) = chunk_id_str.parse::<usize>()
422            {
423                chunks.push((chunk_id, value.to_vec()));
424            }
425        }
426
427        chunks.sort_by_key(|(id, _)| *id);
428        Ok(chunks)
429    }
430
431    async fn save_dirty_chunks(
432        &self,
433        level: usize,
434        dirty_chunks: &[(usize, Vec<u8>)],
435    ) -> Result<()> {
436        let Some(partition) = self.get_dirty_partition(level) else {
437            return Err(EbloomError::InvalidLevel {
438                level,
439                max_levels: self.max_levels,
440            });
441        };
442
443        for (chunk_id, chunk_data) in dirty_chunks {
444            let key = format!("dirty_{chunk_id}");
445            partition.insert(&key, chunk_data).map_err(|e| {
446                EbloomError::StorageError(format!(
447                    "Failed to save level {} dirty chunk {}: {e}",
448                    level, chunk_id
449                ))
450            })?;
451        }
452
453        self.db.persist(fjall::PersistMode::SyncAll).map_err(|e| {
454            EbloomError::StorageError(format!(
455                "Failed to persist level {} dirty chunks: {e}",
456                level
457            ))
458        })?;
459
460        Ok(())
461    }
462
463    async fn load_dirty_chunks(
464        &self,
465        level: usize,
466    ) -> Result<Vec<(usize, Vec<u8>)>> {
467        let Some(partition) = self.get_dirty_partition(level) else {
468            return Err(EbloomError::InvalidLevel {
469                level,
470                max_levels: self.max_levels,
471            });
472        };
473
474        let mut chunks = Vec::new();
475
476        for guard in partition.iter() {
477            let (key, value) = guard.into_inner().map_err(|e| {
478                EbloomError::StorageError(format!(
479                    "Failed to read level {} dirty chunk: {e}",
480                    level
481                ))
482            })?;
483
484            if let Some(chunk_id_str) = key.strip_prefix(b"dirty_")
485                && let Ok(chunk_id_str) = std::str::from_utf8(chunk_id_str)
486                && let Ok(chunk_id) = chunk_id_str.parse::<usize>()
487            {
488                chunks.push((chunk_id, value.to_vec()));
489            }
490        }
491
492        chunks.sort_by_key(|(id, _)| *id);
493        Ok(chunks)
494    }
495
496    async fn delete_level(&self, level: usize) -> Result<()> {
497        let Some(chunks_partition) = self.get_chunks_partition(level) else {
498            return Err(EbloomError::InvalidLevel {
499                level,
500                max_levels: self.max_levels,
501            });
502        };
503
504        let Some(dirty_partition) = self.get_dirty_partition(level) else {
505            return Err(EbloomError::InvalidLevel {
506                level,
507                max_levels: self.max_levels,
508            });
509        };
510
511        let mut keys_to_delete_chunks = Vec::new();
512        for guard in chunks_partition.iter() {
513            let (key, _) = guard.into_inner().map_err(|e| {
514                EbloomError::StorageError(format!(
515                    "Failed to iterate level {} chunks for deletion: {e}",
516                    level
517                ))
518            })?;
519            keys_to_delete_chunks.push(key.to_vec());
520        }
521
522        for key in keys_to_delete_chunks {
523            if let Ok(key_str) = std::str::from_utf8(&key) {
524                chunks_partition.remove(key_str).map_err(|e| {
525                    EbloomError::StorageError(format!(
526                        "Failed to delete level {} chunk {}: {e}",
527                        level, key_str
528                    ))
529                })?;
530            }
531        }
532
533        let mut keys_to_delete_dirty = Vec::new();
534        for guard in dirty_partition.iter() {
535            let (key, _) = guard.into_inner().map_err(|e| {
536                EbloomError::StorageError(format!(
537                    "Failed to iterate level {} dirty chunks for deletion: {e}",
538                    level
539                ))
540            })?;
541            keys_to_delete_dirty.push(key.to_vec());
542        }
543
544        for key in keys_to_delete_dirty {
545            if let Ok(key_str) = std::str::from_utf8(&key) {
546                dirty_partition.remove(key_str).map_err(|e| {
547                    EbloomError::StorageError(format!(
548                        "Failed to delete level {} dirty chunk {}: {e}",
549                        level, key_str
550                    ))
551                })?;
552            }
553        }
554
555        self.db.persist(fjall::PersistMode::SyncAll).map_err(|e| {
556            EbloomError::StorageError(format!(
557                "Failed to persist level {} deletion: {e}",
558                level
559            ))
560        })?;
561
562        Ok(())
563    }
564}
565
#[cfg(feature = "fjall")]
impl FjallExpiringBackend {
    /// Encodes the level metadata with postcard.
    ///
    /// # Errors
    /// `EbloomError::SerializationError` carrying the encoder's message.
    fn serialize_metadata(&self, metadata: &[LevelMetadata]) -> Result<Vec<u8>> {
        match postcard::to_allocvec(metadata) {
            Ok(encoded) => Ok(encoded),
            Err(err) => Err(EbloomError::SerializationError(err.to_string())),
        }
    }

    /// Decodes level metadata previously produced by `serialize_metadata`.
    ///
    /// # Errors
    /// `EbloomError::SerializationError` carrying the decoder's message.
    fn deserialize_metadata(&self, bytes: &[u8]) -> Result<Vec<LevelMetadata>> {
        match postcard::from_bytes(bytes) {
            Ok(decoded) => Ok(decoded),
            Err(err) => Err(EbloomError::SerializationError(err.to_string())),
        }
    }
}