sochdb_storage/
bloom.rs

1// Copyright 2025 Sushanth (https://github.com/sushanthpy)
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Bloom filter for fast negative lookups
16//!
17//! Space-efficient probabilistic data structure for existence tests.
18//! False positive rate: ~1% with optimal k hash functions.
19//!
20//! Uses Blake3-based double hashing for mathematically independent hash functions.
21
22use std::hash::{Hash, Hasher};
23
/// Classic bit-array Bloom filter.
///
/// An item is represented by `num_hashes` probe positions, each reduced
/// modulo `num_bits`, inside a bit array packed into 64-bit words.
#[derive(Debug)]
pub struct BloomFilter {
    /// Bit array packed into words: bit `i` is stored in word `i / 64` at offset `i % 64`.
    bits: Vec<u64>,
    /// Number of addressable bits; probe positions are taken modulo this value.
    num_bits: usize,
    /// Number of probe positions computed per item.
    num_hashes: usize,
}
30
31impl BloomFilter {
32    /// Create a new Bloom filter
33    ///
34    /// - expected_items: number of items to be inserted
35    /// - false_positive_rate: desired false positive rate (e.g., 0.01 for 1%)
36    pub fn new(expected_items: usize, false_positive_rate: f64) -> Self {
37        let num_bits = Self::optimal_num_bits(expected_items, false_positive_rate);
38        let num_hashes = Self::optimal_num_hashes(expected_items, num_bits);
39
40        let num_words = num_bits.div_ceil(64);
41        let bits = vec![0u64; num_words];
42
43        Self {
44            bits,
45            num_bits,
46            num_hashes,
47        }
48    }
49
50    /// Calculate optimal number of bits
51    fn optimal_num_bits(n: usize, p: f64) -> usize {
52        let m = -(n as f64 * p.ln()) / (2.0_f64.ln().powi(2));
53        m.ceil() as usize
54    }
55
56    /// Calculate optimal number of hash functions
57    fn optimal_num_hashes(n: usize, m: usize) -> usize {
58        let k = (m as f64 / n as f64) * 2.0_f64.ln();
59        (k.ceil() as usize).max(1)
60    }
61
62    /// Insert an item into the bloom filter
63    pub fn insert<T: Hash>(&mut self, item: &T) {
64        for i in 0..self.num_hashes {
65            let hash = Self::hash(item, i);
66            let bit_index = hash % self.num_bits;
67            let word_index = bit_index / 64;
68            let bit_offset = bit_index % 64;
69
70            // PERFORMANCE: Remove runtime bounds check (proven safe by construction)
71            // The modulo operation ensures bit_index < num_bits
72            // The allocation ensures bits.len() == (num_bits + 63) / 64
73            // Therefore word_index = bit_index / 64 < bits.len()
74            debug_assert!(
75                word_index < self.bits.len(),
76                "Bloom filter index out of bounds: {} >= {}",
77                word_index,
78                self.bits.len()
79            );
80
81            self.bits[word_index] |= 1u64 << bit_offset;
82        }
83    }
84
85    /// Check if an item might be in the set
86    ///
87    /// Returns true if item *might* be present (could be false positive)
88    /// Returns false if item is *definitely not* present (100% accurate)
89    pub fn contains<T: Hash>(&self, item: &T) -> bool {
90        for i in 0..self.num_hashes {
91            let hash = Self::hash(item, i);
92            let bit_index = hash % self.num_bits;
93            let word_index = bit_index / 64;
94            let bit_offset = bit_index % 64;
95
96            // PERFORMANCE: Remove runtime bounds check (proven safe by construction)
97            debug_assert!(
98                word_index < self.bits.len(),
99                "Bloom filter index out of bounds: {} >= {}",
100                word_index,
101                self.bits.len()
102            );
103
104            if (self.bits[word_index] & (1u64 << bit_offset)) == 0 {
105                return false;
106            }
107        }
108        true
109    }
110
111    /// Compute h1 hash using Blake3
112    fn hash_h1<T: Hash>(item: &T) -> usize {
113        use std::collections::hash_map::DefaultHasher;
114
115        let mut hasher = blake3::Hasher::new();
116
117        // Hash the item using standard Hash trait first to get bytes
118        let mut std_hasher = DefaultHasher::new();
119        item.hash(&mut std_hasher);
120        let hash_value = std_hasher.finish();
121
122        hasher.update(&hash_value.to_le_bytes());
123        let hash = hasher.finalize();
124
125        // Take first 8 bytes as usize
126        let bytes = hash.as_bytes();
127        u64::from_le_bytes(bytes[0..8].try_into().unwrap()) as usize
128    }
129
130    /// Compute h2 hash using Blake3 with domain separator
131    fn hash_h2<T: Hash>(item: &T) -> usize {
132        use std::collections::hash_map::DefaultHasher;
133
134        let mut hasher = blake3::Hasher::new();
135        hasher.update(b"BloomFilter::h2"); // Domain separator
136
137        let mut std_hasher = DefaultHasher::new();
138        item.hash(&mut std_hasher);
139        let hash_value = std_hasher.finish();
140
141        hasher.update(&hash_value.to_le_bytes());
142        let hash = hasher.finalize();
143
144        u64::from_le_bytes(hash.as_bytes()[0..8].try_into().unwrap()) as usize
145    }
146
147    /// Hash function using double hashing with Blake3
148    ///
149    /// Implements the double hashing scheme: h_i(x) = (h1(x) + i * h2(x)) mod m
150    /// where h1 and h2 are independent cryptographic hashes.
151    ///
152    /// This is proven equivalent to k independent hash functions in:
153    /// "Less Hashing, Same Performance: Building a Better Bloom Filter"
154    /// by Kirsch & Mitzenmacher (2008)
155    ///
156    /// Mathematical correctness: Using Blake3 as the base hash ensures:
157    /// 1. h1 and h2 are computationally independent
158    /// 2. Collision resistance (2^128 security for truncated output)
159    /// 3. Uniform distribution over output space
160    fn hash<T: Hash>(item: &T, index: usize) -> usize {
161        if index == 0 {
162            Self::hash_h1(item)
163        } else {
164            // Double hashing: h_i(x) = h1(x) + i * h2(x)
165            let h1 = Self::hash_h1(item);
166            let h2 = Self::hash_h2(item);
167
168            // Combine using double hashing formula
169            // Use wrapping_add to handle overflow gracefully
170            h1.wrapping_add(index.wrapping_mul(h2))
171        }
172    }
173
174    /// Serialize to bytes with BLAKE3 checksum for corruption detection
175    ///
176    /// **CRITICAL FIX**: Adds integrity checksum to detect bit flips, cosmic rays,
177    /// disk errors, or malicious modification of bloom filter data.
178    ///
179    /// Format v2 (with version header for forward compatibility):
180    /// - Magic number: 4 bytes ("BLM\x02" = 0x424C4D02)
181    /// - Header: num_bits (8) + num_hashes (8) + num_words (8) = 24 bytes
182    /// - Bit data: num_words * 8 bytes
183    /// - Checksum: 32 bytes (BLAKE3 hash of magic + header + data)
184    ///
185    /// Format v1 (legacy, for backward compatibility):
186    /// - Header: num_bits (8) + num_hashes (8) + num_words (8) = 24 bytes
187    /// - Bit data: num_words * 8 bytes
188    /// - Checksum: 32 bytes
189    pub fn to_bytes(&self) -> Vec<u8> {
190        use byteorder::{LittleEndian, WriteBytesExt};
191
192        let mut buf = Vec::new();
193
194        // Version 2 magic number: "BLM" + version byte (0x02)
195        // This allows future readers to detect the format and version
196        buf.extend_from_slice(&[0x42, 0x4C, 0x4D, 0x02]); // "BLM\x02"
197
198        buf.write_u64::<LittleEndian>(self.num_bits as u64).unwrap();
199        buf.write_u64::<LittleEndian>(self.num_hashes as u64)
200            .unwrap();
201        buf.write_u64::<LittleEndian>(self.bits.len() as u64)
202            .unwrap();
203
204        for &word in &self.bits {
205            buf.write_u64::<LittleEndian>(word).unwrap();
206        }
207
208        // CRITICAL FIX: Add BLAKE3 checksum for corruption detection
209        let checksum = blake3::hash(&buf);
210        buf.extend_from_slice(checksum.as_bytes());
211
212        buf
213    }
214
215    /// Magic number for bloom filter v2 format
216    const MAGIC_V2: [u8; 4] = [0x42, 0x4C, 0x4D, 0x02]; // "BLM\x02"
217
218    /// Deserialize from bytes with checksum validation and version detection
219    ///
220    /// **CRITICAL FIX**: Validates BLAKE3 checksum to detect bloom filter corruption.
221    /// If corrupted, returns an error. The caller can then rebuild from the index.
222    ///
223    /// **FORWARD COMPATIBILITY**: Detects format version via magic number.
224    /// - v2 (with magic "BLM\x02"): New format with version header
225    /// - v1 (no magic): Legacy format, auto-detected by size
226    pub fn from_bytes(bytes: &[u8]) -> std::io::Result<Self> {
227        use std::io::{Error, ErrorKind};
228
229        const CHECKSUM_SIZE: usize = 32; // BLAKE3 produces 32-byte hashes
230        const MAGIC_SIZE: usize = 4;
231        const HEADER_SIZE: usize = 24; // 3 * u64
232        const MIN_SIZE_V2: usize = MAGIC_SIZE + HEADER_SIZE + CHECKSUM_SIZE; // 60 bytes
233        const MIN_SIZE_V1: usize = HEADER_SIZE + CHECKSUM_SIZE; // 56 bytes
234
235        // Check for v2 format (has magic number)
236        if bytes.len() >= MIN_SIZE_V2 && bytes[..4] == Self::MAGIC_V2 {
237            return Self::from_bytes_v2(bytes);
238        }
239
240        // Fall back to v1 format (legacy, no magic number)
241        if bytes.len() >= MIN_SIZE_V1 {
242            return Self::from_bytes_v1(bytes);
243        }
244
245        Err(Error::new(
246            ErrorKind::InvalidData,
247            format!(
248                "Bloom filter data too small: {} bytes (minimum {})",
249                bytes.len(),
250                MIN_SIZE_V1
251            ),
252        ))
253    }
254
255    /// Parse v2 format (with magic number and version header)
256    fn from_bytes_v2(bytes: &[u8]) -> std::io::Result<Self> {
257        use std::io::{Cursor, Error, ErrorKind};
258
259        const CHECKSUM_SIZE: usize = 32;
260        const MAGIC_SIZE: usize = 4;
261
262        // Validate checksum BEFORE parsing data
263        let data_len = bytes.len() - CHECKSUM_SIZE;
264        let (data, stored_checksum) = bytes.split_at(data_len);
265
266        let computed_checksum = blake3::hash(data);
267        if computed_checksum.as_bytes() != stored_checksum {
268            return Err(Error::new(
269                ErrorKind::InvalidData,
270                format!(
271                    "Bloom filter v2 corruption detected: checksum mismatch. \
272                     Expected {:?}, got {:?}.",
273                    hex::encode(stored_checksum),
274                    hex::encode(computed_checksum.as_bytes())
275                ),
276            ));
277        }
278
279        // Skip magic number, parse header
280        let mut cursor = Cursor::new(&data[MAGIC_SIZE..]);
281        Self::parse_header_and_data(&mut cursor)
282    }
283
284    /// Parse v1 format (legacy, no magic number)
285    fn from_bytes_v1(bytes: &[u8]) -> std::io::Result<Self> {
286        use std::io::{Cursor, Error, ErrorKind};
287
288        const CHECKSUM_SIZE: usize = 32;
289
290        // Validate checksum BEFORE parsing data
291        let data_len = bytes.len() - CHECKSUM_SIZE;
292        let (data, stored_checksum) = bytes.split_at(data_len);
293
294        let computed_checksum = blake3::hash(data);
295        if computed_checksum.as_bytes() != stored_checksum {
296            return Err(Error::new(
297                ErrorKind::InvalidData,
298                format!(
299                    "Bloom filter v1 corruption detected: checksum mismatch. \
300                     Expected {:?}, got {:?}.",
301                    hex::encode(stored_checksum),
302                    hex::encode(computed_checksum.as_bytes())
303                ),
304            ));
305        }
306
307        let mut cursor = Cursor::new(data);
308        Self::parse_header_and_data(&mut cursor)
309    }
310
311    /// Common header and data parsing logic
312    fn parse_header_and_data(cursor: &mut std::io::Cursor<&[u8]>) -> std::io::Result<Self> {
313        use byteorder::{LittleEndian, ReadBytesExt};
314        use std::io::{Error, ErrorKind};
315
316        let num_bits = cursor.read_u64::<LittleEndian>()? as usize;
317        let num_hashes = cursor.read_u64::<LittleEndian>()? as usize;
318        let num_words = cursor.read_u64::<LittleEndian>()? as usize;
319
320        // SECURITY: Validate parameters to prevent DOS/OOM attacks
321        const MAX_BITS: usize = 1_000_000_000; // 1 billion bits = ~119 MB
322        const MAX_HASHES: usize = 32; // Way more than needed (typical: 7)
323        const MAX_WORDS: usize = MAX_BITS / 64; // ~15.6 million words
324
325        if num_bits > MAX_BITS {
326            return Err(Error::new(
327                ErrorKind::InvalidData,
328                format!(
329                    "Bloom filter num_bits too large: {} > {}",
330                    num_bits, MAX_BITS
331                ),
332            ));
333        }
334
335        if num_hashes > MAX_HASHES {
336            return Err(Error::new(
337                ErrorKind::InvalidData,
338                format!(
339                    "Bloom filter num_hashes too large: {} > {}",
340                    num_hashes, MAX_HASHES
341                ),
342            ));
343        }
344
345        if num_words > MAX_WORDS {
346            return Err(Error::new(
347                ErrorKind::InvalidData,
348                format!(
349                    "Bloom filter num_words too large: {} > {}",
350                    num_words, MAX_WORDS
351                ),
352            ));
353        }
354
355        // CORRECTNESS: Verify num_words matches num_bits
356        let expected_words = num_bits.div_ceil(64);
357        if num_words != expected_words {
358            return Err(Error::new(
359                ErrorKind::InvalidData,
360                format!(
361                    "Bloom filter size mismatch: num_words={} but expected={} for num_bits={}",
362                    num_words, expected_words, num_bits
363                ),
364            ));
365        }
366
367        let mut bits = Vec::with_capacity(num_words);
368        for _ in 0..num_words {
369            bits.push(cursor.read_u64::<LittleEndian>()?);
370        }
371
372        Ok(Self {
373            bits,
374            num_bits,
375            num_hashes,
376        })
377    }
378
379    /// Deserialize from bytes without checksum validation (for backward compatibility)
380    ///
381    /// Use this for reading old bloom filters that don't have checksums.
382    /// New bloom filters should use `from_bytes()` which validates checksums.
383    pub fn from_bytes_legacy(bytes: &[u8]) -> std::io::Result<Self> {
384        use byteorder::{LittleEndian, ReadBytesExt};
385        use std::io::{Cursor, Error, ErrorKind};
386
387        let mut cursor = Cursor::new(bytes);
388
389        let num_bits = cursor.read_u64::<LittleEndian>()? as usize;
390        let num_hashes = cursor.read_u64::<LittleEndian>()? as usize;
391        let num_words = cursor.read_u64::<LittleEndian>()? as usize;
392
393        // SECURITY: Validate parameters
394        const MAX_BITS: usize = 1_000_000_000;
395        const MAX_HASHES: usize = 32;
396        const MAX_WORDS: usize = MAX_BITS / 64;
397
398        if num_bits > MAX_BITS || num_hashes > MAX_HASHES || num_words > MAX_WORDS {
399            return Err(Error::new(
400                ErrorKind::InvalidData,
401                "Bloom filter parameters exceed limits",
402            ));
403        }
404
405        let expected_words = num_bits.div_ceil(64);
406        if num_words != expected_words {
407            return Err(Error::new(
408                ErrorKind::InvalidData,
409                "Bloom filter size mismatch",
410            ));
411        }
412
413        let mut bits = Vec::with_capacity(num_words);
414        for _ in 0..num_words {
415            bits.push(cursor.read_u64::<LittleEndian>()?);
416        }
417
418        Ok(Self {
419            bits,
420            num_bits,
421            num_hashes,
422        })
423    }
424
425    /// Get size in bytes
426    pub fn size_bytes(&self) -> usize {
427        24 + self.bits.len() * 8 // metadata + bit vector
428    }
429
430    /// Get memory size (alias for size_bytes)
431    pub fn memory_size(&self) -> usize {
432        self.size_bytes()
433    }
434}
435
#[cfg(test)]
mod tests {
    use super::*;

    /// Inserted keys are always reported present, and the observed false
    /// positive rate on keys that were never inserted stays near the target.
    #[test]
    fn test_bloom_filter_basic() {
        let mut filter = BloomFilter::new(1000, 0.01);

        (0..100).for_each(|key| filter.insert(&key));

        // No false negatives are allowed.
        assert!((0..100).all(|key| filter.contains(&key)));

        // Count how many of the 900 absent keys are reported as present.
        let false_positives = (100..1000).filter(|key| filter.contains(key)).count();

        // Target is roughly 1%; allow up to 3% for variance.
        let fp_rate = false_positives as f64 / 900.0;
        assert!(fp_rate < 0.03, "False positive rate too high: {}", fp_rate);
    }

    /// A filter survives a `to_bytes` / `from_bytes` round trip.
    #[test]
    fn test_bloom_filter_serialization() {
        let mut original = BloomFilter::new(100, 0.01);
        (0..50).for_each(|key| original.insert(&key));

        let encoded = original.to_bytes();
        let decoded = BloomFilter::from_bytes(&encoded).unwrap();

        assert!((0..50).all(|key| decoded.contains(&key)));
    }

    /// String keys work like any other hashable key.
    #[test]
    fn test_bloom_filter_strings() {
        let mut filter = BloomFilter::new(1000, 0.01);

        let words = vec!["hello", "world", "foo", "bar", "baz"];
        words.iter().for_each(|word| filter.insert(word));

        assert!(words.iter().all(|word| filter.contains(word)));

        // A false positive is possible here, so the result is not asserted.
        let _ = filter.contains(&"not_there");
    }
}
501
502// ============================================================================
503// jj.md Task 3: Blocked Bloom Filter with Cache-Line Alignment
504// ============================================================================
505
/// Size of one cache line in bytes (64 on most modern CPUs).
const CACHE_LINE_SIZE: usize = 64;

/// Number of bits in one cache-line-sized block (64 bytes * 8 = 512 bits).
const BLOCK_BITS: usize = 8 * CACHE_LINE_SIZE;
511
/// Per-level false positive rates for an LSM-tree.
///
/// L0 has the most reads, so it gets the lowest FPR; deeper levels use
/// progressively higher rates.
#[derive(Debug, Clone, Copy)]
pub struct LevelAdaptiveFPR {
    /// Rate used for level 0 (most frequently accessed).
    pub l0: f64,
    /// Rate used for level 1.
    pub l1: f64,
    /// Rate used for level 2 and every deeper level (less frequently accessed).
    pub l2_plus: f64,
}

impl Default for LevelAdaptiveFPR {
    /// Defaults: 0.1% for L0, 0.5% for L1, 1% for L2 and deeper.
    fn default() -> Self {
        LevelAdaptiveFPR {
            l0: 0.001,
            l1: 0.005,
            l2_plus: 0.01,
        }
    }
}

impl LevelAdaptiveFPR {
    /// False positive rate configured for `level`; levels 2 and above share `l2_plus`.
    pub fn for_level(&self, level: usize) -> f64 {
        if level == 0 {
            self.l0
        } else if level == 1 {
            self.l1
        } else {
            self.l2_plus
        }
    }
}
544
545// =============================================================================
546// Task 9 Enhancement: Bloom Filter Cascade
547// =============================================================================
548
549/// Cascaded bloom filters with decreasing false positive rates
550///
551/// ## Design
552/// Organizes multiple bloom filters in a cascade where:
553/// - First filter (L0): Smallest, catches most negatives quickly
554/// - Subsequent filters: Progressively lower FPR for items that pass
555///
556/// ## Benefits
557/// - Fast rejection of non-existent keys (first filter catches ~99%)
558/// - Lower overall FPR than single filter for same memory budget
559/// - Amortized O(1) lookup with early termination
560///
561/// ## Example Configuration
562/// ```text
563/// L0: 1% FPR, 100KB  → Catches 99% of negatives
564/// L1: 0.1% FPR, 50KB → Catches 99.9% of remaining
565/// L2: 0.01% FPR, 25KB → Final filter for edge cases
566/// Combined FPR: 0.01 * 0.001 * 0.0001 = 0.000000001 (1 in billion)
567/// ```
568#[derive(Debug)]
569pub struct BloomFilterCascade {
570    /// Cascade of filters (index 0 = first check)
571    filters: Vec<BlockedBloomFilter>,
572    /// FPR for each level
573    level_fpr: Vec<f64>,
574    /// Number of items expected
575    expected_items: usize,
576}
577
578impl BloomFilterCascade {
579    /// Create a new cascade with specified levels
580    ///
581    /// # Arguments
582    /// * `expected_items` - Number of items to store
583    /// * `level_fprs` - False positive rate for each level (e.g., [0.01, 0.001, 0.0001])
584    pub fn new(expected_items: usize, level_fprs: Vec<f64>) -> Self {
585        let filters = level_fprs
586            .iter()
587            .map(|&fpr| BlockedBloomFilter::new(expected_items, fpr))
588            .collect();
589
590        Self {
591            filters,
592            level_fpr: level_fprs,
593            expected_items,
594        }
595    }
596
597    /// Create default 3-level cascade
598    ///
599    /// - L0: 1% FPR (fast initial check)
600    /// - L1: 0.1% FPR (medium precision)
601    /// - L2: 0.01% FPR (high precision)
602    pub fn default_cascade(expected_items: usize) -> Self {
603        Self::new(expected_items, vec![0.01, 0.001, 0.0001])
604    }
605
606    /// Create memory-optimized 2-level cascade
607    pub fn compact(expected_items: usize) -> Self {
608        Self::new(expected_items, vec![0.01, 0.0001])
609    }
610
611    /// Insert an item into all cascade levels
612    pub fn insert<T: Hash>(&mut self, item: &T) {
613        for filter in &mut self.filters {
614            filter.insert(item);
615        }
616    }
617
618    /// Check if item might exist (with early termination)
619    ///
620    /// Returns false (definitely not present) as soon as any level returns false.
621    /// Returns true only if ALL levels return true.
622    pub fn contains<T: Hash>(&self, item: &T) -> bool {
623        for filter in &self.filters {
624            if !filter.contains(item) {
625                return false;
626            }
627        }
628        true
629    }
630
631    /// Check with level tracking (for debugging/stats)
632    pub fn contains_with_level<T: Hash>(&self, item: &T) -> (bool, usize) {
633        for (level, filter) in self.filters.iter().enumerate() {
634            if !filter.contains(item) {
635                return (false, level);
636            }
637        }
638        (true, self.filters.len())
639    }
640
641    /// Combined theoretical false positive rate
642    pub fn combined_fpr(&self) -> f64 {
643        self.level_fpr.iter().product()
644    }
645
646    /// Number of cascade levels
647    pub fn num_levels(&self) -> usize {
648        self.filters.len()
649    }
650
651    /// Memory usage in bytes
652    pub fn memory_usage(&self) -> usize {
653        self.filters.iter().map(|f| f.memory_usage()).sum()
654    }
655
656    /// Serialize to bytes
657    pub fn to_bytes(&self) -> Vec<u8> {
658        use byteorder::{LittleEndian, WriteBytesExt};
659
660        let mut buf = Vec::new();
661
662        // Magic: "BCF\x01" (Bloom Cascade Filter v1)
663        buf.extend_from_slice(&[0x42, 0x43, 0x46, 0x01]);
664
665        // Header
666        buf.write_u64::<LittleEndian>(self.expected_items as u64)
667            .unwrap();
668        buf.write_u64::<LittleEndian>(self.filters.len() as u64)
669            .unwrap();
670
671        // FPR for each level
672        for &fpr in &self.level_fpr {
673            buf.write_u64::<LittleEndian>(fpr.to_bits()).unwrap();
674        }
675
676        // Each filter's serialized data
677        for filter in &self.filters {
678            let filter_bytes = filter.to_bytes();
679            buf.write_u64::<LittleEndian>(filter_bytes.len() as u64)
680                .unwrap();
681            buf.extend_from_slice(&filter_bytes);
682        }
683
684        // Checksum
685        let checksum = blake3::hash(&buf);
686        buf.extend_from_slice(checksum.as_bytes());
687
688        buf
689    }
690
691    /// Deserialize from bytes
692    pub fn from_bytes(bytes: &[u8]) -> std::io::Result<Self> {
693        use byteorder::{LittleEndian, ReadBytesExt};
694        use std::io::{Cursor, Error, ErrorKind};
695
696        const CHECKSUM_SIZE: usize = 32;
697        const MIN_SIZE: usize = 4 + 16 + CHECKSUM_SIZE; // magic + header + checksum
698
699        if bytes.len() < MIN_SIZE {
700            return Err(Error::new(
701                ErrorKind::InvalidData,
702                "Bloom cascade data too small",
703            ));
704        }
705
706        // Verify magic
707        if bytes[..4] != [0x42, 0x43, 0x46, 0x01] {
708            return Err(Error::new(
709                ErrorKind::InvalidData,
710                "Invalid bloom cascade magic number",
711            ));
712        }
713
714        // Verify checksum
715        let data_len = bytes.len() - CHECKSUM_SIZE;
716        let (data, stored_checksum) = bytes.split_at(data_len);
717        let computed_checksum = blake3::hash(data);
718        if computed_checksum.as_bytes() != stored_checksum {
719            return Err(Error::new(
720                ErrorKind::InvalidData,
721                "Bloom cascade checksum mismatch",
722            ));
723        }
724
725        let mut cursor = Cursor::new(&bytes[4..]); // Skip magic
726
727        let expected_items = cursor.read_u64::<LittleEndian>()? as usize;
728        let num_levels = cursor.read_u64::<LittleEndian>()? as usize;
729
730        // Read FPRs
731        let mut level_fpr = Vec::with_capacity(num_levels);
732        for _ in 0..num_levels {
733            let bits = cursor.read_u64::<LittleEndian>()?;
734            level_fpr.push(f64::from_bits(bits));
735        }
736
737        // Read filters
738        let mut filters = Vec::with_capacity(num_levels);
739        for _ in 0..num_levels {
740            let filter_len = cursor.read_u64::<LittleEndian>()? as usize;
741            let pos = cursor.position() as usize;
742            let filter_bytes = &bytes[4 + pos..4 + pos + filter_len];
743            cursor.set_position((pos + filter_len) as u64);
744
745            let filter = BlockedBloomFilter::from_bytes(filter_bytes)?;
746            filters.push(filter);
747        }
748
749        Ok(Self {
750            filters,
751            level_fpr,
752            expected_items,
753        })
754    }
755}
756
757// ============================================================================
758// Unified Bloom Filter for SSTable Reader (supports both formats)
759// ============================================================================
760
/// Magic number of the standard `BloomFilter` v2 format: "BLM\x02".
const BLOOM_MAGIC_V2: [u8; 4] = *b"BLM\x02";
/// Magic number of the `BlockedBloomFilter` format: "BBF\x01".
const BLOCKED_BLOOM_MAGIC: [u8; 4] = *b"BBF\x01";
764
765/// Unified bloom filter that can be deserialized from either format.
766///
767/// This allows SSTableReader to handle both old SSTables (with BloomFilter)
768/// and new SSTables (with BlockedBloomFilter) transparently.
769#[derive(Debug)]
770pub enum UnifiedBloomFilter {
771    /// Standard bloom filter (legacy format)
772    Standard(BloomFilter),
773    /// Cache-line optimized blocked bloom filter (new format)
774    Blocked(BlockedBloomFilter),
775}
776
777impl UnifiedBloomFilter {
778    /// Check if an item might be in the set
779    pub fn contains<T: std::hash::Hash>(&self, item: &T) -> bool {
780        match self {
781            UnifiedBloomFilter::Standard(bf) => bf.contains(item),
782            UnifiedBloomFilter::Blocked(bbf) => bbf.contains(item),
783        }
784    }
785
786    /// Deserialize from bytes, auto-detecting the format
787    pub fn from_bytes(bytes: &[u8]) -> std::io::Result<Self> {
788        use std::io::{Error, ErrorKind};
789
790        if bytes.len() < 4 {
791            return Err(Error::new(
792                ErrorKind::InvalidData,
793                "Bloom filter data too small to detect format",
794            ));
795        }
796
797        // Check magic number to determine format
798        if bytes[..4] == BLOCKED_BLOOM_MAGIC {
799            // New blocked bloom filter format
800            BlockedBloomFilter::from_bytes(bytes).map(UnifiedBloomFilter::Blocked)
801        } else if bytes[..4] == BLOOM_MAGIC_V2 || bytes.len() >= 56 {
802            // Standard bloom filter (v2 with magic or v1 legacy)
803            BloomFilter::from_bytes(bytes).map(UnifiedBloomFilter::Standard)
804        } else {
805            Err(Error::new(
806                ErrorKind::InvalidData,
807                "Unknown bloom filter format",
808            ))
809        }
810    }
811
812    /// Get memory size in bytes
813    pub fn memory_size(&self) -> usize {
814        match self {
815            UnifiedBloomFilter::Standard(bf) => bf.memory_size(),
816            UnifiedBloomFilter::Blocked(bbf) => bbf.memory_usage(),
817        }
818    }
819}
820
/// Blocked bloom filter whose probes for one key all land in a single
/// 64-byte (512-bit) block.
///
/// ## jj.md Task 3: Blocked Bloom Filter
///
/// One hash selects the block; the remaining probes set or test bits inside
/// that block only, so a lookup touches one block instead of up to k
/// scattered locations.
///
/// ### Performance Benefits
///
/// - **Cache efficiency**: All probes hit the same block
/// - **Reduced memory bandwidth**: One block load vs k scattered loads
/// - **Better TLB usage**: Single page access per lookup
///
/// ### Trade-offs
///
/// - Slightly higher FPR than standard bloom filter at same memory usage
/// - Block selection uses one hash, remaining hashes probe within block
///
/// ### Reference
///
/// "Cache-Efficient Bloom Filters" (Putze et al., 2007)
#[derive(Debug)]
pub struct BlockedBloomFilter {
    /// Blocks of eight 64-bit words each (64 bytes = 512 bits per block).
    blocks: Vec<[u64; 8]>,
    /// Number of blocks in `blocks`.
    num_blocks: usize,
    /// Number of bit probes performed inside the selected block.
    num_hashes: usize,
    /// Expected item count the filter was sized for.
    expected_items: usize,
}
854
855impl BlockedBloomFilter {
856    /// Create a new blocked bloom filter.
857    ///
858    /// # Arguments
859    /// * `expected_items` - Number of items expected to be inserted
860    /// * `false_positive_rate` - Desired false positive rate (e.g., 0.01 for 1%)
861    pub fn new(expected_items: usize, false_positive_rate: f64) -> Self {
862        // Calculate bits per key needed
863        // m/n = -ln(p) / ln(2)² ≈ -1.44 * ln(p)
864        let bits_per_key = (-1.44 * false_positive_rate.ln()).ceil() as usize;
865        let bits_per_key = bits_per_key.max(4); // Minimum 4 bits per key
866
867        let total_bits = expected_items * bits_per_key;
868        let num_blocks = total_bits.div_ceil(BLOCK_BITS);
869        let num_blocks = num_blocks.max(1);
870
871        // Optimal number of hashes: k = ln(2) * (m/n) ≈ 0.693 * bits_per_key
872        let num_hashes = ((0.693 * bits_per_key as f64).ceil() as usize).clamp(1, 8);
873
874        let blocks = vec![[0u64; 8]; num_blocks];
875
876        Self {
877            blocks,
878            num_blocks,
879            num_hashes,
880            expected_items,
881        }
882    }
883
884    /// Create a blocked bloom filter with level-adaptive FPR.
885    pub fn for_level(expected_items: usize, level: usize) -> Self {
886        let fpr = LevelAdaptiveFPR::default().for_level(level);
887        Self::new(expected_items, fpr)
888    }
889
890    /// Insert an item into the bloom filter.
891    pub fn insert<T: Hash>(&mut self, item: &T) {
892        let (block_idx, h2) = self.compute_hashes(item);
893
894        // Probe within the selected block
895        let block = &mut self.blocks[block_idx];
896        for i in 0..self.num_hashes {
897            let bit_pos = (h2.wrapping_add(i * 31)) % BLOCK_BITS;
898            let word_idx = bit_pos / 64;
899            let bit_offset = bit_pos % 64;
900            block[word_idx] |= 1u64 << bit_offset;
901        }
902    }
903
904    /// Check if an item might be in the set.
905    ///
906    /// Returns `true` if item *might* be present (could be false positive).
907    /// Returns `false` if item is *definitely not* present (100% accurate).
908    pub fn contains<T: Hash>(&self, item: &T) -> bool {
909        let (block_idx, h2) = self.compute_hashes(item);
910        let block = &self.blocks[block_idx];
911
912        for i in 0..self.num_hashes {
913            let bit_pos = (h2.wrapping_add(i * 31)) % BLOCK_BITS;
914            let word_idx = bit_pos / 64;
915            let bit_offset = bit_pos % 64;
916            if (block[word_idx] & (1u64 << bit_offset)) == 0 {
917                return false;
918            }
919        }
920        true
921    }
922
923    /// Compute block index and intra-block hash.
924    fn compute_hashes<T: Hash>(&self, item: &T) -> (usize, usize) {
925        use std::collections::hash_map::DefaultHasher;
926
927        let mut hasher = DefaultHasher::new();
928        item.hash(&mut hasher);
929        let h1 = hasher.finish();
930
931        // Use upper and lower bits for two independent-ish hashes
932        let block_idx = (h1 as usize) % self.num_blocks;
933        let h2 = ((h1 >> 32) as usize) | ((h1 as usize) << 32);
934
935        (block_idx, h2)
936    }
937
938    /// Serialize to bytes with checksum.
939    pub fn to_bytes(&self) -> Vec<u8> {
940        use byteorder::{LittleEndian, WriteBytesExt};
941
942        let mut buf = Vec::new();
943
944        // Magic number for blocked bloom filter: "BBF\x01"
945        buf.extend_from_slice(&[0x42, 0x42, 0x46, 0x01]);
946
947        // Header
948        buf.write_u64::<LittleEndian>(self.num_blocks as u64)
949            .unwrap();
950        buf.write_u64::<LittleEndian>(self.num_hashes as u64)
951            .unwrap();
952        buf.write_u64::<LittleEndian>(self.expected_items as u64)
953            .unwrap();
954
955        // Block data
956        for block in &self.blocks {
957            for &word in block {
958                buf.write_u64::<LittleEndian>(word).unwrap();
959            }
960        }
961
962        // BLAKE3 checksum
963        let checksum = blake3::hash(&buf);
964        buf.extend_from_slice(checksum.as_bytes());
965
966        buf
967    }
968
969    /// Deserialize from bytes with checksum validation.
970    pub fn from_bytes(bytes: &[u8]) -> std::io::Result<Self> {
971        use byteorder::{LittleEndian, ReadBytesExt};
972        use std::io::{Cursor, Error, ErrorKind};
973
974        const MAGIC: [u8; 4] = [0x42, 0x42, 0x46, 0x01]; // "BBF\x01"
975        const CHECKSUM_SIZE: usize = 32;
976        const HEADER_SIZE: usize = 4 + 24; // magic + 3 * u64
977
978        if bytes.len() < HEADER_SIZE + CHECKSUM_SIZE {
979            return Err(Error::new(
980                ErrorKind::InvalidData,
981                "Blocked bloom filter data too small",
982            ));
983        }
984
985        // Verify magic
986        if bytes[..4] != MAGIC {
987            return Err(Error::new(
988                ErrorKind::InvalidData,
989                "Invalid blocked bloom filter magic number",
990            ));
991        }
992
993        // Verify checksum
994        let data_len = bytes.len() - CHECKSUM_SIZE;
995        let (data, stored_checksum) = bytes.split_at(data_len);
996        let computed_checksum = blake3::hash(data);
997
998        if computed_checksum.as_bytes() != stored_checksum {
999            return Err(Error::new(
1000                ErrorKind::InvalidData,
1001                "Blocked bloom filter checksum mismatch",
1002            ));
1003        }
1004
1005        // Parse header
1006        let mut cursor = Cursor::new(&bytes[4..]);
1007        let num_blocks = cursor.read_u64::<LittleEndian>()? as usize;
1008        let num_hashes = cursor.read_u64::<LittleEndian>()? as usize;
1009        let expected_items = cursor.read_u64::<LittleEndian>()? as usize;
1010
1011        // Validate sizes
1012        let expected_data_size = HEADER_SIZE + num_blocks * 64 + CHECKSUM_SIZE;
1013        if bytes.len() != expected_data_size {
1014            return Err(Error::new(
1015                ErrorKind::InvalidData,
1016                format!(
1017                    "Blocked bloom filter size mismatch: {} vs {}",
1018                    bytes.len(),
1019                    expected_data_size
1020                ),
1021            ));
1022        }
1023
1024        // Parse blocks
1025        let mut blocks = Vec::with_capacity(num_blocks);
1026        for _ in 0..num_blocks {
1027            let mut block = [0u64; 8];
1028            for word in &mut block {
1029                *word = cursor.read_u64::<LittleEndian>()?;
1030            }
1031            blocks.push(block);
1032        }
1033
1034        Ok(Self {
1035            blocks,
1036            num_blocks,
1037            num_hashes,
1038            expected_items,
1039        })
1040    }
1041
1042    /// Get size in bytes.
1043    pub fn size_bytes(&self) -> usize {
1044        4 + 24 + self.num_blocks * 64 + 32 // magic + header + blocks + checksum
1045    }
1046
1047    /// Get memory usage in bytes.
1048    pub fn memory_usage(&self) -> usize {
1049        std::mem::size_of::<Self>() + self.num_blocks * 64
1050    }
1051
1052    /// Get fill ratio (fraction of bits set).
1053    pub fn fill_ratio(&self) -> f64 {
1054        let set_bits: usize = self
1055            .blocks
1056            .iter()
1057            .flat_map(|b| b.iter())
1058            .map(|w| w.count_ones() as usize)
1059            .sum();
1060
1061        let total_bits = self.num_blocks * BLOCK_BITS;
1062        set_bits as f64 / total_bits as f64
1063    }
1064}
1065
1066#[cfg(test)]
1067mod blocked_bloom_tests {
1068    use super::*;
1069
1070    #[test]
1071    fn test_blocked_bloom_basic() {
1072        let mut bloom = BlockedBloomFilter::new(1000, 0.01);
1073
1074        // Insert items
1075        for i in 0..100 {
1076            bloom.insert(&i);
1077        }
1078
1079        // Check inserted items (should all return true)
1080        for i in 0..100 {
1081            assert!(bloom.contains(&i), "Missing inserted item {}", i);
1082        }
1083
1084        // Check non-inserted items (most should return false)
1085        let mut false_positives = 0;
1086        for i in 100..1000 {
1087            if bloom.contains(&i) {
1088                false_positives += 1;
1089            }
1090        }
1091
1092        // Blocked bloom filters have slightly higher FPR, allow up to 5%
1093        let fp_rate = false_positives as f64 / 900.0;
1094        assert!(fp_rate < 0.05, "False positive rate too high: {}", fp_rate);
1095    }
1096
1097    #[test]
1098    fn test_blocked_bloom_serialization() {
1099        let mut bloom = BlockedBloomFilter::new(100, 0.01);
1100
1101        for i in 0..50 {
1102            bloom.insert(&i);
1103        }
1104
1105        let bytes = bloom.to_bytes();
1106        let restored = BlockedBloomFilter::from_bytes(&bytes).unwrap();
1107
1108        // Check all items are still present
1109        for i in 0..50 {
1110            assert!(
1111                restored.contains(&i),
1112                "Missing item {} after deserialize",
1113                i
1114            );
1115        }
1116    }
1117
1118    #[test]
1119    fn test_blocked_bloom_level_adaptive() {
1120        // L0 should have lower FPR (more bits per key)
1121        let l0_bloom = BlockedBloomFilter::for_level(1000, 0);
1122        let l2_bloom = BlockedBloomFilter::for_level(1000, 2);
1123
1124        // L0 should use more memory (lower FPR = more bits)
1125        assert!(
1126            l0_bloom.memory_usage() >= l2_bloom.memory_usage(),
1127            "L0 bloom should use >= memory than L2"
1128        );
1129    }
1130
1131    #[test]
1132    fn test_blocked_bloom_u128_keys() {
1133        let mut bloom = BlockedBloomFilter::new(1000, 0.01);
1134
1135        // Insert u128 edge IDs
1136        let edge_ids: Vec<u128> = (0..100).map(|i| i as u128 * 12345678901234567890).collect();
1137
1138        for id in &edge_ids {
1139            bloom.insert(id);
1140        }
1141
1142        // All inserted items should be found
1143        for id in &edge_ids {
1144            assert!(bloom.contains(id));
1145        }
1146    }
1147
1148    #[test]
1149    fn test_blocked_bloom_checksum_corruption() {
1150        let mut bloom = BlockedBloomFilter::new(100, 0.01);
1151        bloom.insert(&42);
1152
1153        let mut bytes = bloom.to_bytes();
1154
1155        // Corrupt a byte in the data
1156        if bytes.len() > 10 {
1157            bytes[10] ^= 0xFF;
1158        }
1159
1160        // Should fail checksum validation
1161        assert!(BlockedBloomFilter::from_bytes(&bytes).is_err());
1162    }
1163
1164    #[test]
1165    fn test_blocked_bloom_fill_ratio() {
1166        let mut bloom = BlockedBloomFilter::new(1000, 0.01);
1167
1168        assert_eq!(bloom.fill_ratio(), 0.0);
1169
1170        for i in 0..500 {
1171            bloom.insert(&i);
1172        }
1173
1174        let ratio = bloom.fill_ratio();
1175        assert!(ratio > 0.0 && ratio < 1.0);
1176    }
1177
1178    #[test]
1179    fn test_level_adaptive_fpr() {
1180        let fpr = LevelAdaptiveFPR::default();
1181
1182        assert_eq!(fpr.for_level(0), 0.001);
1183        assert_eq!(fpr.for_level(1), 0.005);
1184        assert_eq!(fpr.for_level(2), 0.01);
1185        assert_eq!(fpr.for_level(5), 0.01);
1186    }
1187
1188    // Bloom Filter Cascade Tests
1189
1190    #[test]
1191    fn test_cascade_basic() {
1192        let mut cascade = BloomFilterCascade::default_cascade(1000);
1193
1194        // Insert items
1195        for i in 0..100 {
1196            cascade.insert(&i);
1197        }
1198
1199        // All inserted items should be found
1200        for i in 0..100 {
1201            assert!(cascade.contains(&i), "Item {} should be found", i);
1202        }
1203
1204        // Verify cascade has expected structure
1205        assert_eq!(cascade.num_levels(), 3);
1206        assert!(cascade.combined_fpr() < 0.0001);
1207    }
1208
1209    #[test]
1210    fn test_cascade_early_termination() {
1211        let mut cascade = BloomFilterCascade::new(1000, vec![0.5, 0.5, 0.5]); // High FPR for testing
1212
1213        cascade.insert(&42);
1214
1215        // Inserted item passes all levels
1216        let (found, level) = cascade.contains_with_level(&42);
1217        assert!(found);
1218        assert_eq!(level, 3); // Passed all 3 levels
1219
1220        // Non-inserted item should fail at some level
1221        let (found, level) = cascade.contains_with_level(&999999);
1222        if !found {
1223            assert!(level < 3, "Should fail before reaching all levels");
1224        }
1225    }
1226
1227    #[test]
1228    fn test_cascade_serialization() {
1229        let mut cascade = BloomFilterCascade::default_cascade(500);
1230
1231        for i in 0..100 {
1232            cascade.insert(&i);
1233        }
1234
1235        // Serialize and deserialize
1236        let bytes = cascade.to_bytes();
1237        let restored = BloomFilterCascade::from_bytes(&bytes).unwrap();
1238
1239        // Check properties preserved
1240        assert_eq!(restored.num_levels(), cascade.num_levels());
1241        assert_eq!(restored.expected_items, cascade.expected_items);
1242
1243        // Check contents
1244        for i in 0..100 {
1245            assert!(restored.contains(&i));
1246        }
1247    }
1248
1249    #[test]
1250    fn test_cascade_memory() {
1251        let cascade = BloomFilterCascade::default_cascade(10000);
1252
1253        // Should use reasonable memory
1254        let mem = cascade.memory_usage();
1255        assert!(mem > 0);
1256
1257        // Combined FPR should be very low
1258        let combined_fpr = cascade.combined_fpr();
1259        assert!(
1260            combined_fpr < 0.000001,
1261            "Combined FPR should be < 1 in million"
1262        );
1263    }
1264
1265    #[test]
1266    fn test_cascade_compact() {
1267        let compact = BloomFilterCascade::compact(1000);
1268        let full = BloomFilterCascade::default_cascade(1000);
1269
1270        // Compact should use fewer levels
1271        assert_eq!(compact.num_levels(), 2);
1272        assert_eq!(full.num_levels(), 3);
1273
1274        // But still maintain low FPR
1275        assert!(compact.combined_fpr() < 0.0001);
1276    }
1277}