reddb_server/storage/index/
bloom_segment.rs1use crate::storage::primitives::BloomFilter;
23
24pub use reddb_file::BloomSegmentFrameError as BloomSegmentError;
25
26pub trait HasBloom {
31 fn bloom_segment(&self) -> Option<&BloomSegment>;
33
34 fn definitely_absent(&self, key: &[u8]) -> bool {
37 self.bloom_segment()
38 .map(|b| b.definitely_absent(key))
39 .unwrap_or(false)
40 }
41}
42
43pub struct BloomSegment {
45 filter: BloomFilter,
46 inserted: u32,
47}
48
49impl std::fmt::Debug for BloomSegment {
50 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51 f.debug_struct("BloomSegment")
52 .field("bit_size", &self.filter.bit_size())
53 .field("num_hashes", &self.filter.num_hashes())
54 .field("inserted", &self.inserted)
55 .finish()
56 }
57}
58
59impl BloomSegment {
60 pub fn with_capacity(expected: usize) -> Self {
63 Self {
64 filter: BloomFilter::with_capacity(expected.max(16), 0.01),
65 inserted: 0,
66 }
67 }
68
69 pub fn with_rate(expected: usize, fp_rate: f64) -> Self {
71 Self {
72 filter: BloomFilter::with_capacity(expected.max(16), fp_rate),
73 inserted: 0,
74 }
75 }
76
77 pub fn insert(&mut self, key: &[u8]) {
79 self.filter.insert(key);
80 self.inserted = self.inserted.saturating_add(1);
81 }
82
83 pub fn contains(&self, key: &[u8]) -> bool {
86 self.filter.contains(key)
87 }
88
89 pub fn definitely_absent(&self, key: &[u8]) -> bool {
91 !self.filter.contains(key)
92 }
93
94 pub fn estimated_fp_rate(&self) -> f64 {
96 self.filter.estimate_fp_rate(self.inserted as usize)
97 }
98
99 pub fn inserted_count(&self) -> u32 {
101 self.inserted
102 }
103
104 pub fn filter(&self) -> &BloomFilter {
107 &self.filter
108 }
109
110 pub fn union_inplace(&mut self, other: &BloomSegment) -> bool {
113 if self.filter.union_inplace(&other.filter) {
114 self.inserted = self.inserted.saturating_add(other.inserted);
115 true
116 } else {
117 false
118 }
119 }
120
121 pub fn encode(&self) -> Vec<u8> {
123 reddb_file::encode_bloom_segment_frame(&reddb_file::BloomSegmentFrame {
124 num_hashes: self.filter.num_hashes(),
125 bit_size: self.filter.bit_size(),
126 inserted: self.inserted,
127 bits: self.filter.as_bytes().to_vec(),
128 })
129 }
130
131 pub fn decode(bytes: &[u8]) -> Result<(Self, usize), BloomSegmentError> {
134 let (frame, consumed) = reddb_file::decode_bloom_segment_frame(bytes)?;
135 let filter =
136 BloomFilter::from_bytes_with_size(frame.bits, frame.num_hashes, frame.bit_size);
137 Ok((
138 Self {
139 filter,
140 inserted: frame.inserted,
141 },
142 consumed,
143 ))
144 }
145}
146
/// Collects the sizing parameters for a `BloomSegment` before it is built.
///
/// Derives `Debug`, `Clone` and `PartialEq` so builders can be logged, duplicated and
/// compared; both fields are plain values, so the derives are purely additive.
#[derive(Debug, Clone, PartialEq)]
pub struct BloomSegmentBuilder {
    /// Number of keys the segment is expected to hold.
    expected: usize,
    /// Requested false-positive rate.
    fp_rate: f64,
}
153
154impl BloomSegmentBuilder {
155 pub fn new() -> Self {
156 Self {
157 expected: 1024,
158 fp_rate: 0.01,
159 }
160 }
161
162 pub fn expected(mut self, n: usize) -> Self {
163 self.expected = n;
164 self
165 }
166
167 pub fn false_positive_rate(mut self, rate: f64) -> Self {
168 self.fp_rate = rate;
169 self
170 }
171
172 pub fn build(self) -> BloomSegment {
173 BloomSegment::with_rate(self.expected, self.fp_rate)
174 }
175}
176
177impl Default for BloomSegmentBuilder {
178 fn default() -> Self {
179 Self::new()
180 }
181}
182
#[cfg(test)]
mod tests {
    use super::*;

    // Inserted keys are always reported present, and every insert is counted.
    #[test]
    fn insert_and_query() {
        let mut seg = BloomSegment::with_capacity(1024);
        seg.insert(b"alpha");
        seg.insert(b"beta");

        assert!(seg.contains(b"alpha"));
        assert!(seg.contains(b"beta"));
        // "gamma" was never inserted, so it may or may not be a false positive. Whichever
        // it is, the two query methods must give opposite answers. The previous
        // `absent || contains` form also passed when both returned `true`.
        assert_ne!(seg.definitely_absent(b"gamma"), seg.contains(b"gamma"));
        assert!(!seg.definitely_absent(b"alpha"));
        assert!(!seg.definitely_absent(b"beta"));
        assert_eq!(seg.inserted_count(), 2);
    }

    // A segment survives an encode/decode round trip with its keys and counter intact.
    #[test]
    fn encode_decode_roundtrip() {
        let mut seg = BloomSegment::with_capacity(512);
        for i in 0..100 {
            seg.insert(format!("key{i}").as_bytes());
        }

        let bytes = seg.encode();
        let (restored, consumed) = BloomSegment::decode(&bytes).unwrap();
        assert_eq!(consumed, bytes.len());
        assert_eq!(restored.inserted_count(), 100);
        for i in 0..100 {
            assert!(restored.contains(format!("key{i}").as_bytes()));
        }
    }

    #[test]
    fn decode_rejects_bad_magic() {
        let mut bytes = BloomSegment::with_capacity(64).encode();
        bytes[0] = 0x00;
        assert_eq!(
            BloomSegment::decode(&bytes).unwrap_err(),
            BloomSegmentError::BadMagic
        );
    }

    #[test]
    fn decode_rejects_short_buffer() {
        let bytes = [reddb_file::BLOOM_SEGMENT_MAGIC, 3, 0, 0];
        assert_eq!(
            BloomSegment::decode(&bytes).unwrap_err(),
            BloomSegmentError::TooShort
        );
    }

    #[test]
    fn decode_rejects_truncated_payload() {
        let mut bytes = BloomSegment::with_capacity(64).encode();
        bytes.truncate(bytes.len() - 1);
        assert_eq!(
            BloomSegment::decode(&bytes).unwrap_err(),
            BloomSegmentError::LengthMismatch
        );
    }

    #[test]
    fn union_merges_populations() {
        let mut a = BloomSegment::with_rate(1024, 0.01);
        let mut b = BloomSegment::with_rate(1024, 0.01);
        a.insert(b"one");
        b.insert(b"two");
        assert!(a.union_inplace(&b));
        assert!(a.contains(b"one"));
        assert!(a.contains(b"two"));
        assert_eq!(a.inserted_count(), 2);
    }

    #[test]
    fn union_rejects_incompatible() {
        let mut a = BloomSegment::with_rate(1024, 0.01);
        let b = BloomSegment::with_rate(4096, 0.01);
        assert!(!a.union_inplace(&b));
    }

    // A rejected union must not touch the receiver's insert counter.
    #[test]
    fn rejected_union_keeps_insert_count() {
        let mut a = BloomSegment::with_rate(1024, 0.01);
        let mut b = BloomSegment::with_rate(4096, 0.01);
        a.insert(b"one");
        b.insert(b"two");
        b.insert(b"three");
        assert!(!a.union_inplace(&b));
        assert_eq!(a.inserted_count(), 1);
    }

    #[test]
    fn has_bloom_default_absent_when_none() {
        struct NoBloom;
        impl HasBloom for NoBloom {
            fn bloom_segment(&self) -> Option<&BloomSegment> {
                None
            }
        }
        let x = NoBloom;
        assert!(!x.definitely_absent(b"anything"));
    }

    // With a segment attached, the trait's default method mirrors the segment's answer.
    #[test]
    fn has_bloom_delegates_when_some() {
        struct WithBloom(BloomSegment);
        impl HasBloom for WithBloom {
            fn bloom_segment(&self) -> Option<&BloomSegment> {
                Some(&self.0)
            }
        }
        let mut seg = BloomSegment::with_capacity(64);
        seg.insert(b"present");
        let x = WithBloom(seg);
        assert!(!x.definitely_absent(b"present"));
        assert_eq!(
            x.definitely_absent(b"other"),
            x.0.definitely_absent(b"other")
        );
    }

    // The builder yields an empty, usable segment.
    #[test]
    fn builder_builds_empty_segment() {
        let mut seg = BloomSegmentBuilder::default()
            .expected(256)
            .false_positive_rate(0.001)
            .build();
        assert_eq!(seg.inserted_count(), 0);
        seg.insert(b"k");
        assert!(seg.contains(b"k"));
        assert_eq!(seg.inserted_count(), 1);
    }
}