expandable_cuckoo_filter/
lib.rs1use byteorder::{ByteOrder, LittleEndian};
20use cuckoofilter::{CuckooFilter, ExportedCuckooFilter};
21use parking_lot::RwLock;
22use std::collections::hash_map::DefaultHasher;
23use std::hash::{Hash, Hasher};
24use std::sync::Arc;
25use thiserror::Error;
26
27#[derive(Error, Debug)]
28pub enum CuckooError {
29 #[error("Invalid data format")]
30 InvalidData,
31}
32
33struct FilterNode {
34 filter: CuckooFilter<DefaultHasher>,
35 capacity: usize,
36}
37
38#[derive(Clone)]
43pub struct ExpandableCuckooFilter {
44 node_id: String,
45 seed: u64,
46 filters: Arc<RwLock<Vec<FilterNode>>>,
47 initial_capacity: usize,
48}
49
50impl ExpandableCuckooFilter {
51 pub fn new(node_id: impl Into<String>, initial_capacity: usize) -> Self {
62 let node_id = node_id.into();
63
64 let mut hasher = DefaultHasher::new();
65 node_id.hash(&mut hasher);
66 let seed = hasher.finish();
67
68 let filter = CuckooFilter::with_capacity(initial_capacity);
69
70 Self {
71 node_id,
72 seed,
73 filters: Arc::new(RwLock::new(vec![FilterNode {
74 filter,
75 capacity: initial_capacity,
76 }])),
77 initial_capacity,
78 }
79 }
80
81 pub fn node_id(&self) -> &str {
83 &self.node_id
84 }
85
86 pub fn seed(&self) -> u64 {
88 self.seed
89 }
90
91 fn hash_key<T: Hash + ?Sized>(&self, item: &T) -> (u64, u64) {
104 let mut hasher = DefaultHasher::new();
105 item.hash(&mut hasher);
106 (self.seed, hasher.finish())
107 }
108
109 pub fn insert<T: Hash + ?Sized>(&self, item: &T) -> bool {
116 let hashed_keys = self.hash_key(item);
117
118 let mut filters = self.filters.write();
119 let last_idx = filters.len() - 1;
120
121 let current_node = &filters[last_idx];
123 let load_factor = current_node.filter.len() as f64 / current_node.capacity as f64;
124
125 if load_factor > 0.80 {
128 self.expand(&mut filters);
130 let last_idx = filters.len() - 1;
132 let _ = filters[last_idx].filter.add(&hashed_keys);
134 return true;
135 }
136
137 let result = filters[last_idx].filter.add(&hashed_keys);
138
139 match result {
140 Ok(_) => true,
141 Err(_) => {
142 let last_capacity = filters[last_idx].capacity;
149 let new_capacity = last_capacity * 2;
150 let mut new_filter = CuckooFilter::with_capacity(new_capacity);
151
152 if let Err(e) = new_filter.add(&hashed_keys) {
153 eprintln!(
154 "CRITICAL: Failed to insert into NEW filter (cap: {}). Error: {:?}",
155 new_capacity, e
156 );
157 return false;
158 }
159
160 filters.push(FilterNode {
161 filter: new_filter,
162 capacity: new_capacity,
163 });
164 true
165 }
166 }
167 }
168
169 fn expand(&self, filters: &mut Vec<FilterNode>) {
170 let last_capacity = filters
171 .last()
172 .map(|n| n.capacity)
173 .unwrap_or(self.initial_capacity);
174 let new_capacity = last_capacity * 2;
175 let new_filter = CuckooFilter::with_capacity(new_capacity);
176 filters.push(FilterNode {
177 filter: new_filter,
178 capacity: new_capacity,
179 });
180 }
181
182 pub fn contains<T: Hash + ?Sized>(&self, item: &T) -> bool {
188 let hashed_keys = self.hash_key(item);
189 let filters = self.filters.read();
190
191 for node in filters.iter().rev() {
195 if node.filter.contains(&hashed_keys) {
196 return true;
197 }
198 }
199 false
200 }
201
202 pub fn remove<T: Hash + ?Sized>(&self, item: &T) -> bool {
207 let hashed_keys = self.hash_key(item);
208 let mut filters = self.filters.write();
209
210 for node in filters.iter_mut().rev() {
214 if node.filter.contains(&hashed_keys) && node.filter.delete(&hashed_keys) {
215 return true;
216 }
217 }
218 false
219 }
220
221 pub fn len(&self) -> usize {
223 let filters = self.filters.read();
224 filters.iter().map(|f| f.filter.len()).sum()
225 }
226
227 pub fn is_empty(&self) -> bool {
229 self.len() == 0
230 }
231
232 pub fn capacity(&self) -> usize {
234 let filters = self.filters.read();
235 filters.iter().map(|f| f.capacity).sum()
236 }
237
238 const MAGIC_BYTES: &[u8; 4] = b"ECF1";
239
240 pub fn export(&self) -> Result<Vec<u8>, CuckooError> {
244 let filters = self.filters.read();
245 let mut buffer = Vec::new();
246
247 buffer.extend_from_slice(Self::MAGIC_BYTES);
249
250 let mut u64_buf = [0u8; 8];
251 LittleEndian::write_u64(&mut u64_buf, filters.len() as u64);
252 buffer.extend_from_slice(&u64_buf);
253
254 for node in filters.iter() {
255 let exported = node.filter.export();
256
257 LittleEndian::write_u64(&mut u64_buf, node.capacity as u64);
258 buffer.extend_from_slice(&u64_buf);
259
260 LittleEndian::write_u64(&mut u64_buf, exported.length as u64);
261 buffer.extend_from_slice(&u64_buf);
262
263 LittleEndian::write_u64(&mut u64_buf, exported.values.len() as u64);
264 buffer.extend_from_slice(&u64_buf);
265
266 buffer.extend_from_slice(&exported.values);
267 }
268
269 Ok(buffer)
270 }
271
272 pub fn import(&self, data: &[u8]) -> Result<(), CuckooError> {
277 let mut cursor = 0;
278
279 if data.len() < 4 || &data[0..4] != Self::MAGIC_BYTES {
281 return Err(CuckooError::InvalidData);
282 }
283 cursor += 4;
284
285 if cursor + 8 > data.len() {
286 return Err(CuckooError::InvalidData);
287 }
288 let count = LittleEndian::read_u64(&data[cursor..cursor + 8]) as usize;
289 cursor += 8;
290
291 let mut new_filters = Vec::with_capacity(count);
292
293 for _i in 0..count {
294 if cursor + 8 > data.len() {
295 return Err(CuckooError::InvalidData);
296 }
297 let capacity = LittleEndian::read_u64(&data[cursor..cursor + 8]) as usize;
298 cursor += 8;
299
300 if cursor + 8 > data.len() {
301 return Err(CuckooError::InvalidData);
302 }
303 let item_count = LittleEndian::read_u64(&data[cursor..cursor + 8]) as usize;
304 cursor += 8;
305
306 if cursor + 8 > data.len() {
307 return Err(CuckooError::InvalidData);
308 }
309 let values_len = LittleEndian::read_u64(&data[cursor..cursor + 8]) as usize;
310 cursor += 8;
311
312 if cursor + values_len > data.len() {
313 eprintln!(
314 "IMPORT: Invalid values_len {} vs rem {}",
315 values_len,
316 data.len() - cursor
317 );
318 return Err(CuckooError::InvalidData);
319 }
320 let values = data[cursor..cursor + values_len].to_vec();
321 cursor += values_len;
322
323 let exported = ExportedCuckooFilter {
324 length: item_count,
325 values,
326 };
327
328 new_filters.push(FilterNode {
329 filter: CuckooFilter::from(exported),
330 capacity,
331 });
332 }
333
334 if new_filters.is_empty() {
335 new_filters.push(FilterNode {
336 filter: CuckooFilter::with_capacity(self.initial_capacity),
337 capacity: self.initial_capacity,
338 });
339 }
340
341 *self.filters.write() = new_filters;
342
343 Ok(())
344 }
345}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_basic_lifecycle() {
        let filter = ExpandableCuckooFilter::new("test-basic", 100);
        assert!(filter.is_empty());

        filter.insert("item1");
        filter.insert("item2");

        assert!(filter.contains("item1"));
        assert!(filter.contains("item2"));
        assert!(!filter.contains("item3"));
        assert_eq!(filter.len(), 2);

        filter.remove("item1");
        assert!(!filter.contains("item1"));
        assert!(filter.contains("item2"));
        assert_eq!(filter.len(), 1);
    }

    #[test]
    fn test_orthogonality() {
        // Different node ids must derive different seeds.
        let f1 = ExpandableCuckooFilter::new("node-A", 1000);
        let f2 = ExpandableCuckooFilter::new("node-B", 1000);

        assert_ne!(f1.seed(), f2.seed());
    }

    #[test]
    fn test_expansion() {
        // A tiny initial capacity forces several expansions.
        let initial_cap = 4;
        let filter = ExpandableCuckooFilter::new("test-expand", initial_cap);

        for i in 0..100 {
            filter.insert(&format!("item-{}", i));
        }

        for i in 0..100 {
            let key = format!("item-{}", i);
            assert!(filter.contains(&key), "Missing key {}", key);
        }

        assert!(filter.capacity() > initial_cap);
        assert_eq!(filter.len(), 100);
    }

    #[test]
    fn test_expansion_remove() {
        // Remove keys that live in the oldest and the newest sub-filters.
        let filter = ExpandableCuckooFilter::new("test-expand-remove", 16);

        for i in 0..100 {
            filter.insert(&format!("val-{}", i));
        }

        let key0 = "val-0";
        assert!(filter.contains(key0));
        assert!(filter.remove(key0));
        assert!(!filter.contains(key0));

        let key99 = "val-99";
        assert!(filter.contains(key99));
        assert!(filter.remove(key99));
        assert!(!filter.contains(key99));
    }

    #[test]
    fn test_export_import() {
        let filter = ExpandableCuckooFilter::new("test-persist", 100);
        for i in 0..20 {
            filter.insert(&i.to_string());
        }

        // Same node id => same seed, so restored lookups hash identically.
        let restored = ExpandableCuckooFilter::new("test-persist", 100);
        let bytes = filter.export().expect("Export failed");
        restored.import(&bytes).expect("Import failed");

        assert_eq!(restored.len(), 20);
        for i in 0..20 {
            assert!(restored.contains(&i.to_string()));
        }
    }

    #[test]
    fn test_import_invalid_data() {
        let filter = ExpandableCuckooFilter::new("test-bad", 100);

        let result = filter.import(&[1, 2, 3, 4]);
        assert!(result.is_err(), "Should fail on garbage logic (Bad Magic)");

        // All-zero bytes: long enough, but the magic header does not match.
        let zeroed = vec![0u8; 100];
        assert!(filter.import(&zeroed).is_err());
    }

    #[test]
    fn test_coverage_gap_fillers() {
        let filter = ExpandableCuckooFilter::new("gap-fill", 100);
        assert_eq!(filter.node_id(), "gap-fill");
        assert!(filter.seed() > 0);
        assert_eq!(filter.capacity(), 100);
        assert!(filter.is_empty());

        // Every strict prefix of a valid export must be rejected, including
        // the one missing only the final byte.
        let bytes = filter.export().unwrap();
        for i in 0..bytes.len() {
            assert!(
                filter.import(&bytes[..i]).is_err(),
                "Truncated at {} should fail",
                i
            );
        }

        // A valid header that declares zero nodes resets to one empty sub-filter.
        let mut empty_data = Vec::new();
        empty_data.extend_from_slice(b"ECF1");
        empty_data.extend_from_slice(&0u64.to_le_bytes());
        filter
            .import(&empty_data)
            .expect("Should handle empty filter count by re-initializing");
        assert_eq!(filter.len(), 0);
        assert_eq!(filter.capacity(), 100);
    }

    mod proptests {
        use super::*;
        use proptest::prelude::*;
        use std::collections::HashSet;

        proptest! {
            #[test]
            fn test_fuzz_insert_contains(keys in proptest::collection::vec(".*", 1..100)) {
                let filter = ExpandableCuckooFilter::new("fuzz-test", 64);
                for key in &keys {
                    prop_assert!(filter.insert(key), "Insert failed check for key: {:?}", key);
                }
                for key in &keys {
                    prop_assert!(filter.contains(key));
                }
            }

            #[test]
            fn test_fuzz_persistence_roundtrip(keys in proptest::collection::vec(".*", 1..50)) {
                let filter = ExpandableCuckooFilter::new("persist-fuzz", 20);
                for key in &keys {
                    filter.insert(key);
                }
                let bytes = filter.export().unwrap();
                let restored = ExpandableCuckooFilter::new("persist-fuzz", 20);
                restored.import(&bytes).unwrap();
                for key in &keys {
                    prop_assert!(restored.contains(key));
                }
                prop_assert_eq!(filter.len(), restored.len());
            }

            #[test]
            fn test_fuzz_set_semantics(ops in proptest::collection::vec((0..2u8, ".*"), 1..100)) {
                let filter = ExpandableCuckooFilter::new("set-fuzz", 50);
                let mut shadow_set = HashSet::new();

                for (op_code, key) in ops {
                    match op_code {
                        0 => {
                            filter.insert(&key);
                            shadow_set.insert(key);
                        }
                        1 => {
                            // Only remove keys known to be present: deleting an
                            // absent key may remove a colliding fingerprint.
                            if shadow_set.contains(&key) {
                                filter.remove(&key);
                                shadow_set.remove(&key);
                            }
                        }
                        _ => unreachable!(),
                    }
                }

                for key in &shadow_set {
                    prop_assert!(filter.contains(key), "Filter missing key: {}", key);
                }
            }
        }
    }
}