Skip to main content

reddb_server/storage/index/
bloom_segment.rs

1//! Reusable bloom filter header that any segment can embed.
2//!
3//! Segments (table pages, vector segments, timeseries chunks, graph
4//! partitions) all benefit from the same question: "is key X *possibly* in
5//! this segment?". Rather than reimplementing bloom wiring in every storage
6//! engine, wrap [`crate::storage::primitives::BloomFilter`] here with a
7//! serialisation header and a small trait `HasBloom` for owners to plug in.
8//!
9//! Layout on disk / inside a segment header is:
10//!
11//! ```text
12//! [ u8  magic       ]
13//! [ u8  num_hashes   ]
14//! [ u32 bit_size     ]   // big-endian
15//! [ u32 inserted     ]   // monotonic counter, best-effort
16//! [ bytes...          ]   // bit array
17//! ```
18//!
19//! Readers that don't care about bloom can skip `4 + (bit_size + 7) / 8`
20//! bytes after the 10-byte header.
21
22use crate::storage::primitives::BloomFilter;
23
24pub use reddb_file::BloomSegmentFrameError as BloomSegmentError;
25
26/// Trait implemented by owners of a segment-level bloom filter.
27///
28/// Segments ask their owner (table, vector collection, timeseries chunk)
29/// whether a key is definitely absent before walking their main structure.
30pub trait HasBloom {
31    /// Reference to the bloom filter attached to this segment, if any.
32    fn bloom_segment(&self) -> Option<&BloomSegment>;
33
34    /// Fast-path negative check. Returns `true` iff the bloom is present and
35    /// reports the key as absent.
36    fn definitely_absent(&self, key: &[u8]) -> bool {
37        self.bloom_segment()
38            .map(|b| b.definitely_absent(key))
39            .unwrap_or(false)
40    }
41}
42
43/// Owning bloom header that can be embedded in a segment.
44pub struct BloomSegment {
45    filter: BloomFilter,
46    inserted: u32,
47}
48
49impl std::fmt::Debug for BloomSegment {
50    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51        f.debug_struct("BloomSegment")
52            .field("bit_size", &self.filter.bit_size())
53            .field("num_hashes", &self.filter.num_hashes())
54            .field("inserted", &self.inserted)
55            .finish()
56    }
57}
58
59impl BloomSegment {
60    /// Build a bloom sized for `expected` elements at a 1% false-positive
61    /// rate. Cheap — allocates `~9.6 * expected / 8` bytes.
62    pub fn with_capacity(expected: usize) -> Self {
63        Self {
64            filter: BloomFilter::with_capacity(expected.max(16), 0.01),
65            inserted: 0,
66        }
67    }
68
69    /// Custom false-positive rate.
70    pub fn with_rate(expected: usize, fp_rate: f64) -> Self {
71        Self {
72            filter: BloomFilter::with_capacity(expected.max(16), fp_rate),
73            inserted: 0,
74        }
75    }
76
77    /// Record `key` as possibly present.
78    pub fn insert(&mut self, key: &[u8]) {
79        self.filter.insert(key);
80        self.inserted = self.inserted.saturating_add(1);
81    }
82
83    /// Might `key` be present? (May return a false positive, never a false
84    /// negative.)
85    pub fn contains(&self, key: &[u8]) -> bool {
86        self.filter.contains(key)
87    }
88
89    /// Inverse of `contains` — the thing callers usually want.
90    pub fn definitely_absent(&self, key: &[u8]) -> bool {
91        !self.filter.contains(key)
92    }
93
94    /// Estimated current false-positive rate given the number of insertions.
95    pub fn estimated_fp_rate(&self) -> f64 {
96        self.filter.estimate_fp_rate(self.inserted as usize)
97    }
98
99    /// Number of elements recorded so far (best-effort).
100    pub fn inserted_count(&self) -> u32 {
101        self.inserted
102    }
103
104    /// Access the underlying bloom filter (e.g. to pass to
105    /// [`crate::storage::index::IndexBase::bloom`]).
106    pub fn filter(&self) -> &BloomFilter {
107        &self.filter
108    }
109
110    /// Merge another bloom segment into this one. Both must have the same
111    /// size and hash count. Returns `false` on mismatch.
112    pub fn union_inplace(&mut self, other: &BloomSegment) -> bool {
113        if self.filter.union_inplace(&other.filter) {
114            self.inserted = self.inserted.saturating_add(other.inserted);
115            true
116        } else {
117            false
118        }
119    }
120
121    /// Serialise into the header layout documented at module level.
122    pub fn encode(&self) -> Vec<u8> {
123        reddb_file::encode_bloom_segment_frame(&reddb_file::BloomSegmentFrame {
124            num_hashes: self.filter.num_hashes(),
125            bit_size: self.filter.bit_size(),
126            inserted: self.inserted,
127            bits: self.filter.as_bytes().to_vec(),
128        })
129    }
130
131    /// Parse a previously encoded header. Returns a fresh `BloomSegment` and
132    /// the number of bytes consumed.
133    pub fn decode(bytes: &[u8]) -> Result<(Self, usize), BloomSegmentError> {
134        let (frame, consumed) = reddb_file::decode_bloom_segment_frame(bytes)?;
135        let filter =
136            BloomFilter::from_bytes_with_size(frame.bits, frame.num_hashes, frame.bit_size);
137        Ok((
138            Self {
139                filter,
140                inserted: frame.inserted,
141            },
142            consumed,
143        ))
144    }
145}
146
147/// Fluent builder that mirrors `BloomFilterBuilder` but produces a
148/// `BloomSegment`.
149pub struct BloomSegmentBuilder {
150    expected: usize,
151    fp_rate: f64,
152}
153
154impl BloomSegmentBuilder {
155    pub fn new() -> Self {
156        Self {
157            expected: 1024,
158            fp_rate: 0.01,
159        }
160    }
161
162    pub fn expected(mut self, n: usize) -> Self {
163        self.expected = n;
164        self
165    }
166
167    pub fn false_positive_rate(mut self, rate: f64) -> Self {
168        self.fp_rate = rate;
169        self
170    }
171
172    pub fn build(self) -> BloomSegment {
173        BloomSegment::with_rate(self.expected, self.fp_rate)
174    }
175}
176
177impl Default for BloomSegmentBuilder {
178    fn default() -> Self {
179        Self::new()
180    }
181}
182
183#[cfg(test)]
184mod tests {
185    use super::*;
186
187    #[test]
188    fn insert_and_query() {
189        let mut seg = BloomSegment::with_capacity(1024);
190        seg.insert(b"alpha");
191        seg.insert(b"beta");
192
193        assert!(seg.contains(b"alpha"));
194        assert!(seg.contains(b"beta"));
195        assert!(seg.definitely_absent(b"gamma") || seg.contains(b"gamma"));
196        // No false negatives.
197        assert!(!seg.definitely_absent(b"alpha"));
198        assert_eq!(seg.inserted_count(), 2);
199    }
200
201    #[test]
202    fn encode_decode_roundtrip() {
203        let mut seg = BloomSegment::with_capacity(512);
204        for i in 0..100 {
205            seg.insert(format!("key{i}").as_bytes());
206        }
207
208        let bytes = seg.encode();
209        let (restored, consumed) = BloomSegment::decode(&bytes).unwrap();
210        assert_eq!(consumed, bytes.len());
211        assert_eq!(restored.inserted_count(), 100);
212        for i in 0..100 {
213            assert!(restored.contains(format!("key{i}").as_bytes()));
214        }
215    }
216
217    #[test]
218    fn decode_rejects_bad_magic() {
219        let mut bytes = BloomSegment::with_capacity(64).encode();
220        bytes[0] = 0x00;
221        assert_eq!(
222            BloomSegment::decode(&bytes).unwrap_err(),
223            BloomSegmentError::BadMagic
224        );
225    }
226
227    #[test]
228    fn decode_rejects_short_buffer() {
229        let bytes = [reddb_file::BLOOM_SEGMENT_MAGIC, 3, 0, 0];
230        assert_eq!(
231            BloomSegment::decode(&bytes).unwrap_err(),
232            BloomSegmentError::TooShort
233        );
234    }
235
236    #[test]
237    fn decode_rejects_truncated_payload() {
238        let mut bytes = BloomSegment::with_capacity(64).encode();
239        bytes.truncate(bytes.len() - 1);
240        assert_eq!(
241            BloomSegment::decode(&bytes).unwrap_err(),
242            BloomSegmentError::LengthMismatch
243        );
244    }
245
246    #[test]
247    fn union_merges_populations() {
248        let mut a = BloomSegment::with_rate(1024, 0.01);
249        let mut b = BloomSegment::with_rate(1024, 0.01);
250        a.insert(b"one");
251        b.insert(b"two");
252        assert!(a.union_inplace(&b));
253        assert!(a.contains(b"one"));
254        assert!(a.contains(b"two"));
255        assert_eq!(a.inserted_count(), 2);
256    }
257
258    #[test]
259    fn union_rejects_incompatible() {
260        let mut a = BloomSegment::with_rate(1024, 0.01);
261        let b = BloomSegment::with_rate(4096, 0.01);
262        assert!(!a.union_inplace(&b));
263    }
264
265    #[test]
266    fn has_bloom_default_absent_when_none() {
267        struct NoBloom;
268        impl HasBloom for NoBloom {
269            fn bloom_segment(&self) -> Option<&BloomSegment> {
270                None
271            }
272        }
273        let x = NoBloom;
274        assert!(!x.definitely_absent(b"anything"));
275    }
276}