1use crate::ebloom::config::{ExpiringFilterConfig, LevelMetadata};
2use crate::ebloom::error::EbloomError;
3use async_trait::async_trait;
4use std::sync::Arc;
5
/// Crate-local result alias: every storage operation fails with [`EbloomError`].
type Result<T> = std::result::Result<T, EbloomError>;
7
/// Async persistence interface for a leveled, expiring Bloom filter.
///
/// A persisted filter consists of a global [`ExpiringFilterConfig`],
/// per-level [`LevelMetadata`], a "current level" index, and per-level bit
/// data stored as `(chunk_id, bytes)` pairs. Implementations may persist
/// to disk (see the `fjall` feature) or keep everything in memory.
#[async_trait]
pub trait ExpiringStorageBackend {
    /// Persists the filter configuration.
    async fn save_config(&self, config: &ExpiringFilterConfig) -> Result<()>;

    /// Loads the previously saved filter configuration.
    async fn load_config(&self) -> Result<ExpiringFilterConfig>;

    /// Persists the metadata for every level in one shot.
    async fn save_level_metadata(&self, metadata: &[LevelMetadata])
    -> Result<()>;

    /// Loads the metadata for every level.
    async fn load_level_metadata(&self) -> Result<Vec<LevelMetadata>>;

    /// Persists the index of the currently active level.
    async fn save_current_level(&self, current_level: usize) -> Result<()>;

    /// Loads the index of the currently active level.
    async fn load_current_level(&self) -> Result<usize>;

    /// Persists the given `(chunk_id, bytes)` chunks for `level`.
    async fn save_level_chunks(
        &self,
        level: usize,
        chunks: &[(usize, Vec<u8>)],
    ) -> Result<()>;

    /// Loads the stored chunks for `level` as `(chunk_id, bytes)` pairs.
    async fn load_level_chunks(
        &self,
        level: usize,
    ) -> Result<Vec<(usize, Vec<u8>)>>;

    /// Persists the given dirty (recently modified) chunks for `level`.
    async fn save_dirty_chunks(
        &self,
        level: usize,
        dirty_chunks: &[(usize, Vec<u8>)],
    ) -> Result<()>;

    /// Loads the dirty chunks recorded for `level`.
    async fn load_dirty_chunks(
        &self,
        level: usize,
    ) -> Result<Vec<(usize, Vec<u8>)>>;

    /// Deletes the persisted chunk data for `level`.
    async fn delete_level(&self, level: usize) -> Result<()>;
}
59
60pub struct InMemoryExpiringStorage {
62 config: Option<ExpiringFilterConfig>,
63 metadata: Vec<LevelMetadata>,
64 current_level: usize,
65 level_chunks: std::collections::HashMap<usize, Vec<(usize, Vec<u8>)>>,
66 dirty_chunks: std::collections::HashMap<usize, Vec<(usize, Vec<u8>)>>,
67}
68
69impl Default for InMemoryExpiringStorage {
70 fn default() -> Self {
71 Self::new()
72 }
73}
74
75impl InMemoryExpiringStorage {
76 pub fn new() -> Self {
77 Self {
78 config: None,
79 metadata: Vec::new(),
80 current_level: 0,
81 level_chunks: std::collections::HashMap::new(),
82 dirty_chunks: std::collections::HashMap::new(),
83 }
84 }
85}
86
87#[async_trait]
88impl ExpiringStorageBackend for InMemoryExpiringStorage {
89 async fn save_config(&self, _config: &ExpiringFilterConfig) -> Result<()> {
90 Ok(())
91 }
92
93 async fn load_config(&self) -> Result<ExpiringFilterConfig> {
94 Ok(self
95 .config
96 .as_ref()
97 .ok_or_else(|| {
98 EbloomError::ConfigError("No config found".to_string())
99 })?
100 .clone())
101 }
102
103 async fn save_level_metadata(
104 &self,
105 _metadata: &[LevelMetadata],
106 ) -> Result<()> {
107 Ok(())
108 }
109
110 async fn load_level_metadata(&self) -> Result<Vec<LevelMetadata>> {
111 Ok(self.metadata.clone())
112 }
113
114 async fn save_current_level(&self, _current_level: usize) -> Result<()> {
115 Ok(())
116 }
117
118 async fn load_current_level(&self) -> Result<usize> {
119 Ok(self.current_level)
120 }
121
122 async fn save_level_chunks(
123 &self,
124 _level: usize,
125 _chunks: &[(usize, Vec<u8>)],
126 ) -> Result<()> {
127 Ok(())
128 }
129
130 async fn load_level_chunks(
131 &self,
132 level: usize,
133 ) -> Result<Vec<(usize, Vec<u8>)>> {
134 Ok(self.level_chunks.get(&level).cloned().unwrap_or_default())
135 }
136
137 async fn save_dirty_chunks(
138 &self,
139 _level: usize,
140 _dirty_chunks: &[(usize, Vec<u8>)],
141 ) -> Result<()> {
142 Ok(())
143 }
144
145 async fn load_dirty_chunks(
146 &self,
147 level: usize,
148 ) -> Result<Vec<(usize, Vec<u8>)>> {
149 Ok(self.dirty_chunks.get(&level).cloned().unwrap_or_default())
150 }
151
152 async fn delete_level(&self, _level: usize) -> Result<()> {
153 Ok(())
154 }
155}
156
#[cfg(feature = "fjall")]
/// Durable [`ExpiringStorageBackend`] backed by the Fjall embedded
/// key-value store. Each level gets its own chunk partition and
/// dirty-chunk partition, all opened up front by the constructor.
pub struct FjallExpiringBackend {
    /// Shared database handle; used for explicit `persist` (sync) calls.
    db: Arc<fjall::Database>,
    /// Holds the serialized config blob and the current-level byte.
    config_partition: Arc<fjall::Keyspace>,
    /// Holds the single serialized level-metadata blob.
    metadata_partition: Arc<fjall::Keyspace>,
    /// One chunk partition per level, indexed by level number.
    chunks_partitions: Vec<Arc<fjall::Keyspace>>,
    /// One dirty-chunk partition per level, indexed by level number.
    dirty_partitions: Vec<Arc<fjall::Keyspace>>,
    /// Number of levels for which partitions were opened.
    max_levels: usize,
}
167
#[cfg(feature = "fjall")]
impl FjallExpiringBackend {
    /// Opens (or creates) the Fjall database at `db_path` and eagerly
    /// opens one chunk partition and one dirty-chunk partition for every
    /// level in `0..max_levels`.
    ///
    /// # Errors
    /// Returns [`EbloomError::StorageError`] if the database or any
    /// partition cannot be opened.
    pub async fn new(
        db_path: std::path::PathBuf,
        max_levels: usize,
    ) -> Result<Self> {
        let db =
            Arc::new(fjall::Database::builder(&db_path).open().map_err(|e| {
                EbloomError::StorageError(format!("Failed to open Fjall DB: {e}"))
            })?);

        // Small singleton values (serialized config, current-level byte).
        let config_partition = Arc::new(
            db.keyspace("expiring_config", fjall::KeyspaceCreateOptions::default)
                .map_err(|e| {
                    EbloomError::StorageError(format!(
                        "Failed to open config partition: {e}",
                    ))
                })?,
        );

        // Single serialized blob holding metadata for all levels.
        let metadata_partition = Arc::new(
            db.keyspace("level_metadata", fjall::KeyspaceCreateOptions::default)
                .map_err(|e| {
                    EbloomError::StorageError(format!(
                        "Failed to open metadata partition: {e}"
                    ))
                })?,
        );

        let mut chunks_partitions = Vec::with_capacity(max_levels);
        let mut dirty_partitions = Vec::with_capacity(max_levels);

        // One partition pair per level; the vector index equals the level
        // number, which get_chunks_partition/get_dirty_partition rely on.
        for level in 0..max_levels {
            let chunks_partition = Arc::new(
                db.keyspace(
                    &format!("level_{level}_chunks"),
                    fjall::KeyspaceCreateOptions::default,
                )
                .map_err(|e| {
                    EbloomError::StorageError(format!(
                        "Failed to open level {} chunks partition: {e}",
                        level
                    ))
                })?,
            );
            chunks_partitions.push(chunks_partition);

            let dirty_partition = Arc::new(
                db.keyspace(
                    &format!("level_{level}_dirty"),
                    fjall::KeyspaceCreateOptions::default,
                )
                .map_err(|e| {
                    EbloomError::StorageError(format!(
                        "Failed to open level {} dirty partition: {e}",
                        level
                    ))
                })?,
            );
            dirty_partitions.push(dirty_partition);
        }

        Ok(Self {
            db,
            config_partition,
            metadata_partition,
            chunks_partitions,
            dirty_partitions,
            max_levels,
        })
    }

    /// Returns the chunk partition for `level`, or `None` when `level` is
    /// outside `0..max_levels`.
    fn get_chunks_partition(
        &self,
        level: usize,
    ) -> Option<&Arc<fjall::Keyspace>> {
        self.chunks_partitions.get(level)
    }

    /// Returns the dirty-chunk partition for `level`, or `None` when
    /// `level` is outside `0..max_levels`.
    fn get_dirty_partition(&self, level: usize) -> Option<&Arc<fjall::Keyspace>> {
        self.dirty_partitions.get(level)
    }
}
251
#[cfg(feature = "fjall")]
#[async_trait]
impl ExpiringStorageBackend for FjallExpiringBackend {
    /// Serializes the config, writes it under a fixed key, then syncs.
    async fn save_config(&self, config: &ExpiringFilterConfig) -> Result<()> {
        let config_bytes = config.to_bytes()?;

        self.config_partition
            .insert("expiring_bloom_config", config_bytes)
            .map_err(|e| {
                EbloomError::StorageError(format!("Failed to save config: {e}"))
            })?;

        // SyncAll: make the write durable before reporting success.
        self.db.persist(fjall::PersistMode::SyncAll).map_err(|e| {
            EbloomError::StorageError(format!("Failed to persist config: {e}"))
        })?;

        Ok(())
    }

    /// Loads and deserializes the stored config.
    ///
    /// Returns [`EbloomError::ConfigError`] when no config was ever saved.
    async fn load_config(&self) -> Result<ExpiringFilterConfig> {
        match self.config_partition.get("expiring_bloom_config") {
            Ok(Some(config_bytes)) => {
                let config = ExpiringFilterConfig::from_bytes(&config_bytes)?;
                Ok(config)
            }
            Ok(None) => {
                Err(EbloomError::ConfigError("Config not found".to_string()))
            }
            Err(e) => Err(EbloomError::StorageError(format!(
                "Failed to load config: {e}"
            ))),
        }
    }

    /// Serializes all level metadata into one value, then syncs.
    async fn save_level_metadata(
        &self,
        metadata: &[LevelMetadata],
    ) -> Result<()> {
        let metadata_bytes = self.serialize_metadata(metadata)?;

        self.metadata_partition
            .insert("level_metadata", metadata_bytes)
            .map_err(|e| {
                EbloomError::StorageError(format!(
                    "Failed to save level metadata: {e}"
                ))
            })?;

        self.db.persist(fjall::PersistMode::SyncAll).map_err(|e| {
            EbloomError::StorageError(format!(
                "Failed to persist level metadata: {e}"
            ))
        })?;

        Ok(())
    }

    /// Loads all level metadata; an absent entry means "no levels yet" and
    /// yields an empty vector rather than an error.
    async fn load_level_metadata(&self) -> Result<Vec<LevelMetadata>> {
        match self.metadata_partition.get("level_metadata") {
            Ok(Some(metadata_bytes)) => {
                let metadata = self.deserialize_metadata(&metadata_bytes)?;
                Ok(metadata)
            }
            Ok(None) => Ok(vec![]),
            Err(e) => Err(EbloomError::StorageError(format!(
                "Failed to load level metadata: {e}"
            ))),
        }
    }

    /// Stores the current level as a single byte, then syncs.
    async fn save_current_level(&self, current_level: usize) -> Result<()> {
        // The level is persisted as one u8 byte, so values above 255 are
        // unrepresentable. NOTE(review): the `max_levels: 255` reported
        // here is this encoding limit, not `self.max_levels` — confirm
        // whether the check should instead be against `self.max_levels`.
        if current_level > 255 {
            return Err(EbloomError::InvalidLevel {
                level: current_level,
                max_levels: 255,
            });
        }
        let level_bytes = (current_level as u8).to_le_bytes();

        self.config_partition
            .insert("current_level", level_bytes)
            .map_err(|e| {
                EbloomError::StorageError(format!(
                    "Failed to save current level: {e}"
                ))
            })?;

        self.db.persist(fjall::PersistMode::SyncAll).map_err(|e| {
            EbloomError::StorageError(format!(
                "Failed to persist current level: {e}"
            ))
        })?;

        Ok(())
    }

    /// Reads the stored current level; defaults to 0 when never saved.
    async fn load_current_level(&self) -> Result<usize> {
        match self.config_partition.get("current_level") {
            Ok(Some(level_bytes)) => {
                // Only the first byte is meaningful — `save_current_level`
                // writes exactly one u8.
                if !level_bytes.is_empty() {
                    Ok(level_bytes[0] as usize)
                } else {
                    Err(EbloomError::StorageError(
                        "Invalid current level data".to_string(),
                    ))
                }
            }
            Ok(None) => Ok(0),
            Err(e) => Err(EbloomError::StorageError(format!(
                "Failed to load current level: {e}"
            ))),
        }
    }

    /// Upserts the given `(chunk_id, data)` pairs into the level's chunk
    /// partition under `chunk_<id>` keys, then syncs once for the batch.
    ///
    /// NOTE(review): chunk ids absent from `chunks` are left untouched, so
    /// this never shrinks a previously saved chunk set — verify callers
    /// always pass a complete set, or call `delete_level` first.
    async fn save_level_chunks(
        &self,
        level: usize,
        chunks: &[(usize, Vec<u8>)],
    ) -> Result<()> {
        let Some(partition) = self.get_chunks_partition(level) else {
            return Err(EbloomError::InvalidLevel {
                level,
                max_levels: self.max_levels,
            });
        };

        for (chunk_id, chunk_data) in chunks {
            let key = format!("chunk_{chunk_id}");
            partition.insert(&key, chunk_data).map_err(|e| {
                EbloomError::StorageError(format!(
                    "Failed to save level {} chunk {}: {e}",
                    level, chunk_id
                ))
            })?;
        }

        // One durable sync after the batch instead of per-chunk syncs.
        self.db.persist(fjall::PersistMode::SyncAll).map_err(|e| {
            EbloomError::StorageError(format!(
                "Failed to persist level {} chunks: {e}",
                level
            ))
        })?;

        Ok(())
    }

    /// Loads every `chunk_<id>` entry for `level`, sorted by chunk id.
    /// Entries whose key does not parse as `chunk_<usize>` are skipped.
    async fn load_level_chunks(
        &self,
        level: usize,
    ) -> Result<Vec<(usize, Vec<u8>)>> {
        let Some(partition) = self.get_chunks_partition(level) else {
            return Err(EbloomError::InvalidLevel {
                level,
                max_levels: self.max_levels,
            });
        };

        let mut chunks = Vec::new();

        for guard in partition.iter() {
            let (key, value) = guard.into_inner().map_err(|e| {
                EbloomError::StorageError(format!(
                    "Failed to read level {} chunk: {e}",
                    level
                ))
            })?;

            // Keys look like `chunk_<id>`; recover the numeric id.
            if let Some(chunk_id_str) = key.strip_prefix(b"chunk_")
                && let Ok(chunk_id_str) = std::str::from_utf8(chunk_id_str)
                && let Ok(chunk_id) = chunk_id_str.parse::<usize>()
            {
                chunks.push((chunk_id, value.to_vec()));
            }
        }

        // Backend iteration order is not guaranteed to be numeric (e.g.
        // "chunk_10" sorts before "chunk_2" lexicographically), so order
        // by the parsed id before returning.
        chunks.sort_by_key(|(id, _)| *id);
        Ok(chunks)
    }

    /// Upserts dirty chunks under `dirty_<id>` keys, then syncs once.
    /// Same upsert semantics as `save_level_chunks`.
    async fn save_dirty_chunks(
        &self,
        level: usize,
        dirty_chunks: &[(usize, Vec<u8>)],
    ) -> Result<()> {
        let Some(partition) = self.get_dirty_partition(level) else {
            return Err(EbloomError::InvalidLevel {
                level,
                max_levels: self.max_levels,
            });
        };

        for (chunk_id, chunk_data) in dirty_chunks {
            let key = format!("dirty_{chunk_id}");
            partition.insert(&key, chunk_data).map_err(|e| {
                EbloomError::StorageError(format!(
                    "Failed to save level {} dirty chunk {}: {e}",
                    level, chunk_id
                ))
            })?;
        }

        self.db.persist(fjall::PersistMode::SyncAll).map_err(|e| {
            EbloomError::StorageError(format!(
                "Failed to persist level {} dirty chunks: {e}",
                level
            ))
        })?;

        Ok(())
    }

    /// Loads every `dirty_<id>` entry for `level`, sorted by chunk id.
    /// Entries whose key does not parse as `dirty_<usize>` are skipped.
    async fn load_dirty_chunks(
        &self,
        level: usize,
    ) -> Result<Vec<(usize, Vec<u8>)>> {
        let Some(partition) = self.get_dirty_partition(level) else {
            return Err(EbloomError::InvalidLevel {
                level,
                max_levels: self.max_levels,
            });
        };

        let mut chunks = Vec::new();

        for guard in partition.iter() {
            let (key, value) = guard.into_inner().map_err(|e| {
                EbloomError::StorageError(format!(
                    "Failed to read level {} dirty chunk: {e}",
                    level
                ))
            })?;

            if let Some(chunk_id_str) = key.strip_prefix(b"dirty_")
                && let Ok(chunk_id_str) = std::str::from_utf8(chunk_id_str)
                && let Ok(chunk_id) = chunk_id_str.parse::<usize>()
            {
                chunks.push((chunk_id, value.to_vec()));
            }
        }

        chunks.sort_by_key(|(id, _)| *id);
        Ok(chunks)
    }

    /// Deletes every chunk and dirty chunk stored for `level`, then syncs.
    ///
    /// Keys are collected first and removed afterwards so the partition is
    /// never mutated while it is being iterated.
    async fn delete_level(&self, level: usize) -> Result<()> {
        let Some(chunks_partition) = self.get_chunks_partition(level) else {
            return Err(EbloomError::InvalidLevel {
                level,
                max_levels: self.max_levels,
            });
        };

        let Some(dirty_partition) = self.get_dirty_partition(level) else {
            return Err(EbloomError::InvalidLevel {
                level,
                max_levels: self.max_levels,
            });
        };

        let mut keys_to_delete_chunks = Vec::new();
        for guard in chunks_partition.iter() {
            let (key, _) = guard.into_inner().map_err(|e| {
                EbloomError::StorageError(format!(
                    "Failed to iterate level {} chunks for deletion: {e}",
                    level
                ))
            })?;
            keys_to_delete_chunks.push(key.to_vec());
        }

        // NOTE(review): non-UTF-8 keys are silently skipped and would
        // survive deletion. All keys written by this backend are ASCII
        // ("chunk_<n>"), so this should be unreachable in practice.
        for key in keys_to_delete_chunks {
            if let Ok(key_str) = std::str::from_utf8(&key) {
                chunks_partition.remove(key_str).map_err(|e| {
                    EbloomError::StorageError(format!(
                        "Failed to delete level {} chunk {}: {e}",
                        level, key_str
                    ))
                })?;
            }
        }

        let mut keys_to_delete_dirty = Vec::new();
        for guard in dirty_partition.iter() {
            let (key, _) = guard.into_inner().map_err(|e| {
                EbloomError::StorageError(format!(
                    "Failed to iterate level {} dirty chunks for deletion: {e}",
                    level
                ))
            })?;
            keys_to_delete_dirty.push(key.to_vec());
        }

        for key in keys_to_delete_dirty {
            if let Ok(key_str) = std::str::from_utf8(&key) {
                dirty_partition.remove(key_str).map_err(|e| {
                    EbloomError::StorageError(format!(
                        "Failed to delete level {} dirty chunk {}: {e}",
                        level, key_str
                    ))
                })?;
            }
        }

        self.db.persist(fjall::PersistMode::SyncAll).map_err(|e| {
            EbloomError::StorageError(format!(
                "Failed to persist level {} deletion: {e}",
                level
            ))
        })?;

        Ok(())
    }
}
565
#[cfg(feature = "fjall")]
impl FjallExpiringBackend {
    /// Encodes level metadata with postcard for compact storage.
    fn serialize_metadata(&self, metadata: &[LevelMetadata]) -> Result<Vec<u8>> {
        match postcard::to_allocvec(metadata) {
            Ok(encoded) => Ok(encoded),
            Err(err) => Err(EbloomError::SerializationError(err.to_string())),
        }
    }

    /// Decodes level metadata previously written by `serialize_metadata`.
    fn deserialize_metadata(&self, bytes: &[u8]) -> Result<Vec<LevelMetadata>> {
        match postcard::from_bytes(bytes) {
            Ok(decoded) => Ok(decoded),
            Err(err) => Err(EbloomError::SerializationError(err.to_string())),
        }
    }
}