use ipfrs_core::{Cid, Error, Result};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::path::Path;
/// Target false-positive probability used by `BloomConfig::default` (one percent).
const DEFAULT_FALSE_POSITIVE_RATE: f64 = 1e-2;
/// Bloom filter over arbitrary byte strings that can be shared behind `&self`.
///
/// Sizing parameters live in an immutable [`BloomConfig`]; the bit array and the
/// insertion counter sit behind a `RwLock`, so membership checks take a shared lock
/// while inserts take an exclusive one.
pub struct BloomFilter {
    /// Immutable sizing parameters (bit count, hash count, target rate).
    config: BloomConfig,
    /// Bit array plus insertion counter, guarded for mutation through `&self`.
    inner: RwLock<BloomFilterInner>,
}
/// Mutable state of a [`BloomFilter`]; this is the part written by `save_to_file`
/// and read back by `load_from_file` (the sizing config is not persisted).
///
/// NOTE(review): the encoder used for persistence looks like a positional,
/// bincode-style format (`oxicode::config::standard()`) — TODO confirm. If so, the
/// field order below is the on-disk layout and must not be changed.
#[derive(Serialize, Deserialize)]
struct BloomFilterInner {
    /// Packed bit array: bit `i` lives in word `i / 64` at offset `i % 64`.
    bits: Vec<u64>,
    /// Number of `insert` calls since creation or the last `clear`. This is not a
    /// distinct-item count: re-inserting the same key increments it again.
    count: usize,
}
/// Sizing parameters for a [`BloomFilter`].
///
/// Derived from an expected item count `n` and a target false-positive rate `p` with
/// the standard optimal formulas `m = -n·ln(p) / ln(2)²` bits and
/// `k = (m / n)·ln(2)` hash functions.
#[derive(Debug, Clone)]
pub struct BloomConfig {
    /// Number of items the filter is sized for.
    pub expected_items: usize,
    /// False-positive rate requested by the caller (stored as given).
    pub false_positive_rate: f64,
    /// Bit positions set/checked per item; `new` keeps this in `1..=16`.
    pub num_hashes: usize,
    /// Total number of bits in the filter; `new` keeps this at least 64.
    pub num_bits: usize,
}

impl BloomConfig {
    /// Rate used for sizing when the requested rate is not strictly positive.
    ///
    /// A rate of exactly `0.0` makes `-ln(p)` infinite, which saturates `num_bits` to
    /// `usize::MAX` and makes the bit-array allocation panic. Negative or NaN rates
    /// turn the formulas into NaN, which silently collapsed the filter to 64 bits.
    const MIN_SIZING_RATE: f64 = 1e-9;

    /// Computes bit and hash counts for `expected_items` insertions at the requested
    /// `false_positive_rate`.
    ///
    /// `num_bits` is raised to at least 64 and `num_hashes` is clamped to `1..=16`.
    /// A rate that is zero, negative or NaN is sized as a 1e-9 rate instead. The
    /// caller's value is still stored unchanged in `false_positive_rate`. An
    /// `expected_items` of zero yields the minimum filter (64 bits, one hash).
    pub fn new(expected_items: usize, false_positive_rate: f64) -> Self {
        use std::f64::consts::LN_2;

        // `p > 0.0` is false for zero, negatives and NaN alike.
        let sizing_rate = if false_positive_rate > 0.0 {
            false_positive_rate
        } else {
            Self::MIN_SIZING_RATE
        };

        let items = expected_items as f64;
        let optimal_bits = (-(items * sizing_rate.ln()) / (LN_2 * LN_2)).ceil() as usize;

        // With zero items the bits-per-item ratio would be 0/0 (NaN). Treat it as zero
        // so the hash count lands on the clamp's lower bound explicitly.
        let bits_per_item = if expected_items == 0 {
            0.0
        } else {
            optimal_bits as f64 / items
        };
        let num_hashes = ((bits_per_item * LN_2).ceil() as usize).clamp(1, 16);
        let num_bits = optimal_bits.max(64);

        Self {
            expected_items,
            false_positive_rate,
            num_hashes,
            num_bits,
        }
    }

    /// Smaller filter with a 5% target false-positive rate.
    pub fn low_memory(expected_items: usize) -> Self {
        Self::new(expected_items, 0.05)
    }

    /// Larger filter with a 0.1% target false-positive rate.
    pub fn high_accuracy(expected_items: usize) -> Self {
        Self::new(expected_items, 0.001)
    }

    /// Bytes occupied by the bit array, rounded up to whole 64-bit words.
    #[inline]
    pub fn memory_bytes(&self) -> usize {
        self.num_bits.div_ceil(64) * 8
    }
}
impl Default for BloomConfig {
fn default() -> Self {
Self::new(100_000, DEFAULT_FALSE_POSITIVE_RATE)
}
}
impl BloomFilter {
pub fn new(expected_items: usize, false_positive_rate: f64) -> Self {
let config = BloomConfig::new(expected_items, false_positive_rate);
Self::with_config(config)
}
pub fn with_config(config: BloomConfig) -> Self {
let num_u64s = config.num_bits.div_ceil(64);
let inner = BloomFilterInner {
bits: vec![0u64; num_u64s],
count: 0,
};
Self {
inner: RwLock::new(inner),
config,
}
}
#[inline]
pub fn insert_cid(&self, cid: &Cid) {
self.insert(&cid.to_bytes());
}
#[inline]
pub fn contains_cid(&self, cid: &Cid) -> bool {
self.contains(&cid.to_bytes())
}
pub fn insert(&self, data: &[u8]) {
let mut inner = self.inner.write();
let hashes = self.compute_hashes(data);
for hash in hashes {
let bit_index = hash % self.config.num_bits;
let word_index = bit_index / 64;
let bit_offset = bit_index % 64;
inner.bits[word_index] |= 1u64 << bit_offset;
}
inner.count += 1;
}
pub fn contains(&self, data: &[u8]) -> bool {
let inner = self.inner.read();
let hashes = self.compute_hashes(data);
for hash in hashes {
let bit_index = hash % self.config.num_bits;
let word_index = bit_index / 64;
let bit_offset = bit_index % 64;
if inner.bits[word_index] & (1u64 << bit_offset) == 0 {
return false;
}
}
true
}
fn compute_hashes(&self, data: &[u8]) -> Vec<usize> {
let h1 = fnv1a_hash(data);
let h2 = fnv1a_hash_with_seed(data, 0x811c_9dc5);
let mut hashes = Vec::with_capacity(self.config.num_hashes);
for i in 0..self.config.num_hashes {
let hash = h1.wrapping_add((i as u64).wrapping_mul(h2));
hashes.push(hash as usize);
}
hashes
}
#[inline]
pub fn count(&self) -> usize {
self.inner.read().count
}
pub fn fill_ratio(&self) -> f64 {
let inner = self.inner.read();
let set_bits: usize = inner.bits.iter().map(|w| w.count_ones() as usize).sum();
set_bits as f64 / self.config.num_bits as f64
}
pub fn estimated_fpr(&self) -> f64 {
let fill = self.fill_ratio();
fill.powi(self.config.num_hashes as i32)
}
#[inline]
pub fn memory_bytes(&self) -> usize {
self.config.memory_bytes()
}
pub fn clear(&self) {
let mut inner = self.inner.write();
for word in inner.bits.iter_mut() {
*word = 0;
}
inner.count = 0;
}
pub fn save_to_file(&self, path: &Path) -> Result<()> {
let inner = self.inner.read();
let data = oxicode::serde::encode_to_vec(&*inner, oxicode::config::standard())
.map_err(|e| Error::Serialization(format!("Failed to serialize bloom filter: {e}")))?;
std::fs::write(path, data)
.map_err(|e| Error::Storage(format!("Failed to write bloom filter: {e}")))?;
Ok(())
}
pub fn load_from_file(path: &Path, config: BloomConfig) -> Result<Self> {
let data = std::fs::read(path)
.map_err(|e| Error::Storage(format!("Failed to read bloom filter: {e}")))?;
let inner: BloomFilterInner =
oxicode::serde::decode_owned_from_slice(&data, oxicode::config::standard())
.map(|(v, _)| v)
.map_err(|e| {
Error::Deserialization(format!("Failed to deserialize bloom filter: {e}"))
})?;
let expected_words = config.num_bits.div_ceil(64);
if inner.bits.len() != expected_words {
return Err(Error::InvalidData(format!(
"Bloom filter size mismatch: expected {} words, got {}",
expected_words,
inner.bits.len()
)));
}
Ok(Self {
inner: RwLock::new(inner),
config,
})
}
pub fn stats(&self) -> BloomStats {
BloomStats {
count: self.count(),
memory_bytes: self.memory_bytes(),
fill_ratio: self.fill_ratio(),
estimated_fpr: self.estimated_fpr(),
num_bits: self.config.num_bits,
num_hashes: self.config.num_hashes,
}
}
}
/// Summary of a [`BloomFilter`]'s size and occupancy, as returned by
/// `BloomFilter::stats`.
#[derive(Debug, Clone)]
pub struct BloomStats {
    /// Number of `insert` calls recorded (repeated keys are counted each time).
    pub count: usize,
    /// Size of the bit array in bytes, rounded up to whole 64-bit words.
    pub memory_bytes: usize,
    /// Set bits divided by the filter's `num_bits`.
    pub fill_ratio: f64,
    /// Estimated false-positive probability, `fill_ratio ^ num_hashes`.
    pub estimated_fpr: f64,
    /// Total number of bits in the filter.
    pub num_bits: usize,
    /// Number of bit positions set/checked per item.
    pub num_hashes: usize,
}
/// 64-bit FNV-1a hash of `data` using the standard offset basis.
///
/// Same as [`fnv1a_hash_with_seed`] seeded with `0xcbf2_9ce4_8422_2325`.
#[inline]
fn fnv1a_hash(data: &[u8]) -> u64 {
    const FNV_OFFSET: u64 = 0xcbf2_9ce4_8422_2325;
    fnv1a_hash_with_seed(data, FNV_OFFSET)
}

/// 64-bit FNV-1a hash of `data`, starting from `seed` instead of the standard offset
/// basis.
///
/// For each byte, the byte is XORed into the state, then the state is multiplied by
/// the 64-bit FNV prime with wrapping arithmetic. Empty input returns `seed` unchanged.
#[inline]
fn fnv1a_hash_with_seed(data: &[u8], seed: u64) -> u64 {
    const FNV_PRIME: u64 = 0x0100_0000_01b3;
    data.iter()
        .fold(seed, |hash, &byte| (hash ^ u64::from(byte)).wrapping_mul(FNV_PRIME))
}
use crate::traits::BlockStore;
use async_trait::async_trait;
use ipfrs_core::Block;
/// A [`BlockStore`] wrapper that consults a [`BloomFilter`] before the wrapped store.
///
/// When the filter rules a CID out, reads are answered without calling `S`.
pub struct BloomBlockStore<S>
where
    S: BlockStore,
{
    /// The wrapped store that actually holds the blocks.
    store: S,
    /// Filter of CIDs written through this wrapper or loaded by a rebuild.
    filter: BloomFilter,
}
impl<S: BlockStore> BloomBlockStore<S> {
    /// Wraps `store` with an empty filter sized for `expected_items` at the target
    /// `false_positive_rate`.
    ///
    /// The filter starts empty. If `store` already holds blocks, call
    /// [`Self::rebuild_filter`]; until then `get`/`has` report those blocks as missing.
    pub fn new(store: S, expected_items: usize, false_positive_rate: f64) -> Self {
        Self {
            store,
            filter: BloomFilter::new(expected_items, false_positive_rate),
        }
    }

    /// Wraps `store` with an empty filter built from explicit sizing parameters.
    ///
    /// As with [`Self::new`], pre-existing blocks need [`Self::rebuild_filter`].
    pub fn with_config(store: S, config: BloomConfig) -> Self {
        Self {
            store,
            filter: BloomFilter::with_config(config),
        }
    }

    /// Repopulates the filter from the CIDs currently listed by the wrapped store.
    ///
    /// The store is listed first, so a listing error leaves the existing filter
    /// untouched. Previously the live filter was cleared before listing, and an error
    /// left it empty, so every lookup reported a miss. The replacement is built off to
    /// the side and swapped in under one write lock, so concurrent `get`/`has` calls
    /// never observe a half-built filter.
    ///
    /// A `put` that races with a rebuild can be missing from the new filter. Run this
    /// while writers are quiescent.
    ///
    /// # Errors
    /// Propagates any error from the wrapped store's `list_cids`.
    pub fn rebuild_filter(&self) -> Result<()> {
        let cids = self.store.list_cids()?;
        let fresh = BloomFilter::with_config(self.filter.config.clone());
        for cid in &cids {
            fresh.insert_cid(cid);
        }
        let fresh_inner = fresh.inner.into_inner();
        *self.filter.inner.write() = fresh_inner;
        Ok(())
    }

    /// Size and occupancy statistics for the filter.
    pub fn bloom_stats(&self) -> BloomStats {
        self.filter.stats()
    }

    /// Borrows the wrapped store.
    #[inline]
    pub fn store(&self) -> &S {
        &self.store
    }
}
#[async_trait]
impl<S: BlockStore> BlockStore for BloomBlockStore<S> {
    /// Records the CID in the filter, then stores the block.
    ///
    /// The filter is updated first, so the block is never in the store while the
    /// filter still rejects it. If the store write fails, the extra filter entry only
    /// costs a false positive.
    async fn put(&self, block: &Block) -> Result<()> {
        let cid = block.cid();
        self.filter.insert_cid(cid);
        self.store.put(block).await
    }

    /// Records every CID in the filter, then stores the batch.
    async fn put_many(&self, blocks: &[Block]) -> Result<()> {
        blocks
            .iter()
            .for_each(|block| self.filter.insert_cid(block.cid()));
        self.store.put_many(blocks).await
    }

    /// Returns `Ok(None)` without touching the store when the filter rules `cid` out.
    async fn get(&self, cid: &Cid) -> Result<Option<Block>> {
        if self.filter.contains_cid(cid) {
            self.store.get(cid).await
        } else {
            Ok(None)
        }
    }

    /// Returns `Ok(false)` without touching the store when the filter rules `cid` out.
    async fn has(&self, cid: &Cid) -> Result<bool> {
        if self.filter.contains_cid(cid) {
            self.store.has(cid).await
        } else {
            Ok(false)
        }
    }

    /// Forwards only the CIDs the filter might contain; the rest are definite misses.
    async fn has_many(&self, cids: &[Cid]) -> Result<Vec<bool>> {
        let (positions, candidates): (Vec<usize>, Vec<Cid>) = cids
            .iter()
            .enumerate()
            .filter_map(|(pos, cid)| self.filter.contains_cid(cid).then_some((pos, *cid)))
            .unzip();

        let mut answers = vec![false; cids.len()];
        if candidates.is_empty() {
            return Ok(answers);
        }

        let found = self.store.has_many(&candidates).await?;
        for (pos, exists) in positions.into_iter().zip(found) {
            answers[pos] = exists;
        }
        Ok(answers)
    }

    /// Deletes from the wrapped store only. The filter has no removal operation, so
    /// the CID remains in it (at worst a false positive) until `rebuild_filter` runs.
    async fn delete(&self, cid: &Cid) -> Result<()> {
        self.store.delete(cid).await
    }

    /// Batch delete from the wrapped store; the filter is left unchanged.
    async fn delete_many(&self, cids: &[Cid]) -> Result<()> {
        self.store.delete_many(cids).await
    }

    /// Delegates to the wrapped store.
    fn list_cids(&self) -> Result<Vec<Cid>> {
        self.store.list_cids()
    }

    /// Delegates to the wrapped store.
    fn len(&self) -> usize {
        self.store.len()
    }

    /// Delegates to the wrapped store.
    fn is_empty(&self) -> bool {
        self.store.is_empty()
    }

    /// Delegates to the wrapped store; the filter itself is not persisted here.
    async fn flush(&self) -> Result<()> {
        self.store.flush().await
    }

    /// Delegates to the wrapped store.
    async fn close(&self) -> Result<()> {
        self.store.close().await
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Both inserted keys are found and an unrelated key is rejected. The negative
    /// check relies on the filter being nearly empty and on deterministic hashing; a
    /// Bloom filter does not guarantee it.
    #[test]
    fn test_bloom_filter_basic() {
        let filter = BloomFilter::new(1000, 0.01);
        let (hello, world) = (&b"hello"[..], &b"world"[..]);
        filter.insert(hello);
        filter.insert(world);
        assert!(filter.contains(hello));
        assert!(filter.contains(world));
        assert!(!filter.contains(b"foo"));
    }

    /// Inserts 10 000 keys and probes 10 000 disjoint keys. The observed
    /// false-positive rate must stay under 3% for a filter configured for 1%.
    #[test]
    fn test_bloom_filter_false_positive_rate() {
        const N: i32 = 10000;
        let filter = BloomFilter::new(10000, 0.01);
        (0..N).for_each(|i| filter.insert(&i.to_le_bytes()));
        let false_positives = (N..2 * N)
            .filter(|i| filter.contains(&i.to_le_bytes()))
            .count();
        let fpr = false_positives as f64 / 10000.0;
        assert!(fpr < 0.03, "False positive rate {} too high", fpr);
    }

    /// A one-million-item filter at 1% must fit in under 10 MiB.
    #[test]
    fn test_bloom_config_memory() {
        let bytes = BloomConfig::new(1_000_000, 0.01).memory_bytes();
        let memory_mb = bytes as f64 / (1024.0 * 1024.0);
        assert!(
            memory_mb < 10.0,
            "Memory {} MB exceeds 10MB target",
            memory_mb
        );
    }

    /// `stats` reports the insertion count and a fill ratio strictly between 0 and 1.
    #[test]
    fn test_bloom_filter_stats() {
        let filter = BloomFilter::new(1000, 0.01);
        (0i32..100).for_each(|i| filter.insert(&i.to_le_bytes()));
        let BloomStats {
            count, fill_ratio, ..
        } = filter.stats();
        assert_eq!(count, 100);
        assert!(fill_ratio > 0.0);
        assert!(fill_ratio < 1.0);
    }
}