Skip to main content

sochdb_storage/
bloom.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Bloom filter for fast negative lookups
19//!
20//! Space-efficient probabilistic data structure for existence tests.
21//! False positive rate: ~1% with optimal k hash functions.
22//!
23//! Uses Blake3-based double hashing for mathematically independent hash functions.
24
25use std::hash::{Hash, Hasher};
26
27#[derive(Debug)]
28pub struct BloomFilter {
29    bits: Vec<u64>,
30    num_bits: usize,
31    num_hashes: usize,
32}
33
34impl BloomFilter {
35    /// Create a new Bloom filter
36    ///
37    /// - expected_items: number of items to be inserted
38    /// - false_positive_rate: desired false positive rate (e.g., 0.01 for 1%)
39    pub fn new(expected_items: usize, false_positive_rate: f64) -> Self {
40        let num_bits = Self::optimal_num_bits(expected_items, false_positive_rate);
41        let num_hashes = Self::optimal_num_hashes(expected_items, num_bits);
42
43        let num_words = num_bits.div_ceil(64);
44        let bits = vec![0u64; num_words];
45
46        Self {
47            bits,
48            num_bits,
49            num_hashes,
50        }
51    }
52
53    /// Calculate optimal number of bits
54    fn optimal_num_bits(n: usize, p: f64) -> usize {
55        let m = -(n as f64 * p.ln()) / (2.0_f64.ln().powi(2));
56        m.ceil() as usize
57    }
58
59    /// Calculate optimal number of hash functions
60    fn optimal_num_hashes(n: usize, m: usize) -> usize {
61        let k = (m as f64 / n as f64) * 2.0_f64.ln();
62        (k.ceil() as usize).max(1)
63    }
64
65    /// Insert an item into the bloom filter
66    pub fn insert<T: Hash>(&mut self, item: &T) {
67        for i in 0..self.num_hashes {
68            let hash = Self::hash(item, i);
69            let bit_index = hash % self.num_bits;
70            let word_index = bit_index / 64;
71            let bit_offset = bit_index % 64;
72
73            // PERFORMANCE: Remove runtime bounds check (proven safe by construction)
74            // The modulo operation ensures bit_index < num_bits
75            // The allocation ensures bits.len() == (num_bits + 63) / 64
76            // Therefore word_index = bit_index / 64 < bits.len()
77            debug_assert!(
78                word_index < self.bits.len(),
79                "Bloom filter index out of bounds: {} >= {}",
80                word_index,
81                self.bits.len()
82            );
83
84            self.bits[word_index] |= 1u64 << bit_offset;
85        }
86    }
87
88    /// Check if an item might be in the set
89    ///
90    /// Returns true if item *might* be present (could be false positive)
91    /// Returns false if item is *definitely not* present (100% accurate)
92    pub fn contains<T: Hash>(&self, item: &T) -> bool {
93        for i in 0..self.num_hashes {
94            let hash = Self::hash(item, i);
95            let bit_index = hash % self.num_bits;
96            let word_index = bit_index / 64;
97            let bit_offset = bit_index % 64;
98
99            // PERFORMANCE: Remove runtime bounds check (proven safe by construction)
100            debug_assert!(
101                word_index < self.bits.len(),
102                "Bloom filter index out of bounds: {} >= {}",
103                word_index,
104                self.bits.len()
105            );
106
107            if (self.bits[word_index] & (1u64 << bit_offset)) == 0 {
108                return false;
109            }
110        }
111        true
112    }
113
114    /// Compute h1 hash using Blake3
115    fn hash_h1<T: Hash>(item: &T) -> usize {
116        use std::collections::hash_map::DefaultHasher;
117
118        let mut hasher = blake3::Hasher::new();
119
120        // Hash the item using standard Hash trait first to get bytes
121        let mut std_hasher = DefaultHasher::new();
122        item.hash(&mut std_hasher);
123        let hash_value = std_hasher.finish();
124
125        hasher.update(&hash_value.to_le_bytes());
126        let hash = hasher.finalize();
127
128        // Take first 8 bytes as usize
129        let bytes = hash.as_bytes();
130        u64::from_le_bytes(bytes[0..8].try_into().unwrap()) as usize
131    }
132
133    /// Compute h2 hash using Blake3 with domain separator
134    fn hash_h2<T: Hash>(item: &T) -> usize {
135        use std::collections::hash_map::DefaultHasher;
136
137        let mut hasher = blake3::Hasher::new();
138        hasher.update(b"BloomFilter::h2"); // Domain separator
139
140        let mut std_hasher = DefaultHasher::new();
141        item.hash(&mut std_hasher);
142        let hash_value = std_hasher.finish();
143
144        hasher.update(&hash_value.to_le_bytes());
145        let hash = hasher.finalize();
146
147        u64::from_le_bytes(hash.as_bytes()[0..8].try_into().unwrap()) as usize
148    }
149
150    /// Hash function using double hashing with Blake3
151    ///
152    /// Implements the double hashing scheme: h_i(x) = (h1(x) + i * h2(x)) mod m
153    /// where h1 and h2 are independent cryptographic hashes.
154    ///
155    /// This is proven equivalent to k independent hash functions in:
156    /// "Less Hashing, Same Performance: Building a Better Bloom Filter"
157    /// by Kirsch & Mitzenmacher (2008)
158    ///
159    /// Mathematical correctness: Using Blake3 as the base hash ensures:
160    /// 1. h1 and h2 are computationally independent
161    /// 2. Collision resistance (2^128 security for truncated output)
162    /// 3. Uniform distribution over output space
163    fn hash<T: Hash>(item: &T, index: usize) -> usize {
164        if index == 0 {
165            Self::hash_h1(item)
166        } else {
167            // Double hashing: h_i(x) = h1(x) + i * h2(x)
168            let h1 = Self::hash_h1(item);
169            let h2 = Self::hash_h2(item);
170
171            // Combine using double hashing formula
172            // Use wrapping_add to handle overflow gracefully
173            h1.wrapping_add(index.wrapping_mul(h2))
174        }
175    }
176
177    /// Serialize to bytes with BLAKE3 checksum for corruption detection
178    ///
179    /// **CRITICAL FIX**: Adds integrity checksum to detect bit flips, cosmic rays,
180    /// disk errors, or malicious modification of bloom filter data.
181    ///
182    /// Format v2 (with version header for forward compatibility):
183    /// - Magic number: 4 bytes ("BLM\x02" = 0x424C4D02)
184    /// - Header: num_bits (8) + num_hashes (8) + num_words (8) = 24 bytes
185    /// - Bit data: num_words * 8 bytes
186    /// - Checksum: 32 bytes (BLAKE3 hash of magic + header + data)
187    ///
188    /// Format v1 (legacy, for backward compatibility):
189    /// - Header: num_bits (8) + num_hashes (8) + num_words (8) = 24 bytes
190    /// - Bit data: num_words * 8 bytes
191    /// - Checksum: 32 bytes
192    pub fn to_bytes(&self) -> Vec<u8> {
193        use byteorder::{LittleEndian, WriteBytesExt};
194
195        let mut buf = Vec::new();
196
197        // Version 2 magic number: "BLM" + version byte (0x02)
198        // This allows future readers to detect the format and version
199        buf.extend_from_slice(&[0x42, 0x4C, 0x4D, 0x02]); // "BLM\x02"
200
201        buf.write_u64::<LittleEndian>(self.num_bits as u64).unwrap();
202        buf.write_u64::<LittleEndian>(self.num_hashes as u64)
203            .unwrap();
204        buf.write_u64::<LittleEndian>(self.bits.len() as u64)
205            .unwrap();
206
207        for &word in &self.bits {
208            buf.write_u64::<LittleEndian>(word).unwrap();
209        }
210
211        // CRITICAL FIX: Add BLAKE3 checksum for corruption detection
212        let checksum = blake3::hash(&buf);
213        buf.extend_from_slice(checksum.as_bytes());
214
215        buf
216    }
217
218    /// Magic number for bloom filter v2 format
219    const MAGIC_V2: [u8; 4] = [0x42, 0x4C, 0x4D, 0x02]; // "BLM\x02"
220
221    /// Deserialize from bytes with checksum validation and version detection
222    ///
223    /// **CRITICAL FIX**: Validates BLAKE3 checksum to detect bloom filter corruption.
224    /// If corrupted, returns an error. The caller can then rebuild from the index.
225    ///
226    /// **FORWARD COMPATIBILITY**: Detects format version via magic number.
227    /// - v2 (with magic "BLM\x02"): New format with version header
228    /// - v1 (no magic): Legacy format, auto-detected by size
229    pub fn from_bytes(bytes: &[u8]) -> std::io::Result<Self> {
230        use std::io::{Error, ErrorKind};
231
232        const CHECKSUM_SIZE: usize = 32; // BLAKE3 produces 32-byte hashes
233        const MAGIC_SIZE: usize = 4;
234        const HEADER_SIZE: usize = 24; // 3 * u64
235        const MIN_SIZE_V2: usize = MAGIC_SIZE + HEADER_SIZE + CHECKSUM_SIZE; // 60 bytes
236        const MIN_SIZE_V1: usize = HEADER_SIZE + CHECKSUM_SIZE; // 56 bytes
237
238        // Check for v2 format (has magic number)
239        if bytes.len() >= MIN_SIZE_V2 && bytes[..4] == Self::MAGIC_V2 {
240            return Self::from_bytes_v2(bytes);
241        }
242
243        // Fall back to v1 format (legacy, no magic number)
244        if bytes.len() >= MIN_SIZE_V1 {
245            return Self::from_bytes_v1(bytes);
246        }
247
248        Err(Error::new(
249            ErrorKind::InvalidData,
250            format!(
251                "Bloom filter data too small: {} bytes (minimum {})",
252                bytes.len(),
253                MIN_SIZE_V1
254            ),
255        ))
256    }
257
258    /// Parse v2 format (with magic number and version header)
259    fn from_bytes_v2(bytes: &[u8]) -> std::io::Result<Self> {
260        use std::io::{Cursor, Error, ErrorKind};
261
262        const CHECKSUM_SIZE: usize = 32;
263        const MAGIC_SIZE: usize = 4;
264
265        // Validate checksum BEFORE parsing data
266        let data_len = bytes.len() - CHECKSUM_SIZE;
267        let (data, stored_checksum) = bytes.split_at(data_len);
268
269        let computed_checksum = blake3::hash(data);
270        if computed_checksum.as_bytes() != stored_checksum {
271            return Err(Error::new(
272                ErrorKind::InvalidData,
273                format!(
274                    "Bloom filter v2 corruption detected: checksum mismatch. \
275                     Expected {:?}, got {:?}.",
276                    hex::encode(stored_checksum),
277                    hex::encode(computed_checksum.as_bytes())
278                ),
279            ));
280        }
281
282        // Skip magic number, parse header
283        let mut cursor = Cursor::new(&data[MAGIC_SIZE..]);
284        Self::parse_header_and_data(&mut cursor)
285    }
286
287    /// Parse v1 format (legacy, no magic number)
288    fn from_bytes_v1(bytes: &[u8]) -> std::io::Result<Self> {
289        use std::io::{Cursor, Error, ErrorKind};
290
291        const CHECKSUM_SIZE: usize = 32;
292
293        // Validate checksum BEFORE parsing data
294        let data_len = bytes.len() - CHECKSUM_SIZE;
295        let (data, stored_checksum) = bytes.split_at(data_len);
296
297        let computed_checksum = blake3::hash(data);
298        if computed_checksum.as_bytes() != stored_checksum {
299            return Err(Error::new(
300                ErrorKind::InvalidData,
301                format!(
302                    "Bloom filter v1 corruption detected: checksum mismatch. \
303                     Expected {:?}, got {:?}.",
304                    hex::encode(stored_checksum),
305                    hex::encode(computed_checksum.as_bytes())
306                ),
307            ));
308        }
309
310        let mut cursor = Cursor::new(data);
311        Self::parse_header_and_data(&mut cursor)
312    }
313
314    /// Common header and data parsing logic
315    fn parse_header_and_data(cursor: &mut std::io::Cursor<&[u8]>) -> std::io::Result<Self> {
316        use byteorder::{LittleEndian, ReadBytesExt};
317        use std::io::{Error, ErrorKind};
318
319        let num_bits = cursor.read_u64::<LittleEndian>()? as usize;
320        let num_hashes = cursor.read_u64::<LittleEndian>()? as usize;
321        let num_words = cursor.read_u64::<LittleEndian>()? as usize;
322
323        // SECURITY: Validate parameters to prevent DOS/OOM attacks
324        const MAX_BITS: usize = 1_000_000_000; // 1 billion bits = ~119 MB
325        const MAX_HASHES: usize = 32; // Way more than needed (typical: 7)
326        const MAX_WORDS: usize = MAX_BITS / 64; // ~15.6 million words
327
328        if num_bits > MAX_BITS {
329            return Err(Error::new(
330                ErrorKind::InvalidData,
331                format!(
332                    "Bloom filter num_bits too large: {} > {}",
333                    num_bits, MAX_BITS
334                ),
335            ));
336        }
337
338        if num_hashes > MAX_HASHES {
339            return Err(Error::new(
340                ErrorKind::InvalidData,
341                format!(
342                    "Bloom filter num_hashes too large: {} > {}",
343                    num_hashes, MAX_HASHES
344                ),
345            ));
346        }
347
348        if num_words > MAX_WORDS {
349            return Err(Error::new(
350                ErrorKind::InvalidData,
351                format!(
352                    "Bloom filter num_words too large: {} > {}",
353                    num_words, MAX_WORDS
354                ),
355            ));
356        }
357
358        // CORRECTNESS: Verify num_words matches num_bits
359        let expected_words = num_bits.div_ceil(64);
360        if num_words != expected_words {
361            return Err(Error::new(
362                ErrorKind::InvalidData,
363                format!(
364                    "Bloom filter size mismatch: num_words={} but expected={} for num_bits={}",
365                    num_words, expected_words, num_bits
366                ),
367            ));
368        }
369
370        let mut bits = Vec::with_capacity(num_words);
371        for _ in 0..num_words {
372            bits.push(cursor.read_u64::<LittleEndian>()?);
373        }
374
375        Ok(Self {
376            bits,
377            num_bits,
378            num_hashes,
379        })
380    }
381
382    /// Deserialize from bytes without checksum validation (for backward compatibility)
383    ///
384    /// Use this for reading old bloom filters that don't have checksums.
385    /// New bloom filters should use `from_bytes()` which validates checksums.
386    pub fn from_bytes_legacy(bytes: &[u8]) -> std::io::Result<Self> {
387        use byteorder::{LittleEndian, ReadBytesExt};
388        use std::io::{Cursor, Error, ErrorKind};
389
390        let mut cursor = Cursor::new(bytes);
391
392        let num_bits = cursor.read_u64::<LittleEndian>()? as usize;
393        let num_hashes = cursor.read_u64::<LittleEndian>()? as usize;
394        let num_words = cursor.read_u64::<LittleEndian>()? as usize;
395
396        // SECURITY: Validate parameters
397        const MAX_BITS: usize = 1_000_000_000;
398        const MAX_HASHES: usize = 32;
399        const MAX_WORDS: usize = MAX_BITS / 64;
400
401        if num_bits > MAX_BITS || num_hashes > MAX_HASHES || num_words > MAX_WORDS {
402            return Err(Error::new(
403                ErrorKind::InvalidData,
404                "Bloom filter parameters exceed limits",
405            ));
406        }
407
408        let expected_words = num_bits.div_ceil(64);
409        if num_words != expected_words {
410            return Err(Error::new(
411                ErrorKind::InvalidData,
412                "Bloom filter size mismatch",
413            ));
414        }
415
416        let mut bits = Vec::with_capacity(num_words);
417        for _ in 0..num_words {
418            bits.push(cursor.read_u64::<LittleEndian>()?);
419        }
420
421        Ok(Self {
422            bits,
423            num_bits,
424            num_hashes,
425        })
426    }
427
428    /// Get size in bytes
429    pub fn size_bytes(&self) -> usize {
430        24 + self.bits.len() * 8 // metadata + bit vector
431    }
432
433    /// Get memory size (alias for size_bytes)
434    pub fn memory_size(&self) -> usize {
435        self.size_bytes()
436    }
437}
438
439#[cfg(test)]
440mod tests {
441    use super::*;
442
443    #[test]
444    fn test_bloom_filter_basic() {
445        let mut bloom = BloomFilter::new(1000, 0.01);
446
447        // Insert items
448        for i in 0..100 {
449            bloom.insert(&i);
450        }
451
452        // Check inserted items (should all return true)
453        for i in 0..100 {
454            assert!(bloom.contains(&i));
455        }
456
457        // Check non-inserted items (most should return false)
458        let mut false_positives = 0;
459        for i in 100..1000 {
460            if bloom.contains(&i) {
461                false_positives += 1;
462            }
463        }
464
465        // False positive rate should be roughly 1% (allow up to 3% for variance)
466        let fp_rate = false_positives as f64 / 900.0;
467        assert!(fp_rate < 0.03, "False positive rate too high: {}", fp_rate);
468    }
469
470    #[test]
471    fn test_bloom_filter_serialization() {
472        let mut bloom = BloomFilter::new(100, 0.01);
473
474        for i in 0..50 {
475            bloom.insert(&i);
476        }
477
478        let bytes = bloom.to_bytes();
479        let restored = BloomFilter::from_bytes(&bytes).unwrap();
480
481        // Check all items are still present
482        for i in 0..50 {
483            assert!(restored.contains(&i));
484        }
485    }
486
487    #[test]
488    fn test_bloom_filter_strings() {
489        let mut bloom = BloomFilter::new(1000, 0.01);
490
491        let items = vec!["hello", "world", "foo", "bar", "baz"];
492        for item in &items {
493            bloom.insert(item);
494        }
495
496        for item in &items {
497            assert!(bloom.contains(item));
498        }
499
500        // Bloom filters can have false positives, so this might return true
501        let _ = bloom.contains(&"not_there");
502    }
503}
504
505// ============================================================================
506// jj.md Task 3: Blocked Bloom Filter with Cache-Line Alignment
507// ============================================================================
508
509/// Cache line size in bytes (64 bytes on most modern CPUs)
510const CACHE_LINE_SIZE: usize = 64;
511
512/// Bits per cache line block (512 bits = 64 bytes)
513const BLOCK_BITS: usize = CACHE_LINE_SIZE * 8;
514
515/// Level-adaptive false positive rates for LSM-tree
516/// L0 has the most reads, so we use lower FPR
517#[derive(Debug, Clone, Copy)]
518pub struct LevelAdaptiveFPR {
519    /// FPR for L0 (most frequently accessed)
520    pub l0: f64,
521    /// FPR for L1
522    pub l1: f64,
523    /// FPR for L2 and higher (less frequently accessed)
524    pub l2_plus: f64,
525}
526
527impl Default for LevelAdaptiveFPR {
528    fn default() -> Self {
529        Self {
530            l0: 0.001,     // 0.1% - Very low for L0
531            l1: 0.005,     // 0.5%
532            l2_plus: 0.01, // 1% - Standard for deeper levels
533        }
534    }
535}
536
537impl LevelAdaptiveFPR {
538    /// Get FPR for a given level
539    pub fn for_level(&self, level: usize) -> f64 {
540        match level {
541            0 => self.l0,
542            1 => self.l1,
543            _ => self.l2_plus,
544        }
545    }
546}
547
548// =============================================================================
549// Task 9 Enhancement: Bloom Filter Cascade
550// =============================================================================
551
552/// Cascaded bloom filters with decreasing false positive rates
553///
554/// ## Design
555/// Organizes multiple bloom filters in a cascade where:
556/// - First filter (L0): Smallest, catches most negatives quickly
557/// - Subsequent filters: Progressively lower FPR for items that pass
558///
559/// ## Benefits
560/// - Fast rejection of non-existent keys (first filter catches ~99%)
561/// - Lower overall FPR than single filter for same memory budget
562/// - Amortized O(1) lookup with early termination
563///
564/// ## Example Configuration
565/// ```text
566/// L0: 1% FPR, 100KB  → Catches 99% of negatives
567/// L1: 0.1% FPR, 50KB → Catches 99.9% of remaining
568/// L2: 0.01% FPR, 25KB → Final filter for edge cases
569/// Combined FPR: 0.01 * 0.001 * 0.0001 = 0.000000001 (1 in billion)
570/// ```
571#[derive(Debug)]
572pub struct BloomFilterCascade {
573    /// Cascade of filters (index 0 = first check)
574    filters: Vec<BlockedBloomFilter>,
575    /// FPR for each level
576    level_fpr: Vec<f64>,
577    /// Number of items expected
578    expected_items: usize,
579}
580
581impl BloomFilterCascade {
582    /// Create a new cascade with specified levels
583    ///
584    /// # Arguments
585    /// * `expected_items` - Number of items to store
586    /// * `level_fprs` - False positive rate for each level (e.g., [0.01, 0.001, 0.0001])
587    pub fn new(expected_items: usize, level_fprs: Vec<f64>) -> Self {
588        let filters = level_fprs
589            .iter()
590            .map(|&fpr| BlockedBloomFilter::new(expected_items, fpr))
591            .collect();
592
593        Self {
594            filters,
595            level_fpr: level_fprs,
596            expected_items,
597        }
598    }
599
600    /// Create default 3-level cascade
601    ///
602    /// - L0: 1% FPR (fast initial check)
603    /// - L1: 0.1% FPR (medium precision)
604    /// - L2: 0.01% FPR (high precision)
605    pub fn default_cascade(expected_items: usize) -> Self {
606        Self::new(expected_items, vec![0.01, 0.001, 0.0001])
607    }
608
609    /// Create memory-optimized 2-level cascade
610    pub fn compact(expected_items: usize) -> Self {
611        Self::new(expected_items, vec![0.01, 0.0001])
612    }
613
614    /// Insert an item into all cascade levels
615    pub fn insert<T: Hash>(&mut self, item: &T) {
616        for filter in &mut self.filters {
617            filter.insert(item);
618        }
619    }
620
621    /// Check if item might exist (with early termination)
622    ///
623    /// Returns false (definitely not present) as soon as any level returns false.
624    /// Returns true only if ALL levels return true.
625    pub fn contains<T: Hash>(&self, item: &T) -> bool {
626        for filter in &self.filters {
627            if !filter.contains(item) {
628                return false;
629            }
630        }
631        true
632    }
633
634    /// Check with level tracking (for debugging/stats)
635    pub fn contains_with_level<T: Hash>(&self, item: &T) -> (bool, usize) {
636        for (level, filter) in self.filters.iter().enumerate() {
637            if !filter.contains(item) {
638                return (false, level);
639            }
640        }
641        (true, self.filters.len())
642    }
643
644    /// Combined theoretical false positive rate
645    pub fn combined_fpr(&self) -> f64 {
646        self.level_fpr.iter().product()
647    }
648
649    /// Number of cascade levels
650    pub fn num_levels(&self) -> usize {
651        self.filters.len()
652    }
653
654    /// Memory usage in bytes
655    pub fn memory_usage(&self) -> usize {
656        self.filters.iter().map(|f| f.memory_usage()).sum()
657    }
658
659    /// Serialize to bytes
660    pub fn to_bytes(&self) -> Vec<u8> {
661        use byteorder::{LittleEndian, WriteBytesExt};
662
663        let mut buf = Vec::new();
664
665        // Magic: "BCF\x01" (Bloom Cascade Filter v1)
666        buf.extend_from_slice(&[0x42, 0x43, 0x46, 0x01]);
667
668        // Header
669        buf.write_u64::<LittleEndian>(self.expected_items as u64)
670            .unwrap();
671        buf.write_u64::<LittleEndian>(self.filters.len() as u64)
672            .unwrap();
673
674        // FPR for each level
675        for &fpr in &self.level_fpr {
676            buf.write_u64::<LittleEndian>(fpr.to_bits()).unwrap();
677        }
678
679        // Each filter's serialized data
680        for filter in &self.filters {
681            let filter_bytes = filter.to_bytes();
682            buf.write_u64::<LittleEndian>(filter_bytes.len() as u64)
683                .unwrap();
684            buf.extend_from_slice(&filter_bytes);
685        }
686
687        // Checksum
688        let checksum = blake3::hash(&buf);
689        buf.extend_from_slice(checksum.as_bytes());
690
691        buf
692    }
693
694    /// Deserialize from bytes
695    pub fn from_bytes(bytes: &[u8]) -> std::io::Result<Self> {
696        use byteorder::{LittleEndian, ReadBytesExt};
697        use std::io::{Cursor, Error, ErrorKind};
698
699        const CHECKSUM_SIZE: usize = 32;
700        const MIN_SIZE: usize = 4 + 16 + CHECKSUM_SIZE; // magic + header + checksum
701
702        if bytes.len() < MIN_SIZE {
703            return Err(Error::new(
704                ErrorKind::InvalidData,
705                "Bloom cascade data too small",
706            ));
707        }
708
709        // Verify magic
710        if bytes[..4] != [0x42, 0x43, 0x46, 0x01] {
711            return Err(Error::new(
712                ErrorKind::InvalidData,
713                "Invalid bloom cascade magic number",
714            ));
715        }
716
717        // Verify checksum
718        let data_len = bytes.len() - CHECKSUM_SIZE;
719        let (data, stored_checksum) = bytes.split_at(data_len);
720        let computed_checksum = blake3::hash(data);
721        if computed_checksum.as_bytes() != stored_checksum {
722            return Err(Error::new(
723                ErrorKind::InvalidData,
724                "Bloom cascade checksum mismatch",
725            ));
726        }
727
728        let mut cursor = Cursor::new(&bytes[4..]); // Skip magic
729
730        let expected_items = cursor.read_u64::<LittleEndian>()? as usize;
731        let num_levels = cursor.read_u64::<LittleEndian>()? as usize;
732
733        // Read FPRs
734        let mut level_fpr = Vec::with_capacity(num_levels);
735        for _ in 0..num_levels {
736            let bits = cursor.read_u64::<LittleEndian>()?;
737            level_fpr.push(f64::from_bits(bits));
738        }
739
740        // Read filters
741        let mut filters = Vec::with_capacity(num_levels);
742        for _ in 0..num_levels {
743            let filter_len = cursor.read_u64::<LittleEndian>()? as usize;
744            let pos = cursor.position() as usize;
745            let filter_bytes = &bytes[4 + pos..4 + pos + filter_len];
746            cursor.set_position((pos + filter_len) as u64);
747
748            let filter = BlockedBloomFilter::from_bytes(filter_bytes)?;
749            filters.push(filter);
750        }
751
752        Ok(Self {
753            filters,
754            level_fpr,
755            expected_items,
756        })
757    }
758}
759
760// ============================================================================
761// Unified Bloom Filter for SSTable Reader (supports both formats)
762// ============================================================================
763
764/// Magic numbers for format detection
765const BLOOM_MAGIC_V2: [u8; 4] = [0x42, 0x4C, 0x4D, 0x02]; // "BLM\x02" - Standard BloomFilter
766const BLOCKED_BLOOM_MAGIC: [u8; 4] = [0x42, 0x42, 0x46, 0x01]; // "BBF\x01" - BlockedBloomFilter
767
768/// Unified bloom filter that can be deserialized from either format.
769///
770/// This allows SSTableReader to handle both old SSTables (with BloomFilter)
771/// and new SSTables (with BlockedBloomFilter) transparently.
772#[derive(Debug)]
773pub enum UnifiedBloomFilter {
774    /// Standard bloom filter (legacy format)
775    Standard(BloomFilter),
776    /// Cache-line optimized blocked bloom filter (new format)
777    Blocked(BlockedBloomFilter),
778}
779
780impl UnifiedBloomFilter {
781    /// Check if an item might be in the set
782    pub fn contains<T: std::hash::Hash>(&self, item: &T) -> bool {
783        match self {
784            UnifiedBloomFilter::Standard(bf) => bf.contains(item),
785            UnifiedBloomFilter::Blocked(bbf) => bbf.contains(item),
786        }
787    }
788
789    /// Deserialize from bytes, auto-detecting the format
790    pub fn from_bytes(bytes: &[u8]) -> std::io::Result<Self> {
791        use std::io::{Error, ErrorKind};
792
793        if bytes.len() < 4 {
794            return Err(Error::new(
795                ErrorKind::InvalidData,
796                "Bloom filter data too small to detect format",
797            ));
798        }
799
800        // Check magic number to determine format
801        if bytes[..4] == BLOCKED_BLOOM_MAGIC {
802            // New blocked bloom filter format
803            BlockedBloomFilter::from_bytes(bytes).map(UnifiedBloomFilter::Blocked)
804        } else if bytes[..4] == BLOOM_MAGIC_V2 || bytes.len() >= 56 {
805            // Standard bloom filter (v2 with magic or v1 legacy)
806            BloomFilter::from_bytes(bytes).map(UnifiedBloomFilter::Standard)
807        } else {
808            Err(Error::new(
809                ErrorKind::InvalidData,
810                "Unknown bloom filter format",
811            ))
812        }
813    }
814
815    /// Get memory size in bytes
816    pub fn memory_size(&self) -> usize {
817        match self {
818            UnifiedBloomFilter::Standard(bf) => bf.memory_size(),
819            UnifiedBloomFilter::Blocked(bbf) => bbf.memory_usage(),
820        }
821    }
822}
823
824/// Cache-line aligned blocked bloom filter for improved cache efficiency.
825///
826/// ## jj.md Task 3: Blocked Bloom Filter
827///
828/// This implementation groups all hash probes for a key into a single
829/// cache-line sized block, reducing cache misses from k (number of hashes)
830/// to 1.
831///
832/// ### Performance Benefits
833///
834/// - **Cache efficiency**: All probes hit the same cache line
835/// - **Reduced memory bandwidth**: One cache line load vs k scattered loads
836/// - **Better TLB usage**: Single page access per lookup
837///
838/// ### Trade-offs
839///
840/// - Slightly higher FPR than standard bloom filter at same memory usage
841/// - Block selection uses one hash, remaining hashes probe within block
842///
843/// ### Reference
844///
845/// "Cache-Efficient Bloom Filters" (Putze et al., 2007)
846#[derive(Debug)]
847pub struct BlockedBloomFilter {
848    /// Cache-line aligned blocks (each block is 64 bytes = 512 bits)
849    blocks: Vec<[u64; 8]>,
850    /// Number of blocks
851    num_blocks: usize,
852    /// Number of hash probes within each block
853    num_hashes: usize,
854    /// Expected items (for sizing)
855    expected_items: usize,
856}
857
858impl BlockedBloomFilter {
859    /// Create a new blocked bloom filter.
860    ///
861    /// # Arguments
862    /// * `expected_items` - Number of items expected to be inserted
863    /// * `false_positive_rate` - Desired false positive rate (e.g., 0.01 for 1%)
864    pub fn new(expected_items: usize, false_positive_rate: f64) -> Self {
865        // Calculate bits per key needed
866        // m/n = -ln(p) / ln(2)² ≈ -1.44 * ln(p)
867        let bits_per_key = (-1.44 * false_positive_rate.ln()).ceil() as usize;
868        let bits_per_key = bits_per_key.max(4); // Minimum 4 bits per key
869
870        let total_bits = expected_items * bits_per_key;
871        let num_blocks = total_bits.div_ceil(BLOCK_BITS);
872        let num_blocks = num_blocks.max(1);
873
874        // Optimal number of hashes: k = ln(2) * (m/n) ≈ 0.693 * bits_per_key
875        let num_hashes = ((0.693 * bits_per_key as f64).ceil() as usize).clamp(1, 8);
876
877        let blocks = vec![[0u64; 8]; num_blocks];
878
879        Self {
880            blocks,
881            num_blocks,
882            num_hashes,
883            expected_items,
884        }
885    }
886
887    /// Create a blocked bloom filter with level-adaptive FPR.
888    pub fn for_level(expected_items: usize, level: usize) -> Self {
889        let fpr = LevelAdaptiveFPR::default().for_level(level);
890        Self::new(expected_items, fpr)
891    }
892
893    /// Insert an item into the bloom filter.
894    pub fn insert<T: Hash>(&mut self, item: &T) {
895        let (block_idx, h2) = self.compute_hashes(item);
896
897        // Probe within the selected block
898        let block = &mut self.blocks[block_idx];
899        for i in 0..self.num_hashes {
900            let bit_pos = (h2.wrapping_add(i * 31)) % BLOCK_BITS;
901            let word_idx = bit_pos / 64;
902            let bit_offset = bit_pos % 64;
903            block[word_idx] |= 1u64 << bit_offset;
904        }
905    }
906
907    /// Check if an item might be in the set.
908    ///
909    /// Returns `true` if item *might* be present (could be false positive).
910    /// Returns `false` if item is *definitely not* present (100% accurate).
911    pub fn contains<T: Hash>(&self, item: &T) -> bool {
912        let (block_idx, h2) = self.compute_hashes(item);
913        let block = &self.blocks[block_idx];
914
915        for i in 0..self.num_hashes {
916            let bit_pos = (h2.wrapping_add(i * 31)) % BLOCK_BITS;
917            let word_idx = bit_pos / 64;
918            let bit_offset = bit_pos % 64;
919            if (block[word_idx] & (1u64 << bit_offset)) == 0 {
920                return false;
921            }
922        }
923        true
924    }
925
926    /// Compute block index and intra-block hash.
927    fn compute_hashes<T: Hash>(&self, item: &T) -> (usize, usize) {
928        use std::collections::hash_map::DefaultHasher;
929
930        let mut hasher = DefaultHasher::new();
931        item.hash(&mut hasher);
932        let h1 = hasher.finish();
933
934        // Use upper and lower bits for two independent-ish hashes
935        let block_idx = (h1 as usize) % self.num_blocks;
936        let h2 = ((h1 >> 32) as usize) | ((h1 as usize) << 32);
937
938        (block_idx, h2)
939    }
940
941    /// Serialize to bytes with checksum.
942    pub fn to_bytes(&self) -> Vec<u8> {
943        use byteorder::{LittleEndian, WriteBytesExt};
944
945        let mut buf = Vec::new();
946
947        // Magic number for blocked bloom filter: "BBF\x01"
948        buf.extend_from_slice(&[0x42, 0x42, 0x46, 0x01]);
949
950        // Header
951        buf.write_u64::<LittleEndian>(self.num_blocks as u64)
952            .unwrap();
953        buf.write_u64::<LittleEndian>(self.num_hashes as u64)
954            .unwrap();
955        buf.write_u64::<LittleEndian>(self.expected_items as u64)
956            .unwrap();
957
958        // Block data
959        for block in &self.blocks {
960            for &word in block {
961                buf.write_u64::<LittleEndian>(word).unwrap();
962            }
963        }
964
965        // BLAKE3 checksum
966        let checksum = blake3::hash(&buf);
967        buf.extend_from_slice(checksum.as_bytes());
968
969        buf
970    }
971
972    /// Deserialize from bytes with checksum validation.
973    pub fn from_bytes(bytes: &[u8]) -> std::io::Result<Self> {
974        use byteorder::{LittleEndian, ReadBytesExt};
975        use std::io::{Cursor, Error, ErrorKind};
976
977        const MAGIC: [u8; 4] = [0x42, 0x42, 0x46, 0x01]; // "BBF\x01"
978        const CHECKSUM_SIZE: usize = 32;
979        const HEADER_SIZE: usize = 4 + 24; // magic + 3 * u64
980
981        if bytes.len() < HEADER_SIZE + CHECKSUM_SIZE {
982            return Err(Error::new(
983                ErrorKind::InvalidData,
984                "Blocked bloom filter data too small",
985            ));
986        }
987
988        // Verify magic
989        if bytes[..4] != MAGIC {
990            return Err(Error::new(
991                ErrorKind::InvalidData,
992                "Invalid blocked bloom filter magic number",
993            ));
994        }
995
996        // Verify checksum
997        let data_len = bytes.len() - CHECKSUM_SIZE;
998        let (data, stored_checksum) = bytes.split_at(data_len);
999        let computed_checksum = blake3::hash(data);
1000
1001        if computed_checksum.as_bytes() != stored_checksum {
1002            return Err(Error::new(
1003                ErrorKind::InvalidData,
1004                "Blocked bloom filter checksum mismatch",
1005            ));
1006        }
1007
1008        // Parse header
1009        let mut cursor = Cursor::new(&bytes[4..]);
1010        let num_blocks = cursor.read_u64::<LittleEndian>()? as usize;
1011        let num_hashes = cursor.read_u64::<LittleEndian>()? as usize;
1012        let expected_items = cursor.read_u64::<LittleEndian>()? as usize;
1013
1014        // Validate sizes
1015        let expected_data_size = HEADER_SIZE + num_blocks * 64 + CHECKSUM_SIZE;
1016        if bytes.len() != expected_data_size {
1017            return Err(Error::new(
1018                ErrorKind::InvalidData,
1019                format!(
1020                    "Blocked bloom filter size mismatch: {} vs {}",
1021                    bytes.len(),
1022                    expected_data_size
1023                ),
1024            ));
1025        }
1026
1027        // Parse blocks
1028        let mut blocks = Vec::with_capacity(num_blocks);
1029        for _ in 0..num_blocks {
1030            let mut block = [0u64; 8];
1031            for word in &mut block {
1032                *word = cursor.read_u64::<LittleEndian>()?;
1033            }
1034            blocks.push(block);
1035        }
1036
1037        Ok(Self {
1038            blocks,
1039            num_blocks,
1040            num_hashes,
1041            expected_items,
1042        })
1043    }
1044
1045    /// Get size in bytes.
1046    pub fn size_bytes(&self) -> usize {
1047        4 + 24 + self.num_blocks * 64 + 32 // magic + header + blocks + checksum
1048    }
1049
1050    /// Get memory usage in bytes.
1051    pub fn memory_usage(&self) -> usize {
1052        std::mem::size_of::<Self>() + self.num_blocks * 64
1053    }
1054
1055    /// Get fill ratio (fraction of bits set).
1056    pub fn fill_ratio(&self) -> f64 {
1057        let set_bits: usize = self
1058            .blocks
1059            .iter()
1060            .flat_map(|b| b.iter())
1061            .map(|w| w.count_ones() as usize)
1062            .sum();
1063
1064        let total_bits = self.num_blocks * BLOCK_BITS;
1065        set_bits as f64 / total_bits as f64
1066    }
1067}
1068
1069#[cfg(test)]
1070mod blocked_bloom_tests {
1071    use super::*;
1072
1073    #[test]
1074    fn test_blocked_bloom_basic() {
1075        let mut bloom = BlockedBloomFilter::new(1000, 0.01);
1076
1077        // Insert items
1078        for i in 0..100 {
1079            bloom.insert(&i);
1080        }
1081
1082        // Check inserted items (should all return true)
1083        for i in 0..100 {
1084            assert!(bloom.contains(&i), "Missing inserted item {}", i);
1085        }
1086
1087        // Check non-inserted items (most should return false)
1088        let mut false_positives = 0;
1089        for i in 100..1000 {
1090            if bloom.contains(&i) {
1091                false_positives += 1;
1092            }
1093        }
1094
1095        // Blocked bloom filters have slightly higher FPR, allow up to 5%
1096        let fp_rate = false_positives as f64 / 900.0;
1097        assert!(fp_rate < 0.05, "False positive rate too high: {}", fp_rate);
1098    }
1099
1100    #[test]
1101    fn test_blocked_bloom_serialization() {
1102        let mut bloom = BlockedBloomFilter::new(100, 0.01);
1103
1104        for i in 0..50 {
1105            bloom.insert(&i);
1106        }
1107
1108        let bytes = bloom.to_bytes();
1109        let restored = BlockedBloomFilter::from_bytes(&bytes).unwrap();
1110
1111        // Check all items are still present
1112        for i in 0..50 {
1113            assert!(
1114                restored.contains(&i),
1115                "Missing item {} after deserialize",
1116                i
1117            );
1118        }
1119    }
1120
1121    #[test]
1122    fn test_blocked_bloom_level_adaptive() {
1123        // L0 should have lower FPR (more bits per key)
1124        let l0_bloom = BlockedBloomFilter::for_level(1000, 0);
1125        let l2_bloom = BlockedBloomFilter::for_level(1000, 2);
1126
1127        // L0 should use more memory (lower FPR = more bits)
1128        assert!(
1129            l0_bloom.memory_usage() >= l2_bloom.memory_usage(),
1130            "L0 bloom should use >= memory than L2"
1131        );
1132    }
1133
1134    #[test]
1135    fn test_blocked_bloom_u128_keys() {
1136        let mut bloom = BlockedBloomFilter::new(1000, 0.01);
1137
1138        // Insert u128 edge IDs
1139        let edge_ids: Vec<u128> = (0..100).map(|i| i as u128 * 12345678901234567890).collect();
1140
1141        for id in &edge_ids {
1142            bloom.insert(id);
1143        }
1144
1145        // All inserted items should be found
1146        for id in &edge_ids {
1147            assert!(bloom.contains(id));
1148        }
1149    }
1150
1151    #[test]
1152    fn test_blocked_bloom_checksum_corruption() {
1153        let mut bloom = BlockedBloomFilter::new(100, 0.01);
1154        bloom.insert(&42);
1155
1156        let mut bytes = bloom.to_bytes();
1157
1158        // Corrupt a byte in the data
1159        if bytes.len() > 10 {
1160            bytes[10] ^= 0xFF;
1161        }
1162
1163        // Should fail checksum validation
1164        assert!(BlockedBloomFilter::from_bytes(&bytes).is_err());
1165    }
1166
1167    #[test]
1168    fn test_blocked_bloom_fill_ratio() {
1169        let mut bloom = BlockedBloomFilter::new(1000, 0.01);
1170
1171        assert_eq!(bloom.fill_ratio(), 0.0);
1172
1173        for i in 0..500 {
1174            bloom.insert(&i);
1175        }
1176
1177        let ratio = bloom.fill_ratio();
1178        assert!(ratio > 0.0 && ratio < 1.0);
1179    }
1180
1181    #[test]
1182    fn test_level_adaptive_fpr() {
1183        let fpr = LevelAdaptiveFPR::default();
1184
1185        assert_eq!(fpr.for_level(0), 0.001);
1186        assert_eq!(fpr.for_level(1), 0.005);
1187        assert_eq!(fpr.for_level(2), 0.01);
1188        assert_eq!(fpr.for_level(5), 0.01);
1189    }
1190
1191    // Bloom Filter Cascade Tests
1192
1193    #[test]
1194    fn test_cascade_basic() {
1195        let mut cascade = BloomFilterCascade::default_cascade(1000);
1196
1197        // Insert items
1198        for i in 0..100 {
1199            cascade.insert(&i);
1200        }
1201
1202        // All inserted items should be found
1203        for i in 0..100 {
1204            assert!(cascade.contains(&i), "Item {} should be found", i);
1205        }
1206
1207        // Verify cascade has expected structure
1208        assert_eq!(cascade.num_levels(), 3);
1209        assert!(cascade.combined_fpr() < 0.0001);
1210    }
1211
1212    #[test]
1213    fn test_cascade_early_termination() {
1214        let mut cascade = BloomFilterCascade::new(1000, vec![0.5, 0.5, 0.5]); // High FPR for testing
1215
1216        cascade.insert(&42);
1217
1218        // Inserted item passes all levels
1219        let (found, level) = cascade.contains_with_level(&42);
1220        assert!(found);
1221        assert_eq!(level, 3); // Passed all 3 levels
1222
1223        // Non-inserted item should fail at some level
1224        let (found, level) = cascade.contains_with_level(&999999);
1225        if !found {
1226            assert!(level < 3, "Should fail before reaching all levels");
1227        }
1228    }
1229
1230    #[test]
1231    fn test_cascade_serialization() {
1232        let mut cascade = BloomFilterCascade::default_cascade(500);
1233
1234        for i in 0..100 {
1235            cascade.insert(&i);
1236        }
1237
1238        // Serialize and deserialize
1239        let bytes = cascade.to_bytes();
1240        let restored = BloomFilterCascade::from_bytes(&bytes).unwrap();
1241
1242        // Check properties preserved
1243        assert_eq!(restored.num_levels(), cascade.num_levels());
1244        assert_eq!(restored.expected_items, cascade.expected_items);
1245
1246        // Check contents
1247        for i in 0..100 {
1248            assert!(restored.contains(&i));
1249        }
1250    }
1251
1252    #[test]
1253    fn test_cascade_memory() {
1254        let cascade = BloomFilterCascade::default_cascade(10000);
1255
1256        // Should use reasonable memory
1257        let mem = cascade.memory_usage();
1258        assert!(mem > 0);
1259
1260        // Combined FPR should be very low
1261        let combined_fpr = cascade.combined_fpr();
1262        assert!(
1263            combined_fpr < 0.000001,
1264            "Combined FPR should be < 1 in million"
1265        );
1266    }
1267
1268    #[test]
1269    fn test_cascade_compact() {
1270        let compact = BloomFilterCascade::compact(1000);
1271        let full = BloomFilterCascade::default_cascade(1000);
1272
1273        // Compact should use fewer levels
1274        assert_eq!(compact.num_levels(), 2);
1275        assert_eq!(full.num_levels(), 3);
1276
1277        // But still maintain low FPR
1278        assert!(compact.combined_fpr() < 0.0001);
1279    }
1280}