1use ipfrs_core::{Cid, Error, Result};
19use parking_lot::RwLock;
20use serde::{Deserialize, Serialize};
21use std::path::Path;
22
/// Target false-positive rate used by `BloomConfig::default()` (1%).
const DEFAULT_FALSE_POSITIVE_RATE: f64 = 0.01;
25
/// A Bloom filter: a probabilistic set whose lookups may return false positives but never
/// report an item absent after it was inserted (until `clear` or a reload replaces the bits).
///
/// The mutable state lives behind a `RwLock`, so `insert` and `clear` take `&self`:
/// writers take the write lock, lookups take the read lock.
pub struct BloomFilter {
    /// Bit array plus insert counter, guarded for concurrent access.
    inner: RwLock<BloomFilterInner>,
    /// Sizing parameters (bit count, number of probes); never mutated after construction.
    config: BloomConfig,
}
36
/// Mutable state of a `BloomFilter`; this struct is exactly what `save_to_file` encodes
/// and `load_from_file` decodes.
///
/// NOTE(review): the encoding is presumably field-order dependent (bincode-style) — do not
/// reorder or rename fields without a format migration.
#[derive(Serialize, Deserialize)]
struct BloomFilterInner {
    /// Bits packed 64 per word: bit `i` lives in `bits[i / 64]` at position `i % 64`.
    bits: Vec<u64>,
    /// Number of `insert` calls (duplicates included) since creation or the last `clear`,
    /// or the value restored by `load_from_file`.
    count: usize,
}
45
/// Sizing parameters for a `BloomFilter`.
///
/// Normally produced by [`BloomConfig::new`] or one of its presets. The fields are public,
/// so a configuration can also be assembled by hand.
#[derive(Debug, Clone)]
pub struct BloomConfig {
    /// Number of items the filter was sized for.
    pub expected_items: usize,
    /// False-positive rate requested by the caller (stored as given).
    pub false_positive_rate: f64,
    /// Number of bit positions set on insert / probed on lookup.
    pub num_hashes: usize,
    /// Total number of bits in the filter.
    pub num_bits: usize,
}

impl BloomConfig {
    /// Derives a configuration for `expected_items` items at `false_positive_rate`.
    ///
    /// Uses the standard sizing formulas `m = ceil(-n * ln(p) / ln(2)^2)` and
    /// `k = ceil(m / n * ln(2))`, then enforces `m >= 64` and `1 <= k <= 16`.
    ///
    /// No finite filter can reach a rate of `0.0`: `ln(0)` is `-inf`, which used to turn into a
    /// request for `usize::MAX` bits. For sizing, rates at or below zero are therefore raised
    /// to `f64::MIN_POSITIVE` and rates above one are lowered to `1.0`. The caller's original
    /// value is still stored in `false_positive_rate`. A NaN rate passes through the clamp
    /// unchanged and produces the minimum filter.
    pub fn new(expected_items: usize, false_positive_rate: f64) -> Self {
        let ln2 = std::f64::consts::LN_2;
        let sizing_rate = false_positive_rate.clamp(f64::MIN_POSITIVE, 1.0);
        let items = expected_items as f64;

        // `as usize` saturates: negative/NaN results become 0 and are lifted by `max(64)` below.
        let optimal_bits = (-(items * sizing_rate.ln()) / (ln2 * ln2)).ceil() as usize;

        // With no expected items there is no meaningful bits-per-item ratio; the clamp below
        // turns this 0 into a single probe.
        let optimal_hashes = if expected_items == 0 {
            0
        } else {
            ((optimal_bits as f64 / items) * ln2).ceil() as usize
        };

        Self {
            expected_items,
            false_positive_rate,
            num_hashes: optimal_hashes.clamp(1, 16),
            num_bits: optimal_bits.max(64),
        }
    }

    /// Preset that trades accuracy for memory: 5% target false-positive rate.
    pub fn low_memory(expected_items: usize) -> Self {
        Self::new(expected_items, 0.05)
    }

    /// Preset that trades memory for accuracy: 0.1% target false-positive rate.
    pub fn high_accuracy(expected_items: usize) -> Self {
        Self::new(expected_items, 0.001)
    }

    /// Bytes used by the bit array: `num_bits` rounded up to whole 64-bit words.
    #[inline]
    pub fn memory_bytes(&self) -> usize {
        self.num_bits.div_ceil(64) * 8
    }
}
101
102impl Default for BloomConfig {
103 fn default() -> Self {
104 Self::new(100_000, DEFAULT_FALSE_POSITIVE_RATE)
105 }
106}
107
108impl BloomFilter {
109 pub fn new(expected_items: usize, false_positive_rate: f64) -> Self {
115 let config = BloomConfig::new(expected_items, false_positive_rate);
116 Self::with_config(config)
117 }
118
119 pub fn with_config(config: BloomConfig) -> Self {
121 let num_u64s = config.num_bits.div_ceil(64);
122 let inner = BloomFilterInner {
123 bits: vec![0u64; num_u64s],
124 count: 0,
125 };
126 Self {
127 inner: RwLock::new(inner),
128 config,
129 }
130 }
131
132 #[inline]
134 pub fn insert_cid(&self, cid: &Cid) {
135 self.insert(&cid.to_bytes());
136 }
137
138 #[inline]
143 pub fn contains_cid(&self, cid: &Cid) -> bool {
144 self.contains(&cid.to_bytes())
145 }
146
147 pub fn insert(&self, data: &[u8]) {
149 let mut inner = self.inner.write();
150 let hashes = self.compute_hashes(data);
151
152 for hash in hashes {
153 let bit_index = hash % self.config.num_bits;
154 let word_index = bit_index / 64;
155 let bit_offset = bit_index % 64;
156 inner.bits[word_index] |= 1u64 << bit_offset;
157 }
158 inner.count += 1;
159 }
160
161 pub fn contains(&self, data: &[u8]) -> bool {
163 let inner = self.inner.read();
164 let hashes = self.compute_hashes(data);
165
166 for hash in hashes {
167 let bit_index = hash % self.config.num_bits;
168 let word_index = bit_index / 64;
169 let bit_offset = bit_index % 64;
170 if inner.bits[word_index] & (1u64 << bit_offset) == 0 {
171 return false;
172 }
173 }
174 true
175 }
176
177 fn compute_hashes(&self, data: &[u8]) -> Vec<usize> {
179 let h1 = fnv1a_hash(data);
181 let h2 = fnv1a_hash_with_seed(data, 0x811c_9dc5);
182
183 let mut hashes = Vec::with_capacity(self.config.num_hashes);
184 for i in 0..self.config.num_hashes {
185 let hash = h1.wrapping_add((i as u64).wrapping_mul(h2));
187 hashes.push(hash as usize);
188 }
189 hashes
190 }
191
192 #[inline]
194 pub fn count(&self) -> usize {
195 self.inner.read().count
196 }
197
198 pub fn fill_ratio(&self) -> f64 {
200 let inner = self.inner.read();
201 let set_bits: usize = inner.bits.iter().map(|w| w.count_ones() as usize).sum();
202 set_bits as f64 / self.config.num_bits as f64
203 }
204
205 pub fn estimated_fpr(&self) -> f64 {
207 let fill = self.fill_ratio();
208 fill.powi(self.config.num_hashes as i32)
209 }
210
211 #[inline]
213 pub fn memory_bytes(&self) -> usize {
214 self.config.memory_bytes()
215 }
216
217 pub fn clear(&self) {
219 let mut inner = self.inner.write();
220 for word in inner.bits.iter_mut() {
221 *word = 0;
222 }
223 inner.count = 0;
224 }
225
226 pub fn save_to_file(&self, path: &Path) -> Result<()> {
228 let inner = self.inner.read();
229 let data = oxicode::serde::encode_to_vec(&*inner, oxicode::config::standard())
230 .map_err(|e| Error::Serialization(format!("Failed to serialize bloom filter: {e}")))?;
231 std::fs::write(path, data)
232 .map_err(|e| Error::Storage(format!("Failed to write bloom filter: {e}")))?;
233 Ok(())
234 }
235
236 pub fn load_from_file(path: &Path, config: BloomConfig) -> Result<Self> {
238 let data = std::fs::read(path)
239 .map_err(|e| Error::Storage(format!("Failed to read bloom filter: {e}")))?;
240 let inner: BloomFilterInner =
241 oxicode::serde::decode_owned_from_slice(&data, oxicode::config::standard())
242 .map(|(v, _)| v)
243 .map_err(|e| {
244 Error::Deserialization(format!("Failed to deserialize bloom filter: {e}"))
245 })?;
246
247 let expected_words = config.num_bits.div_ceil(64);
249 if inner.bits.len() != expected_words {
250 return Err(Error::InvalidData(format!(
251 "Bloom filter size mismatch: expected {} words, got {}",
252 expected_words,
253 inner.bits.len()
254 )));
255 }
256
257 Ok(Self {
258 inner: RwLock::new(inner),
259 config,
260 })
261 }
262
263 pub fn stats(&self) -> BloomStats {
265 BloomStats {
266 count: self.count(),
267 memory_bytes: self.memory_bytes(),
268 fill_ratio: self.fill_ratio(),
269 estimated_fpr: self.estimated_fpr(),
270 num_bits: self.config.num_bits,
271 num_hashes: self.config.num_hashes,
272 }
273 }
274}
275
/// Point-in-time statistics returned by `BloomFilter::stats`.
#[derive(Debug, Clone)]
pub struct BloomStats {
    /// Number of `insert` calls recorded by the filter (duplicates included).
    pub count: usize,
    /// Bytes used by the bit array (`num_bits` rounded up to 64-bit words).
    pub memory_bytes: usize,
    /// Fraction of bits currently set, in `[0, 1]`.
    pub fill_ratio: f64,
    /// Estimated false-positive rate, `fill_ratio ^ num_hashes`.
    pub estimated_fpr: f64,
    /// Total number of bits from the filter's config.
    pub num_bits: usize,
    /// Probes per item from the filter's config.
    pub num_hashes: usize,
}
292
/// 64-bit FNV-1a hash of `data`, starting from the standard FNV offset basis.
#[inline]
fn fnv1a_hash(data: &[u8]) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0100_0000_01b3;

    // FNV-1a: xor the byte in first, then multiply by the prime.
    data.iter()
        .fold(OFFSET_BASIS, |acc, &b| (acc ^ u64::from(b)).wrapping_mul(PRIME))
}
306
/// FNV-1a over `data` using the 64-bit FNV prime, but starting from `seed` instead of the
/// standard offset basis (used to derive a second, differently-seeded hash).
#[inline]
fn fnv1a_hash_with_seed(data: &[u8], seed: u64) -> u64 {
    const PRIME: u64 = 0x0100_0000_01b3;

    data.iter()
        .fold(seed, |acc, &b| (acc ^ u64::from(b)).wrapping_mul(PRIME))
}
319
320use crate::traits::BlockStore;
322use async_trait::async_trait;
323use ipfrs_core::Block;
324
325pub struct BloomBlockStore<S: BlockStore> {
326 store: S,
327 filter: BloomFilter,
328}
329
330impl<S: BlockStore> BloomBlockStore<S> {
331 pub fn new(store: S, expected_items: usize, false_positive_rate: f64) -> Self {
333 Self {
334 store,
335 filter: BloomFilter::new(expected_items, false_positive_rate),
336 }
337 }
338
339 pub fn with_config(store: S, config: BloomConfig) -> Self {
341 Self {
342 store,
343 filter: BloomFilter::with_config(config),
344 }
345 }
346
347 pub fn rebuild_filter(&self) -> Result<()> {
349 self.filter.clear();
350 for cid in self.store.list_cids()? {
351 self.filter.insert_cid(&cid);
352 }
353 Ok(())
354 }
355
356 pub fn bloom_stats(&self) -> BloomStats {
358 self.filter.stats()
359 }
360
361 #[inline]
363 pub fn store(&self) -> &S {
364 &self.store
365 }
366}
367
368#[async_trait]
369impl<S: BlockStore> BlockStore for BloomBlockStore<S> {
370 async fn put(&self, block: &Block) -> Result<()> {
371 self.filter.insert_cid(block.cid());
372 self.store.put(block).await
373 }
374
375 async fn put_many(&self, blocks: &[Block]) -> Result<()> {
376 for block in blocks {
377 self.filter.insert_cid(block.cid());
378 }
379 self.store.put_many(blocks).await
380 }
381
382 async fn get(&self, cid: &Cid) -> Result<Option<Block>> {
383 if !self.filter.contains_cid(cid) {
385 return Ok(None);
386 }
387 self.store.get(cid).await
389 }
390
391 async fn has(&self, cid: &Cid) -> Result<bool> {
392 if !self.filter.contains_cid(cid) {
394 return Ok(false);
395 }
396 self.store.has(cid).await
398 }
399
400 async fn has_many(&self, cids: &[Cid]) -> Result<Vec<bool>> {
401 let mut results = Vec::with_capacity(cids.len());
403 let mut to_check = Vec::new();
404 let mut indices = Vec::new();
405
406 for (i, cid) in cids.iter().enumerate() {
407 if self.filter.contains_cid(cid) {
408 to_check.push(*cid);
409 indices.push(i);
410 }
411 results.push(false); }
413
414 if !to_check.is_empty() {
416 let store_results = self.store.has_many(&to_check).await?;
417 for (idx, exists) in indices.into_iter().zip(store_results) {
418 results[idx] = exists;
419 }
420 }
421
422 Ok(results)
423 }
424
425 async fn delete(&self, cid: &Cid) -> Result<()> {
426 self.store.delete(cid).await
429 }
430
431 async fn delete_many(&self, cids: &[Cid]) -> Result<()> {
432 self.store.delete_many(cids).await
433 }
434
435 fn list_cids(&self) -> Result<Vec<Cid>> {
436 self.store.list_cids()
437 }
438
439 fn len(&self) -> usize {
440 self.store.len()
441 }
442
443 fn is_empty(&self) -> bool {
444 self.store.is_empty()
445 }
446
447 async fn flush(&self) -> Result<()> {
448 self.store.flush().await
449 }
450
451 async fn close(&self) -> Result<()> {
452 self.store.close().await
453 }
454}
455
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_bloom_filter_basic() {
        let bloom = BloomFilter::new(1000, 0.01);
        let members: [&[u8]; 2] = [b"hello", b"world"];

        for item in members {
            bloom.insert(item);
        }
        for item in members {
            assert!(bloom.contains(item));
        }
        // Never inserted; a hit here would be a false positive.
        assert!(!bloom.contains(b"foo"));
    }

    #[test]
    fn test_bloom_filter_false_positive_rate() {
        let filter = BloomFilter::new(10000, 0.01);
        (0i32..10000).for_each(|i| filter.insert(&i.to_le_bytes()));

        // Probe a disjoint range; every hit is a false positive.
        let false_positives = (10000i32..20000)
            .filter(|i| filter.contains(&i.to_le_bytes()))
            .count();

        // Allow some slack over the 1% target.
        let fpr = false_positives as f64 / 10000.0;
        assert!(fpr < 0.03, "False positive rate {} too high", fpr);
    }

    #[test]
    fn test_bloom_config_memory() {
        const MIB: f64 = 1024.0 * 1024.0;
        let memory_mb = BloomConfig::new(1_000_000, 0.01).memory_bytes() as f64 / MIB;
        assert!(
            memory_mb < 10.0,
            "Memory {} MB exceeds 10MB target",
            memory_mb
        );
    }

    #[test]
    fn test_bloom_filter_stats() {
        let filter = BloomFilter::new(1000, 0.01);
        (0i32..100).for_each(|i| filter.insert(&i.to_le_bytes()));

        let stats = filter.stats();
        assert_eq!(stats.count, 100);
        assert!(stats.fill_ratio > 0.0 && stats.fill_ratio < 1.0);
    }
}