//! probabilistic-rs 0.6.4
//!
//! Probabilistic data structures in Rust — Fjall storage backend.
//! Documentation: see the crate-level docs.
use super::{BloomError, BloomFilterConfig, BloomResult, StorageBackend};
use async_trait::async_trait;
use std::sync::Arc;

/// Persistent storage backend for Bloom filter state, backed by an
/// embedded Fjall database.
///
/// Cloning is cheap: every handle is reference-counted via `Arc`.
#[cfg(feature = "fjall")]
#[derive(Clone)]
pub struct FjallBackend {
    // Handle to the underlying database; used to force sync-to-disk
    // persists after writes.
    db: Arc<fjall::Database>,
    // Partition storing the serialized filter configuration
    // (single fixed key, "bloom_config").
    config_partition: Arc<fjall::Keyspace>,
    // Partition storing snapshot chunks under "chunk_<id>" keys.
    chunks_partition: Arc<fjall::Keyspace>,
}

#[cfg(feature = "fjall")]
#[async_trait]
impl StorageBackend for FjallBackend {
    /// Serializes the filter configuration, stores it under the fixed
    /// key `"bloom_config"`, and forces a sync-to-disk persist.
    ///
    /// # Errors
    /// Returns `BloomError::StorageError` if the insert or the persist
    /// step fails; serialization errors propagate from `config.to_bytes()`.
    async fn save_config(&self, config: &BloomFilterConfig) -> BloomResult<()> {
        let config_bytes = config.to_bytes()?;

        self.config_partition
            .insert("bloom_config", config_bytes)
            .map_err(|e| {
                BloomError::StorageError(format!("Failed to save config: {e}"))
            })?;

        // SyncAll ensures the config is durable before we report success.
        self.db.persist(fjall::PersistMode::SyncAll).map_err(|e| {
            BloomError::StorageError(format!("Failed to persist config: {e}"))
        })?;

        Ok(())
    }

    /// Loads and deserializes the configuration written by `save_config`.
    ///
    /// # Errors
    /// Returns `BloomError::ConfigNotFound` when no config has been saved
    /// yet, and `BloomError::StorageError` on a storage-layer failure.
    async fn load_config(&self) -> BloomResult<BloomFilterConfig> {
        match self.config_partition.get("bloom_config") {
            Ok(Some(config_bytes)) => {
                let config = BloomFilterConfig::from_bytes(&config_bytes)?;
                Ok(config)
            }
            Ok(None) => Err(BloomError::ConfigNotFound),
            Err(e) => Err(BloomError::StorageError(format!(
                "Failed to load config: {e}"
            ))),
        }
    }

    /// Writes every `(chunk_id, data)` pair under a `"chunk_<id>"` key,
    /// then forces a sync-to-disk persist.
    ///
    /// # Errors
    /// Returns `BloomError::StorageError` if any insert or the final
    /// persist fails.
    async fn save_snapshot(
        &self,
        chunks: &[(usize, Vec<u8>)],
    ) -> BloomResult<()> {
        for (chunk_id, chunk_data) in chunks {
            let key = format!("chunk_{chunk_id}");
            self.chunks_partition
                .insert(&key, chunk_data)
                .map_err(|e| {
                    BloomError::StorageError(format!("Failed to save chunk: {e}"))
                })?;
        }

        // One persist for the whole batch, after all inserts succeed.
        self.db.persist(fjall::PersistMode::SyncAll).map_err(|e| {
            BloomError::StorageError(format!("Failed to persist chunks: {e}"))
        })?;

        Ok(())
    }

    /// Reads back all snapshot chunks, returned sorted by numeric chunk id.
    ///
    /// Keys that do not match the `chunk_<usize>` pattern are silently
    /// skipped rather than treated as errors. An empty partition yields
    /// an empty `Vec`.
    ///
    /// # Errors
    /// Returns `BloomError::StorageError` if reading an entry fails.
    async fn load_snapshot(&self) -> BloomResult<Vec<(usize, Vec<u8>)>> {
        let mut chunks = Vec::new();

        for guard in self.chunks_partition.iter() {
            let (key, value) = guard.into_inner().map_err(|e| {
                BloomError::StorageError(format!("Failed to read chunk: {e}"))
            })?;

            // Accept only well-formed "chunk_<id>" keys; ignore anything else.
            if let Some(chunk_id_str) = key.strip_prefix(b"chunk_")
                && let Ok(chunk_id_str) = std::str::from_utf8(chunk_id_str)
                && let Ok(chunk_id) = chunk_id_str.parse::<usize>()
            {
                chunks.push((chunk_id, value.to_vec()));
            }
        }

        // The iterator yields keys in lexicographic byte order
        // ("chunk_10" < "chunk_2"), so a numeric sort is required.
        // Sorting an empty Vec is a no-op, so no is_empty() special case
        // is needed; ids are unique (they come from unique keys), so an
        // unstable sort is safe and avoids the stable sort's allocation.
        chunks.sort_unstable_by_key(|(id, _)| *id);
        Ok(chunks)
    }
}

#[cfg(feature = "fjall")]
impl FjallBackend {
    /// Opens (or creates) a Fjall database at `db_path` and prepares the
    /// `"config"` and `"chunks"` partitions used by the backend.
    ///
    /// # Errors
    /// Returns `BloomError::StorageError` if the database or either
    /// partition cannot be opened.
    pub async fn new(db_path: std::path::PathBuf) -> BloomResult<Self> {
        let db = fjall::Database::builder(&db_path)
            .open()
            .map(Arc::new)
            .map_err(|e| {
                BloomError::StorageError(format!("Failed to open Fjall DB: {e}"))
            })?;

        // Both partitions are opened identically; only the name differs.
        let open_partition = |name: &str| -> BloomResult<Arc<fjall::Keyspace>> {
            db.keyspace(name, fjall::KeyspaceCreateOptions::default)
                .map(Arc::new)
                .map_err(|e| {
                    BloomError::StorageError(format!(
                        "Failed to open {name} partition: {e}"
                    ))
                })
        };

        let config_partition = open_partition("config")?;
        let chunks_partition = open_partition("chunks")?;

        Ok(Self {
            db,
            config_partition,
            chunks_partition,
        })
    }
}