Skip to main content

reddb_server/storage/index/
bloom_segment.rs

1//! Reusable bloom filter header that any segment can embed.
2//!
3//! Segments (table pages, vector segments, timeseries chunks, graph
4//! partitions) all benefit from the same question: "is key X *possibly* in
5//! this segment?". Rather than reimplementing bloom wiring in every storage
6//! engine, wrap [`crate::storage::primitives::BloomFilter`] here with a
7//! serialisation header and a small trait `HasBloom` for owners to plug in.
8//!
9//! Layout on disk / inside a segment header is:
10//!
11//! ```text
12//! [ u8  magic = 0xBF ]
13//! [ u8  num_hashes   ]
14//! [ u32 bit_size     ]   // big-endian
15//! [ u32 inserted     ]   // monotonic counter, best-effort
16//! [ bytes...          ]   // bit array
17//! ```
18//!
19//! Readers that don't care about bloom can skip `4 + (bit_size + 7) / 8`
20//! bytes after the 10-byte header.
21
22use crate::storage::primitives::BloomFilter;
23
24const BLOOM_SEGMENT_MAGIC: u8 = 0xBF;
25const HEADER_LEN: usize = 1 + 1 + 4 + 4; // magic + hashes + bit_size + inserted
26
27/// Error returned when parsing a bloom segment header.
28#[derive(Debug, Clone, PartialEq, Eq)]
29pub enum BloomSegmentError {
30    /// Byte slice was shorter than the fixed header.
31    TooShort,
32    /// Magic byte did not match [`BLOOM_SEGMENT_MAGIC`].
33    BadMagic,
34    /// Declared bit size did not match payload length.
35    LengthMismatch,
36}
37
38impl std::fmt::Display for BloomSegmentError {
39    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40        match self {
41            BloomSegmentError::TooShort => write!(f, "bloom header too short"),
42            BloomSegmentError::BadMagic => write!(f, "bloom header magic mismatch"),
43            BloomSegmentError::LengthMismatch => write!(f, "bloom header length mismatch"),
44        }
45    }
46}
47
48impl std::error::Error for BloomSegmentError {}
49
50/// Trait implemented by owners of a segment-level bloom filter.
51///
52/// Segments ask their owner (table, vector collection, timeseries chunk)
53/// whether a key is definitely absent before walking their main structure.
54pub trait HasBloom {
55    /// Reference to the bloom filter attached to this segment, if any.
56    fn bloom_segment(&self) -> Option<&BloomSegment>;
57
58    /// Fast-path negative check. Returns `true` iff the bloom is present and
59    /// reports the key as absent.
60    fn definitely_absent(&self, key: &[u8]) -> bool {
61        self.bloom_segment()
62            .map(|b| b.definitely_absent(key))
63            .unwrap_or(false)
64    }
65}
66
67/// Owning bloom header that can be embedded in a segment.
68pub struct BloomSegment {
69    filter: BloomFilter,
70    inserted: u32,
71}
72
73impl std::fmt::Debug for BloomSegment {
74    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75        f.debug_struct("BloomSegment")
76            .field("bit_size", &self.filter.bit_size())
77            .field("num_hashes", &self.filter.num_hashes())
78            .field("inserted", &self.inserted)
79            .finish()
80    }
81}
82
83impl BloomSegment {
84    /// Build a bloom sized for `expected` elements at a 1% false-positive
85    /// rate. Cheap — allocates `~9.6 * expected / 8` bytes.
86    pub fn with_capacity(expected: usize) -> Self {
87        Self {
88            filter: BloomFilter::with_capacity(expected.max(16), 0.01),
89            inserted: 0,
90        }
91    }
92
93    /// Custom false-positive rate.
94    pub fn with_rate(expected: usize, fp_rate: f64) -> Self {
95        Self {
96            filter: BloomFilter::with_capacity(expected.max(16), fp_rate),
97            inserted: 0,
98        }
99    }
100
101    /// Record `key` as possibly present.
102    pub fn insert(&mut self, key: &[u8]) {
103        self.filter.insert(key);
104        self.inserted = self.inserted.saturating_add(1);
105    }
106
107    /// Might `key` be present? (May return a false positive, never a false
108    /// negative.)
109    pub fn contains(&self, key: &[u8]) -> bool {
110        self.filter.contains(key)
111    }
112
113    /// Inverse of `contains` — the thing callers usually want.
114    pub fn definitely_absent(&self, key: &[u8]) -> bool {
115        !self.filter.contains(key)
116    }
117
118    /// Estimated current false-positive rate given the number of insertions.
119    pub fn estimated_fp_rate(&self) -> f64 {
120        self.filter.estimate_fp_rate(self.inserted as usize)
121    }
122
123    /// Number of elements recorded so far (best-effort).
124    pub fn inserted_count(&self) -> u32 {
125        self.inserted
126    }
127
128    /// Access the underlying bloom filter (e.g. to pass to
129    /// [`crate::storage::index::IndexBase::bloom`]).
130    pub fn filter(&self) -> &BloomFilter {
131        &self.filter
132    }
133
134    /// Merge another bloom segment into this one. Both must have the same
135    /// size and hash count. Returns `false` on mismatch.
136    pub fn union_inplace(&mut self, other: &BloomSegment) -> bool {
137        if self.filter.union_inplace(&other.filter) {
138            self.inserted = self.inserted.saturating_add(other.inserted);
139            true
140        } else {
141            false
142        }
143    }
144
145    /// Serialise into the header layout documented at module level.
146    pub fn encode(&self) -> Vec<u8> {
147        let bits = self.filter.as_bytes();
148        let bit_size = self.filter.bit_size();
149        let mut out = Vec::with_capacity(HEADER_LEN + bits.len());
150        out.push(BLOOM_SEGMENT_MAGIC);
151        out.push(self.filter.num_hashes());
152        out.extend_from_slice(&bit_size.to_be_bytes());
153        out.extend_from_slice(&self.inserted.to_be_bytes());
154        out.extend_from_slice(bits);
155        out
156    }
157
158    /// Parse a previously encoded header. Returns a fresh `BloomSegment` and
159    /// the number of bytes consumed.
160    pub fn decode(bytes: &[u8]) -> Result<(Self, usize), BloomSegmentError> {
161        if bytes.len() < HEADER_LEN {
162            return Err(BloomSegmentError::TooShort);
163        }
164        if bytes[0] != BLOOM_SEGMENT_MAGIC {
165            return Err(BloomSegmentError::BadMagic);
166        }
167        let num_hashes = bytes[1];
168        let bit_size = u32::from_be_bytes([bytes[2], bytes[3], bytes[4], bytes[5]]);
169        let inserted = u32::from_be_bytes([bytes[6], bytes[7], bytes[8], bytes[9]]);
170        let byte_len = (bit_size as usize).div_ceil(8);
171        let total = HEADER_LEN + byte_len;
172        if bytes.len() < total {
173            return Err(BloomSegmentError::LengthMismatch);
174        }
175        let filter = BloomFilter::from_bytes_with_size(
176            bytes[HEADER_LEN..total].to_vec(),
177            num_hashes,
178            bit_size,
179        );
180        Ok((Self { filter, inserted }, total))
181    }
182}
183
184/// Fluent builder that mirrors `BloomFilterBuilder` but produces a
185/// `BloomSegment`.
186pub struct BloomSegmentBuilder {
187    expected: usize,
188    fp_rate: f64,
189}
190
191impl BloomSegmentBuilder {
192    pub fn new() -> Self {
193        Self {
194            expected: 1024,
195            fp_rate: 0.01,
196        }
197    }
198
199    pub fn expected(mut self, n: usize) -> Self {
200        self.expected = n;
201        self
202    }
203
204    pub fn false_positive_rate(mut self, rate: f64) -> Self {
205        self.fp_rate = rate;
206        self
207    }
208
209    pub fn build(self) -> BloomSegment {
210        BloomSegment::with_rate(self.expected, self.fp_rate)
211    }
212}
213
214impl Default for BloomSegmentBuilder {
215    fn default() -> Self {
216        Self::new()
217    }
218}
219
220#[cfg(test)]
221mod tests {
222    use super::*;
223
224    #[test]
225    fn insert_and_query() {
226        let mut seg = BloomSegment::with_capacity(1024);
227        seg.insert(b"alpha");
228        seg.insert(b"beta");
229
230        assert!(seg.contains(b"alpha"));
231        assert!(seg.contains(b"beta"));
232        assert!(seg.definitely_absent(b"gamma") || seg.contains(b"gamma"));
233        // No false negatives.
234        assert!(!seg.definitely_absent(b"alpha"));
235        assert_eq!(seg.inserted_count(), 2);
236    }
237
238    #[test]
239    fn encode_decode_roundtrip() {
240        let mut seg = BloomSegment::with_capacity(512);
241        for i in 0..100 {
242            seg.insert(format!("key{i}").as_bytes());
243        }
244
245        let bytes = seg.encode();
246        let (restored, consumed) = BloomSegment::decode(&bytes).unwrap();
247        assert_eq!(consumed, bytes.len());
248        assert_eq!(restored.inserted_count(), 100);
249        for i in 0..100 {
250            assert!(restored.contains(format!("key{i}").as_bytes()));
251        }
252    }
253
254    #[test]
255    fn decode_rejects_bad_magic() {
256        let mut bytes = BloomSegment::with_capacity(64).encode();
257        bytes[0] = 0x00;
258        assert_eq!(
259            BloomSegment::decode(&bytes).unwrap_err(),
260            BloomSegmentError::BadMagic
261        );
262    }
263
264    #[test]
265    fn decode_rejects_short_buffer() {
266        let bytes = [0xBF, 3, 0, 0];
267        assert_eq!(
268            BloomSegment::decode(&bytes).unwrap_err(),
269            BloomSegmentError::TooShort
270        );
271    }
272
273    #[test]
274    fn decode_rejects_truncated_payload() {
275        let mut bytes = BloomSegment::with_capacity(64).encode();
276        bytes.truncate(bytes.len() - 1);
277        assert_eq!(
278            BloomSegment::decode(&bytes).unwrap_err(),
279            BloomSegmentError::LengthMismatch
280        );
281    }
282
283    #[test]
284    fn union_merges_populations() {
285        let mut a = BloomSegment::with_rate(1024, 0.01);
286        let mut b = BloomSegment::with_rate(1024, 0.01);
287        a.insert(b"one");
288        b.insert(b"two");
289        assert!(a.union_inplace(&b));
290        assert!(a.contains(b"one"));
291        assert!(a.contains(b"two"));
292        assert_eq!(a.inserted_count(), 2);
293    }
294
295    #[test]
296    fn union_rejects_incompatible() {
297        let mut a = BloomSegment::with_rate(1024, 0.01);
298        let b = BloomSegment::with_rate(4096, 0.01);
299        assert!(!a.union_inplace(&b));
300    }
301
302    #[test]
303    fn has_bloom_default_absent_when_none() {
304        struct NoBloom;
305        impl HasBloom for NoBloom {
306            fn bloom_segment(&self) -> Option<&BloomSegment> {
307                None
308            }
309        }
310        let x = NoBloom;
311        assert!(!x.definitely_absent(b"anything"));
312    }
313}