ipfrs_storage/
bloom.rs

1//! Bloom filter for probabilistic block existence checks.
2//!
3//! Provides fast probabilistic `has()` checks with configurable false positive rates.
4//! A bloom filter can quickly tell if a block definitely doesn't exist,
5//! avoiding expensive disk lookups for cache misses.
6//!
7//! # Example
8//!
9//! ```rust,ignore
10//! use ipfrs_storage::bloom::BloomFilter;
11//!
12//! let mut filter = BloomFilter::new(1_000_000, 0.01); // 1M items, 1% FPR
13//! filter.insert(b"block_cid_bytes");
14//! assert!(filter.contains(b"block_cid_bytes"));
15//! assert!(!filter.contains(b"unknown")); // Probably false, might be true
16//! ```
17
18use ipfrs_core::{Cid, Error, Result};
19use parking_lot::RwLock;
20use serde::{Deserialize, Serialize};
21use std::path::Path;
22
23/// Default false positive rate (1%)
24const DEFAULT_FALSE_POSITIVE_RATE: f64 = 0.01;
25
26/// Bloom filter for fast probabilistic existence checks.
27///
28/// Uses multiple hash functions to minimize false positives while
29/// maintaining constant-time lookups regardless of dataset size.
30pub struct BloomFilter {
31    /// Bit array for the bloom filter
32    inner: RwLock<BloomFilterInner>,
33    /// Configuration
34    config: BloomConfig,
35}
36
37/// Inner mutable state of the bloom filter
38#[derive(Serialize, Deserialize)]
39struct BloomFilterInner {
40    /// Bit vector
41    bits: Vec<u64>,
42    /// Number of items inserted
43    count: usize,
44}
45
46/// Bloom filter configuration
47#[derive(Debug, Clone)]
48pub struct BloomConfig {
49    /// Expected number of items
50    pub expected_items: usize,
51    /// Desired false positive rate (0.0 - 1.0)
52    pub false_positive_rate: f64,
53    /// Number of hash functions to use
54    pub num_hashes: usize,
55    /// Size of the bit array in bits
56    pub num_bits: usize,
57}
58
59impl BloomConfig {
60    /// Create a new configuration with given parameters
61    pub fn new(expected_items: usize, false_positive_rate: f64) -> Self {
62        // Calculate optimal parameters
63        // m = -n * ln(p) / (ln(2)^2) where m = bits, n = items, p = FPR
64        let ln2_squared = std::f64::consts::LN_2 * std::f64::consts::LN_2;
65        let num_bits =
66            (-((expected_items as f64) * false_positive_rate.ln()) / ln2_squared).ceil() as usize;
67
68        // k = (m/n) * ln(2) where k = hash functions
69        let num_hashes =
70            ((num_bits as f64 / expected_items as f64) * std::f64::consts::LN_2).ceil() as usize;
71
72        // Ensure minimum values
73        let num_bits = num_bits.max(64);
74        let num_hashes = num_hashes.clamp(1, 16); // Cap at 16 hash functions
75
76        Self {
77            expected_items,
78            false_positive_rate,
79            num_hashes,
80            num_bits,
81        }
82    }
83
84    /// Create a configuration for low memory usage
85    pub fn low_memory(expected_items: usize) -> Self {
86        Self::new(expected_items, 0.05) // 5% FPR for smaller filter
87    }
88
89    /// Create a configuration for high accuracy
90    pub fn high_accuracy(expected_items: usize) -> Self {
91        Self::new(expected_items, 0.001) // 0.1% FPR
92    }
93
94    /// Calculate memory usage in bytes
95    #[inline]
96    pub fn memory_bytes(&self) -> usize {
97        // Round up to u64 boundary
98        self.num_bits.div_ceil(64) * 8
99    }
100}
101
102impl Default for BloomConfig {
103    fn default() -> Self {
104        Self::new(100_000, DEFAULT_FALSE_POSITIVE_RATE)
105    }
106}
107
108impl BloomFilter {
109    /// Create a new bloom filter with the given expected item count and false positive rate.
110    ///
111    /// # Arguments
112    /// * `expected_items` - Expected number of items to be stored
113    /// * `false_positive_rate` - Desired false positive rate (0.0 - 1.0)
114    pub fn new(expected_items: usize, false_positive_rate: f64) -> Self {
115        let config = BloomConfig::new(expected_items, false_positive_rate);
116        Self::with_config(config)
117    }
118
119    /// Create a bloom filter with custom configuration
120    pub fn with_config(config: BloomConfig) -> Self {
121        let num_u64s = config.num_bits.div_ceil(64);
122        let inner = BloomFilterInner {
123            bits: vec![0u64; num_u64s],
124            count: 0,
125        };
126        Self {
127            inner: RwLock::new(inner),
128            config,
129        }
130    }
131
132    /// Insert a CID into the bloom filter
133    #[inline]
134    pub fn insert_cid(&self, cid: &Cid) {
135        self.insert(&cid.to_bytes());
136    }
137
138    /// Check if a CID might be in the bloom filter
139    ///
140    /// Returns `true` if the CID might be present (may be a false positive),
141    /// Returns `false` if the CID is definitely not present.
142    #[inline]
143    pub fn contains_cid(&self, cid: &Cid) -> bool {
144        self.contains(&cid.to_bytes())
145    }
146
147    /// Insert raw bytes into the bloom filter
148    pub fn insert(&self, data: &[u8]) {
149        let mut inner = self.inner.write();
150        let hashes = self.compute_hashes(data);
151
152        for hash in hashes {
153            let bit_index = hash % self.config.num_bits;
154            let word_index = bit_index / 64;
155            let bit_offset = bit_index % 64;
156            inner.bits[word_index] |= 1u64 << bit_offset;
157        }
158        inner.count += 1;
159    }
160
161    /// Check if raw bytes might be in the bloom filter
162    pub fn contains(&self, data: &[u8]) -> bool {
163        let inner = self.inner.read();
164        let hashes = self.compute_hashes(data);
165
166        for hash in hashes {
167            let bit_index = hash % self.config.num_bits;
168            let word_index = bit_index / 64;
169            let bit_offset = bit_index % 64;
170            if inner.bits[word_index] & (1u64 << bit_offset) == 0 {
171                return false;
172            }
173        }
174        true
175    }
176
177    /// Compute hash values for data using double hashing technique
178    fn compute_hashes(&self, data: &[u8]) -> Vec<usize> {
179        // Use FNV-1a for h1 and a different seed for h2
180        let h1 = fnv1a_hash(data);
181        let h2 = fnv1a_hash_with_seed(data, 0x811c_9dc5);
182
183        let mut hashes = Vec::with_capacity(self.config.num_hashes);
184        for i in 0..self.config.num_hashes {
185            // Double hashing: h(i) = h1 + i * h2
186            let hash = h1.wrapping_add((i as u64).wrapping_mul(h2));
187            hashes.push(hash as usize);
188        }
189        hashes
190    }
191
192    /// Get the number of items inserted
193    #[inline]
194    pub fn count(&self) -> usize {
195        self.inner.read().count
196    }
197
198    /// Get the fill ratio (proportion of bits set)
199    pub fn fill_ratio(&self) -> f64 {
200        let inner = self.inner.read();
201        let set_bits: usize = inner.bits.iter().map(|w| w.count_ones() as usize).sum();
202        set_bits as f64 / self.config.num_bits as f64
203    }
204
205    /// Estimate the actual false positive rate based on current fill
206    pub fn estimated_fpr(&self) -> f64 {
207        let fill = self.fill_ratio();
208        fill.powi(self.config.num_hashes as i32)
209    }
210
211    /// Get memory usage in bytes
212    #[inline]
213    pub fn memory_bytes(&self) -> usize {
214        self.config.memory_bytes()
215    }
216
217    /// Clear the bloom filter
218    pub fn clear(&self) {
219        let mut inner = self.inner.write();
220        for word in inner.bits.iter_mut() {
221            *word = 0;
222        }
223        inner.count = 0;
224    }
225
226    /// Save the bloom filter to a file
227    pub fn save_to_file(&self, path: &Path) -> Result<()> {
228        let inner = self.inner.read();
229        let data = oxicode::serde::encode_to_vec(&*inner, oxicode::config::standard())
230            .map_err(|e| Error::Serialization(format!("Failed to serialize bloom filter: {e}")))?;
231        std::fs::write(path, data)
232            .map_err(|e| Error::Storage(format!("Failed to write bloom filter: {e}")))?;
233        Ok(())
234    }
235
236    /// Load the bloom filter from a file
237    pub fn load_from_file(path: &Path, config: BloomConfig) -> Result<Self> {
238        let data = std::fs::read(path)
239            .map_err(|e| Error::Storage(format!("Failed to read bloom filter: {e}")))?;
240        let inner: BloomFilterInner =
241            oxicode::serde::decode_owned_from_slice(&data, oxicode::config::standard())
242                .map(|(v, _)| v)
243                .map_err(|e| {
244                    Error::Deserialization(format!("Failed to deserialize bloom filter: {e}"))
245                })?;
246
247        // Verify the loaded filter matches expected config
248        let expected_words = config.num_bits.div_ceil(64);
249        if inner.bits.len() != expected_words {
250            return Err(Error::InvalidData(format!(
251                "Bloom filter size mismatch: expected {} words, got {}",
252                expected_words,
253                inner.bits.len()
254            )));
255        }
256
257        Ok(Self {
258            inner: RwLock::new(inner),
259            config,
260        })
261    }
262
263    /// Get bloom filter statistics
264    pub fn stats(&self) -> BloomStats {
265        BloomStats {
266            count: self.count(),
267            memory_bytes: self.memory_bytes(),
268            fill_ratio: self.fill_ratio(),
269            estimated_fpr: self.estimated_fpr(),
270            num_bits: self.config.num_bits,
271            num_hashes: self.config.num_hashes,
272        }
273    }
274}
275
276/// Statistics about a bloom filter
277#[derive(Debug, Clone)]
278pub struct BloomStats {
279    /// Number of items inserted
280    pub count: usize,
281    /// Memory usage in bytes
282    pub memory_bytes: usize,
283    /// Proportion of bits set (0.0 - 1.0)
284    pub fill_ratio: f64,
285    /// Estimated false positive rate
286    pub estimated_fpr: f64,
287    /// Total number of bits
288    pub num_bits: usize,
289    /// Number of hash functions
290    pub num_hashes: usize,
291}
292
293/// FNV-1a hash function
294#[inline]
295fn fnv1a_hash(data: &[u8]) -> u64 {
296    const FNV_OFFSET: u64 = 0xcbf2_9ce4_8422_2325;
297    const FNV_PRIME: u64 = 0x0100_0000_01b3;
298
299    let mut hash = FNV_OFFSET;
300    for &byte in data {
301        hash ^= byte as u64;
302        hash = hash.wrapping_mul(FNV_PRIME);
303    }
304    hash
305}
306
307/// FNV-1a hash with custom seed
308#[inline]
309fn fnv1a_hash_with_seed(data: &[u8], seed: u64) -> u64 {
310    const FNV_PRIME: u64 = 0x0100_0000_01b3;
311
312    let mut hash = seed;
313    for &byte in data {
314        hash ^= byte as u64;
315        hash = hash.wrapping_mul(FNV_PRIME);
316    }
317    hash
318}
319
320/// Block store wrapper that uses a bloom filter for fast negative lookups
321use crate::traits::BlockStore;
322use async_trait::async_trait;
323use ipfrs_core::Block;
324
325pub struct BloomBlockStore<S: BlockStore> {
326    store: S,
327    filter: BloomFilter,
328}
329
330impl<S: BlockStore> BloomBlockStore<S> {
331    /// Create a new bloom-filtered block store
332    pub fn new(store: S, expected_items: usize, false_positive_rate: f64) -> Self {
333        Self {
334            store,
335            filter: BloomFilter::new(expected_items, false_positive_rate),
336        }
337    }
338
339    /// Create with custom bloom filter configuration
340    pub fn with_config(store: S, config: BloomConfig) -> Self {
341        Self {
342            store,
343            filter: BloomFilter::with_config(config),
344        }
345    }
346
347    /// Rebuild the bloom filter from the store's contents
348    pub fn rebuild_filter(&self) -> Result<()> {
349        self.filter.clear();
350        for cid in self.store.list_cids()? {
351            self.filter.insert_cid(&cid);
352        }
353        Ok(())
354    }
355
356    /// Get bloom filter statistics
357    pub fn bloom_stats(&self) -> BloomStats {
358        self.filter.stats()
359    }
360
361    /// Get reference to underlying store
362    #[inline]
363    pub fn store(&self) -> &S {
364        &self.store
365    }
366}
367
368#[async_trait]
369impl<S: BlockStore> BlockStore for BloomBlockStore<S> {
370    async fn put(&self, block: &Block) -> Result<()> {
371        self.filter.insert_cid(block.cid());
372        self.store.put(block).await
373    }
374
375    async fn put_many(&self, blocks: &[Block]) -> Result<()> {
376        for block in blocks {
377            self.filter.insert_cid(block.cid());
378        }
379        self.store.put_many(blocks).await
380    }
381
382    async fn get(&self, cid: &Cid) -> Result<Option<Block>> {
383        // Fast path: if bloom filter says no, definitely not there
384        if !self.filter.contains_cid(cid) {
385            return Ok(None);
386        }
387        // May be a false positive, check actual store
388        self.store.get(cid).await
389    }
390
391    async fn has(&self, cid: &Cid) -> Result<bool> {
392        // Fast path: if bloom filter says no, definitely not there
393        if !self.filter.contains_cid(cid) {
394            return Ok(false);
395        }
396        // May be a false positive, check actual store
397        self.store.has(cid).await
398    }
399
400    async fn has_many(&self, cids: &[Cid]) -> Result<Vec<bool>> {
401        // Check bloom filter first, only query store for maybes
402        let mut results = Vec::with_capacity(cids.len());
403        let mut to_check = Vec::new();
404        let mut indices = Vec::new();
405
406        for (i, cid) in cids.iter().enumerate() {
407            if self.filter.contains_cid(cid) {
408                to_check.push(*cid);
409                indices.push(i);
410            }
411            results.push(false); // Default to false
412        }
413
414        // Only query store for CIDs that passed bloom filter
415        if !to_check.is_empty() {
416            let store_results = self.store.has_many(&to_check).await?;
417            for (idx, exists) in indices.into_iter().zip(store_results) {
418                results[idx] = exists;
419            }
420        }
421
422        Ok(results)
423    }
424
425    async fn delete(&self, cid: &Cid) -> Result<()> {
426        // Note: We don't remove from bloom filter (standard bloom filters don't support deletion)
427        // The filter may have false positives for deleted items until rebuild
428        self.store.delete(cid).await
429    }
430
431    async fn delete_many(&self, cids: &[Cid]) -> Result<()> {
432        self.store.delete_many(cids).await
433    }
434
435    fn list_cids(&self) -> Result<Vec<Cid>> {
436        self.store.list_cids()
437    }
438
439    fn len(&self) -> usize {
440        self.store.len()
441    }
442
443    fn is_empty(&self) -> bool {
444        self.store.is_empty()
445    }
446
447    async fn flush(&self) -> Result<()> {
448        self.store.flush().await
449    }
450
451    async fn close(&self) -> Result<()> {
452        self.store.close().await
453    }
454}
455
456#[cfg(test)]
457mod tests {
458    use super::*;
459
460    #[test]
461    fn test_bloom_filter_basic() {
462        let filter = BloomFilter::new(1000, 0.01);
463
464        filter.insert(b"hello");
465        filter.insert(b"world");
466
467        assert!(filter.contains(b"hello"));
468        assert!(filter.contains(b"world"));
469        assert!(!filter.contains(b"foo")); // Might be false positive, but unlikely
470    }
471
472    #[test]
473    fn test_bloom_filter_false_positive_rate() {
474        let filter = BloomFilter::new(10000, 0.01);
475
476        // Insert 10000 items
477        for i in 0i32..10000 {
478            filter.insert(&i.to_le_bytes());
479        }
480
481        // Check false positives on items not inserted
482        let mut false_positives = 0;
483        for i in 10000i32..20000 {
484            if filter.contains(&i.to_le_bytes()) {
485                false_positives += 1;
486            }
487        }
488
489        // Should be around 1% false positives (allow some margin)
490        let fpr = false_positives as f64 / 10000.0;
491        assert!(fpr < 0.03, "False positive rate {} too high", fpr);
492    }
493
494    #[test]
495    fn test_bloom_config_memory() {
496        let config = BloomConfig::new(1_000_000, 0.01);
497        let memory_mb = config.memory_bytes() as f64 / (1024.0 * 1024.0);
498        // Should be less than 10MB for 1M items (verified target)
499        assert!(
500            memory_mb < 10.0,
501            "Memory {} MB exceeds 10MB target",
502            memory_mb
503        );
504    }
505
506    #[test]
507    fn test_bloom_filter_stats() {
508        let filter = BloomFilter::new(1000, 0.01);
509
510        for i in 0i32..100 {
511            filter.insert(&i.to_le_bytes());
512        }
513
514        let stats = filter.stats();
515        assert_eq!(stats.count, 100);
516        assert!(stats.fill_ratio > 0.0);
517        assert!(stats.fill_ratio < 1.0);
518    }
519}