Skip to main content

reddb_server/storage/primitives/
split_block_bloom.rs

1//! SplitBlockBloomFilter — cache-line-aligned, SIMD-friendly bloom filter.
2//!
3//! Direct port of MongoDB's `SplitBlockBloomFilter` from
4//! `src/mongo/db/exec/sbe/util/bloom_filter.h`.
5//!
6//! # Design
7//!
//! Each block is 32 bytes (8 × u32 = 256 bits) and 32-byte aligned, so it never straddles a 64-byte cache line.
9//! Insert/probe uses 8 independent salt multiplications, one bit per word.
10//! Block index uses power-of-2 masking (fast modulo via bitwise AND).
11//!
12//! For k=8 bits per element: ~10 bits needed per element for ~1% FPR.
//! The block count is rounded up to a power of two, so at n=1000: 64 blocks (2 KiB); at n=10_000: 512 blocks (16 KiB).
14//!
15//! # Usage in `CompiledEntityFilter`
16//!
17//! For `Filter::In` with >IN_BLOOM_THRESHOLD values, the compiled filter
18//! builds a `SplitBlockBloom` at compile time. At evaluate time:
19//! 1. Hash field value to u32 (fast, no allocation)
20//! 2. Bloom probe — if **false**, skip HashSet (definite miss, O(1))
21//! 3. HashSet probe — exact membership check (only ~1% FPR false positives reach here)
22//!
23//! Benefit: for rows where the field exists but doesn't match any IN value,
24//! the bloom eliminates the HashSet::contains call ~99% of the time.
25
/// Per-word salt multipliers: salt `i` is paired with word `i` of a block.
///
/// `insert` and `probe` multiply the key by each salt (wrapping) and use the
/// top five bits of the product as the bit position inside the matching word.
/// Changing any value changes which bits a key maps to, so a filter that was
/// serialized with `to_bytes` under the old salts would no longer agree with
/// new probes.
///
/// NOTE(review): these look like the salt constants published in the Apache
/// Parquet split-block bloom filter specification — confirm before relying on
/// cross-implementation compatibility.
const SALTS: [u32; 8] = [
    0x47b6137b, 0x44974d91, 0x8824ad5b, 0xa2b7289d, 0x705495c7, 0x2df1424b, 0x9efc4947, 0x5c6bfb31,
];
29
/// One bloom-filter block: eight `u32` words (256 bits, 32 bytes).
///
/// `#[repr(align(32))]` makes the alignment equal to the size, so a block
/// never straddles a boundary that is a multiple of 32 bytes (which includes
/// 64-byte cache lines). The derived `Default` yields an all-zero, i.e.
/// empty, block.
#[repr(align(32))]
#[derive(Clone, Default, PartialEq, Eq)]
pub struct Block {
    /// Word `i` is only ever addressed together with `SALTS[i]`; every
    /// inserted key sets exactly one bit in each word.
    words: [u32; 8],
}
36
37impl std::fmt::Debug for Block {
38    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39        write!(f, "Block({:08x?})", &self.words)
40    }
41}
42
/// Split-block bloom filter over `u32` keys.
///
/// Built once (via `with_capacity` + `insert`, or restored with
/// `from_bytes`) and then probed at query time. `probe` only reads the block
/// vector and performs no allocation.
///
/// Invariant maintained by both constructors: `blocks.len()` is a non-zero
/// power of two and `mask == blocks.len() - 1`. `insert`/`probe` index
/// `blocks` with `key & mask` and rely on this to stay in bounds.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SplitBlockBloom {
    /// The filter's bit storage, one 256-bit `Block` per slot.
    blocks: Vec<Block>,
    /// `num_blocks - 1` — mask for fast modulo via bitwise AND.
    mask: usize,
}
51
52impl SplitBlockBloom {
53    /// Build a filter sized for `n` elements with ~1% FPR.
54    pub fn with_capacity(n: usize) -> Self {
55        // ~10 bits per element for 1% FPR with 8 salt bits per insert.
56        // Each block holds 256 bits. Round up to next power of two.
57        let bits_needed = (n * 10).max(256);
58        let blocks_needed = bits_needed.div_ceil(256);
59        let num_blocks = blocks_needed.next_power_of_two();
60        Self {
61            blocks: vec![Block::default(); num_blocks],
62            mask: num_blocks - 1,
63        }
64    }
65
66    /// Insert a u32 key into the filter.
67    #[inline]
68    pub fn insert(&mut self, key: u32) {
69        let block_idx = (key as usize) & self.mask;
70        let block = &mut self.blocks[block_idx];
71        for (i, &salt) in SALTS.iter().enumerate() {
72            let bit = key.wrapping_mul(salt) >> 27; // 5-bit position 0..31
73            block.words[i] |= 1u32 << bit;
74        }
75    }
76
77    /// Return `true` if `key` **may** be in the set (false positives possible).
78    /// Return `false` if `key` is **definitely absent** (no false negatives).
79    #[inline]
80    pub fn probe(&self, key: u32) -> bool {
81        let block_idx = (key as usize) & self.mask;
82        let block = &self.blocks[block_idx];
83        for (i, &salt) in SALTS.iter().enumerate() {
84            let bit = key.wrapping_mul(salt) >> 27;
85            if block.words[i] & (1u32 << bit) == 0 {
86                return false;
87            }
88        }
89        true
90    }
91
92    /// Number of blocks allocated (each block = 32 bytes).
93    pub fn num_blocks(&self) -> usize {
94        self.blocks.len()
95    }
96
97    /// Serialize to a self-describing blob: a 4-byte LE block count followed
98    /// by `num_blocks × 8` little-endian `u32` words. The block count is
99    /// always a power of two, so [`from_bytes`](Self::from_bytes) can rebuild
100    /// the modulo mask without storing it. Used to persist a per-granule
101    /// bloom in a sealed columnar chunk's footer (#855).
102    pub fn to_bytes(&self) -> Vec<u8> {
103        let mut out = Vec::with_capacity(4 + self.blocks.len() * 32);
104        out.extend_from_slice(&(self.blocks.len() as u32).to_le_bytes());
105        for block in &self.blocks {
106            for &w in &block.words {
107                out.extend_from_slice(&w.to_le_bytes());
108            }
109        }
110        out
111    }
112
113    /// Rebuild a filter from [`to_bytes`](Self::to_bytes). Returns `None` on a
114    /// truncated blob or a non-power-of-two block count (a corrupt mask would
115    /// silently mis-route probes and could manufacture false negatives, so we
116    /// refuse it rather than risk under-inclusion).
117    pub fn from_bytes(bytes: &[u8]) -> Option<Self> {
118        if bytes.len() < 4 {
119            return None;
120        }
121        let num_blocks = u32::from_le_bytes(bytes[0..4].try_into().ok()?) as usize;
122        if num_blocks == 0 || !num_blocks.is_power_of_two() {
123            return None;
124        }
125        if bytes.len() < 4 + num_blocks * 32 {
126            return None;
127        }
128        let mut blocks = Vec::with_capacity(num_blocks);
129        let mut cur = 4;
130        for _ in 0..num_blocks {
131            let mut words = [0u32; 8];
132            for w in &mut words {
133                *w = u32::from_le_bytes(bytes[cur..cur + 4].try_into().ok()?);
134                cur += 4;
135            }
136            blocks.push(Block { words });
137        }
138        Some(Self {
139            blocks,
140            mask: num_blocks - 1,
141        })
142    }
143}
144
/// Fold a raw byte slice into the 32-bit key used by the bloom filter.
///
/// The writer and the pruner MUST both derive keys through this function:
/// a granule's stored bloom and a later probe only agree bit-for-bit if the
/// key derivation is identical, and that agreement is what keeps the
/// no-false-negative guarantee intact across the persisted boundary (#855).
///
/// NOTE(review): the key comes from `std`'s `DefaultHasher`, whose algorithm
/// the standard library documents as unspecified and subject to change
/// between Rust releases. Hashing a slice presumably also feeds its length
/// into the hasher, which may make the result depend on the target's pointer
/// width — verify. Either would break blooms persisted by an older build, so
/// confirm this is acceptable before treating the output as a stable on-disk
/// format.
pub fn hash_bytes_u32(bytes: &[u8]) -> u32 {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    let mut hasher = DefaultHasher::new();
    Hash::hash(bytes, &mut hasher);
    let digest = hasher.finish();
    // XOR the upper half onto the lower half so all 64 bits influence the key.
    (digest as u32) ^ ((digest >> 32) as u32)
}
156
157/// Hash a `Value` to a u32 for bloom filter use.
158/// Uses the standard Hash impl (which hashes discriminant + content).
159/// Folds the 64-bit DefaultHasher output to 32 bits via XOR.
160pub fn hash_value_u32(v: &crate::storage::schema::Value) -> u32 {
161    use std::hash::{Hash, Hasher};
162    let mut h = std::collections::hash_map::DefaultHasher::new();
163    v.hash(&mut h);
164    let bits = h.finish();
165    (bits ^ (bits >> 32)) as u32
166}
167
#[cfg(test)]
mod tests {
    use super::*;

    /// Every inserted key must probe as present.
    #[test]
    fn test_insert_then_probe() {
        let keys = 0u32..100;
        let mut filter = SplitBlockBloom::with_capacity(100);
        keys.clone().for_each(|key| filter.insert(key));
        for i in keys {
            assert!(filter.probe(i), "false negative for key {i}");
        }
    }

    /// Only even keys are inserted. Odd keys are never probed here, so this
    /// test checks the no-false-negative property for the inserted keys only.
    #[test]
    fn test_absent_key_may_return_false() {
        let evens: Vec<u32> = (0u32..1000).map(|i| i * 2).collect();
        let mut filter = SplitBlockBloom::with_capacity(1000);
        for &key in &evens {
            filter.insert(key);
        }
        for &key in &evens {
            assert!(filter.probe(key), "false negative for key {}", key);
        }
    }

    /// A serialized-then-restored filter equals the original and still
    /// reports every inserted key as present.
    #[test]
    fn to_bytes_from_bytes_round_trips_and_keeps_no_false_negatives() {
        let scattered = |i: u32| i.wrapping_mul(2_654_435_761);
        let mut filter = SplitBlockBloom::with_capacity(500);
        (0u32..500).map(scattered).for_each(|key| filter.insert(key));

        let restored = SplitBlockBloom::from_bytes(&filter.to_bytes()).expect("round-trips");
        assert_eq!(restored, filter);
        for key in (0u32..500).map(scattered) {
            assert!(restored.probe(key));
        }
    }

    /// Malformed blobs are refused instead of producing a mis-masked filter.
    #[test]
    fn from_bytes_rejects_truncated_or_non_power_of_two() {
        let three_blocks = 3u32.to_le_bytes(); // not a power of two
        let two_blocks_no_payload = 2u32.to_le_bytes(); // header only, words missing
        let malformed: [&[u8]; 4] = [&[], &[1, 2, 3], &three_blocks, &two_blocks_no_payload];
        for blob in malformed.iter() {
            assert!(SplitBlockBloom::from_bytes(blob).is_none());
        }
    }

    /// Hashing the same bytes twice yields the same key.
    #[test]
    fn hash_bytes_u32_is_stable_across_calls() {
        let input = 7u64.to_le_bytes();
        assert_eq!(hash_bytes_u32(&input), hash_bytes_u32(&input));
    }

    /// Inserts keys `0..N`, probes the next 10_000 never-inserted keys, and
    /// requires that fewer than 5% of them come back as (false) positives.
    #[test]
    fn test_false_positive_rate_approximately_one_percent() {
        const N: usize = 10_000;
        let probes = 10_000usize;
        let first_absent = N as u32;

        let mut filter = SplitBlockBloom::with_capacity(N);
        (0..first_absent).for_each(|key| filter.insert(key));

        let false_positives = (first_absent..first_absent + probes as u32)
            .filter(|&key| filter.probe(key))
            .count();
        let fpr = false_positives as f64 / probes as f64;
        // Allow up to 5% FPR — the theoretical ~1% varies with data patterns.
        assert!(fpr < 0.05, "FPR too high: {fpr:.3}");
    }
}