reddb_server/storage/index/
bloom_segment.rs1use crate::storage::primitives::BloomFilter;
23
24const BLOOM_SEGMENT_MAGIC: u8 = 0xBF;
25const HEADER_LEN: usize = 1 + 1 + 4 + 4; #[derive(Debug, Clone, PartialEq, Eq)]
29pub enum BloomSegmentError {
30 TooShort,
32 BadMagic,
34 LengthMismatch,
36}
37
38impl std::fmt::Display for BloomSegmentError {
39 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40 match self {
41 BloomSegmentError::TooShort => write!(f, "bloom header too short"),
42 BloomSegmentError::BadMagic => write!(f, "bloom header magic mismatch"),
43 BloomSegmentError::LengthMismatch => write!(f, "bloom header length mismatch"),
44 }
45 }
46}
47
48impl std::error::Error for BloomSegmentError {}
49
50pub trait HasBloom {
55 fn bloom_segment(&self) -> Option<&BloomSegment>;
57
58 fn definitely_absent(&self, key: &[u8]) -> bool {
61 self.bloom_segment()
62 .map(|b| b.definitely_absent(key))
63 .unwrap_or(false)
64 }
65}
66
67pub struct BloomSegment {
69 filter: BloomFilter,
70 inserted: u32,
71}
72
73impl std::fmt::Debug for BloomSegment {
74 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75 f.debug_struct("BloomSegment")
76 .field("bit_size", &self.filter.bit_size())
77 .field("num_hashes", &self.filter.num_hashes())
78 .field("inserted", &self.inserted)
79 .finish()
80 }
81}
82
83impl BloomSegment {
84 pub fn with_capacity(expected: usize) -> Self {
87 Self {
88 filter: BloomFilter::with_capacity(expected.max(16), 0.01),
89 inserted: 0,
90 }
91 }
92
93 pub fn with_rate(expected: usize, fp_rate: f64) -> Self {
95 Self {
96 filter: BloomFilter::with_capacity(expected.max(16), fp_rate),
97 inserted: 0,
98 }
99 }
100
101 pub fn insert(&mut self, key: &[u8]) {
103 self.filter.insert(key);
104 self.inserted = self.inserted.saturating_add(1);
105 }
106
107 pub fn contains(&self, key: &[u8]) -> bool {
110 self.filter.contains(key)
111 }
112
113 pub fn definitely_absent(&self, key: &[u8]) -> bool {
115 !self.filter.contains(key)
116 }
117
118 pub fn estimated_fp_rate(&self) -> f64 {
120 self.filter.estimate_fp_rate(self.inserted as usize)
121 }
122
123 pub fn inserted_count(&self) -> u32 {
125 self.inserted
126 }
127
128 pub fn filter(&self) -> &BloomFilter {
131 &self.filter
132 }
133
134 pub fn union_inplace(&mut self, other: &BloomSegment) -> bool {
137 if self.filter.union_inplace(&other.filter) {
138 self.inserted = self.inserted.saturating_add(other.inserted);
139 true
140 } else {
141 false
142 }
143 }
144
145 pub fn encode(&self) -> Vec<u8> {
147 let bits = self.filter.as_bytes();
148 let bit_size = self.filter.bit_size();
149 let mut out = Vec::with_capacity(HEADER_LEN + bits.len());
150 out.push(BLOOM_SEGMENT_MAGIC);
151 out.push(self.filter.num_hashes());
152 out.extend_from_slice(&bit_size.to_be_bytes());
153 out.extend_from_slice(&self.inserted.to_be_bytes());
154 out.extend_from_slice(bits);
155 out
156 }
157
158 pub fn decode(bytes: &[u8]) -> Result<(Self, usize), BloomSegmentError> {
161 if bytes.len() < HEADER_LEN {
162 return Err(BloomSegmentError::TooShort);
163 }
164 if bytes[0] != BLOOM_SEGMENT_MAGIC {
165 return Err(BloomSegmentError::BadMagic);
166 }
167 let num_hashes = bytes[1];
168 let bit_size = u32::from_be_bytes([bytes[2], bytes[3], bytes[4], bytes[5]]);
169 let inserted = u32::from_be_bytes([bytes[6], bytes[7], bytes[8], bytes[9]]);
170 let byte_len = (bit_size as usize).div_ceil(8);
171 let total = HEADER_LEN + byte_len;
172 if bytes.len() < total {
173 return Err(BloomSegmentError::LengthMismatch);
174 }
175 let filter = BloomFilter::from_bytes_with_size(
176 bytes[HEADER_LEN..total].to_vec(),
177 num_hashes,
178 bit_size,
179 );
180 Ok((Self { filter, inserted }, total))
181 }
182}
183
184pub struct BloomSegmentBuilder {
187 expected: usize,
188 fp_rate: f64,
189}
190
191impl BloomSegmentBuilder {
192 pub fn new() -> Self {
193 Self {
194 expected: 1024,
195 fp_rate: 0.01,
196 }
197 }
198
199 pub fn expected(mut self, n: usize) -> Self {
200 self.expected = n;
201 self
202 }
203
204 pub fn false_positive_rate(mut self, rate: f64) -> Self {
205 self.fp_rate = rate;
206 self
207 }
208
209 pub fn build(self) -> BloomSegment {
210 BloomSegment::with_rate(self.expected, self.fp_rate)
211 }
212}
213
214impl Default for BloomSegmentBuilder {
215 fn default() -> Self {
216 Self::new()
217 }
218}
219
220#[cfg(test)]
221mod tests {
222 use super::*;
223
224 #[test]
225 fn insert_and_query() {
226 let mut seg = BloomSegment::with_capacity(1024);
227 seg.insert(b"alpha");
228 seg.insert(b"beta");
229
230 assert!(seg.contains(b"alpha"));
231 assert!(seg.contains(b"beta"));
232 assert!(seg.definitely_absent(b"gamma") || seg.contains(b"gamma"));
233 assert!(!seg.definitely_absent(b"alpha"));
235 assert_eq!(seg.inserted_count(), 2);
236 }
237
238 #[test]
239 fn encode_decode_roundtrip() {
240 let mut seg = BloomSegment::with_capacity(512);
241 for i in 0..100 {
242 seg.insert(format!("key{i}").as_bytes());
243 }
244
245 let bytes = seg.encode();
246 let (restored, consumed) = BloomSegment::decode(&bytes).unwrap();
247 assert_eq!(consumed, bytes.len());
248 assert_eq!(restored.inserted_count(), 100);
249 for i in 0..100 {
250 assert!(restored.contains(format!("key{i}").as_bytes()));
251 }
252 }
253
254 #[test]
255 fn decode_rejects_bad_magic() {
256 let mut bytes = BloomSegment::with_capacity(64).encode();
257 bytes[0] = 0x00;
258 assert_eq!(
259 BloomSegment::decode(&bytes).unwrap_err(),
260 BloomSegmentError::BadMagic
261 );
262 }
263
264 #[test]
265 fn decode_rejects_short_buffer() {
266 let bytes = [0xBF, 3, 0, 0];
267 assert_eq!(
268 BloomSegment::decode(&bytes).unwrap_err(),
269 BloomSegmentError::TooShort
270 );
271 }
272
273 #[test]
274 fn decode_rejects_truncated_payload() {
275 let mut bytes = BloomSegment::with_capacity(64).encode();
276 bytes.truncate(bytes.len() - 1);
277 assert_eq!(
278 BloomSegment::decode(&bytes).unwrap_err(),
279 BloomSegmentError::LengthMismatch
280 );
281 }
282
283 #[test]
284 fn union_merges_populations() {
285 let mut a = BloomSegment::with_rate(1024, 0.01);
286 let mut b = BloomSegment::with_rate(1024, 0.01);
287 a.insert(b"one");
288 b.insert(b"two");
289 assert!(a.union_inplace(&b));
290 assert!(a.contains(b"one"));
291 assert!(a.contains(b"two"));
292 assert_eq!(a.inserted_count(), 2);
293 }
294
295 #[test]
296 fn union_rejects_incompatible() {
297 let mut a = BloomSegment::with_rate(1024, 0.01);
298 let b = BloomSegment::with_rate(4096, 0.01);
299 assert!(!a.union_inplace(&b));
300 }
301
302 #[test]
303 fn has_bloom_default_absent_when_none() {
304 struct NoBloom;
305 impl HasBloom for NoBloom {
306 fn bloom_segment(&self) -> Option<&BloomSegment> {
307 None
308 }
309 }
310 let x = NoBloom;
311 assert!(!x.definitely_absent(b"anything"));
312 }
313}