1use std::collections::BTreeMap;
2
3use slot::Slot;
4use thiserror::Error;
5use anyhow::{Result, Ok};
6
7pub mod slot;
8pub mod extra;
9pub mod generic;
10
/// Per-slot metadata flags used by the quotient filter.
///
/// The flags are passed by value to the slot accessors (`get_metadata`, `set_metadata`,
/// `clear_metadata`), so the type is `Copy` and comparable.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum MetadataType {
    /// Set on a slot by `delete`; cleared by `insert` when an empty home slot is reused.
    Tombstone,
    /// Set on the home slot (`table[quotient]`) whenever a fingerprint with that quotient is inserted.
    BucketOccupied,
    /// Set on a slot that continues an already existing run.
    RunContinued,
    /// Set on a slot whose content is stored away from its home (quotient) position.
    IsShifted,
}
21
/// Errors produced by the quotient filter operations in this module.
#[derive(Error, Debug)]
pub(crate) enum QuotientFilterError {
    /// Returned by `insert` when the computed quotient is not a valid index into the table.
    #[error("Invalid quotient access: `{0}`")]
    InvalidQuotientAccess(usize),
    /// Returned by `QuotientFilter::new` when the requested quotient size exceeds 62.
    #[error("Quotient cannot be more than 62 due to 64 bit hashing")]
    InvalidQuotientSize,
    /// Returned by `merge` when the two filters have a different number of slots.
    #[error("Filters need to have the same size for merging")]
    NotEqualSize,
    /// Returned while walking a cluster (`resize`, `collect_fingerprint_map`) when no further
    /// occupied home slot can be found for a run that is still present in the table.
    #[error("Not able to find the quotient to insert")]
    NotAbleToFindOccupied,
    /// Generic conversion failure.
    // NOTE(review): not constructed anywhere in this file; presumably used by sibling modules.
    #[error("Converting error")]
    ConvertingError
}
35
/// A quotient filter keyed by 64-bit fingerprints.
///
/// Each fingerprint is split into a quotient (the index of its home slot) and a remainder
/// (the value stored in the slot).
pub struct QuotientFilter {
    /// Number of stored fingerprints; incremented by `insert`, decremented by `delete`.
    count: usize,
    /// Number of low fingerprint bits kept as the remainder (`64 - quotient_size` at construction).
    remainder: u8,
    /// Number of slots in `table` (`2^quotient_size` at construction, doubled by `resize`/`merge`).
    size: usize,
    /// Slot storage; indexed by quotient, with wrap-around handled by `index_up`/`index_down`.
    table: Vec<Slot>
}
42
43impl QuotientFilter {
44 pub fn new(quotient_size: u8) -> Result<Self> {
48 if quotient_size > 62 { return Err(anyhow::Error::new(QuotientFilterError::InvalidQuotientSize)); }
49 let size = usize::pow(2, quotient_size as u32);
50 let remainder = 64 - quotient_size;
51
52 Ok(Self {
53 count: 0,
54 remainder,
55 size,
56 table: vec![Slot::new(); size]
57 })
58 }
59
60 pub fn insert_value(&mut self, value: &[u8]) -> Result<usize> {
62 let fingerprint = const_fnv1a_hash::fnv1a_hash_64(value, None);
63 self.insert(fingerprint)
64 }
65
66 pub fn lookup_value(&mut self, value: &[u8]) -> bool {
68 let fingerprint = const_fnv1a_hash::fnv1a_hash_64(value, None);
69 self.lookup(fingerprint)
70 }
71
72 pub fn delete_value(&mut self, value: &[u8]) {
74 let fingerprint = const_fnv1a_hash::fnv1a_hash_64(value, None);
75 self.delete(fingerprint);
76 }
77
78 pub fn space(&self) -> u64 {
80 self.size as u64 * 72
81 }
82
83 pub fn resize(&mut self) -> anyhow::Result<()>{
86 let mut is_first = false;
88 let mut first_anchor = usize::default();
89 let mut index: usize = 0;
90 let mut fingerprints: Vec<u64> = Vec::with_capacity(self.count as usize);
91 while let Some(anchor_idx) = self.get_next_anchor(index) {
92 if anchor_idx == first_anchor { break; }
93 if !is_first { first_anchor = anchor_idx; is_first = true; }
94 let mut quotient_cache = anchor_idx;
95 let mut slot_idx = anchor_idx;
96 let mut fingerprint = self.table[anchor_idx].reconstruct_fingerprint(anchor_idx, self.remainder);
98
99 fingerprints.push(fingerprint);
100 slot_idx = self.index_up(slot_idx);
101 while !self.table[slot_idx].is_empty() {
102 while self.table[slot_idx].is_run_continued() {
103 fingerprint = self.table[slot_idx].reconstruct_fingerprint(quotient_cache, self.remainder);
104 fingerprints.push(fingerprint);
105 slot_idx = self.index_up(slot_idx);
106 }
107 if !self.table[slot_idx].is_empty() {
108 quotient_cache = self.get_next_occupied(quotient_cache).ok_or(anyhow::Error::new(QuotientFilterError::NotAbleToFindOccupied))?;
109 if self.table[slot_idx].is_run_start() {
110 fingerprint = self.table[slot_idx].reconstruct_fingerprint(quotient_cache, self.remainder);
111 fingerprints.push(fingerprint);
112 slot_idx = self.index_up(slot_idx);
113 }
114 } else {
115 break;
116 }
117 }
118 index = anchor_idx;
119 }
120
121 let mut old_table = std::mem::replace(&mut self.table, vec![Slot::new(); self.size * 2]);
122 self.size *= 2;
123 self.remainder -= 1;
124 self.count = 0;
125
126 for fingerprint in fingerprints {
127 if let Err(e) = self.insert(fingerprint) {
129 std::mem::swap(&mut self.table, &mut old_table);
130 self.size /= 2;
131 self.remainder += 1;
132 return Err(e);
133 }
134 }
135 Ok(())
136 }
137
138 pub fn merge(&mut self, other: &QuotientFilter) -> Result<()> {
140 if self.size != other.size { return Err(anyhow::Error::new(QuotientFilterError::NotEqualSize)); }
141
142 let mut map_1 = self.collect_fingerprint_map()?;
144 let mut map_2 = other.collect_fingerprint_map()?;
145 for (index, fingerprints) in &mut map_1 {
146 if let Some(value) = map_2.get_mut(index) {
147 fingerprints.append(value);
148 fingerprints.sort_unstable();
149 }
150 }
151 for (index, fingerprints) in map_2 {
152 if fingerprints.len() > 0 { map_1.insert(index, fingerprints); }
153 }
154
155 let mut old_table = std::mem::replace(&mut self.table, vec![Slot::new(); self.size * 2]);
157 self.size *= 2;
158 self.remainder -= 1;
159 self.count = 0;
160
161 for (_, fingerprints) in map_1 {
162 for fingerprint in fingerprints {
163 if let Err(e) = self.insert(fingerprint) {
164 std::mem::swap(&mut self.table, &mut old_table);
165 self.size /= 2;
166 self.remainder += 1;
167 return Err(e);
168 }
169 }
170 }
171 Ok(())
172 }
173
174 pub fn lookup(&mut self, fingerprint: u64) -> bool {
176 self.get_index(fingerprint).is_some()
177 }
178
179 pub fn delete(&mut self, fingerprint: u64) {
180 let (quotient, remainder) = self.fingerprint_destruction(fingerprint).unwrap_or_default();
181
182 if quotient == usize::default() && remainder == u64::default() { return;}
183
184 if let Some(bucket) = self.table.get(quotient) {
185 if !bucket.get_metadata(MetadataType::BucketOccupied) { return;}
186 } else { return; }
187
188 let mut b = self.get_start_of_the_cluster(quotient);
189 let mut s = b;
190 while b != quotient {
191 s = self.index_up(s);
192 s = self.get_lowest_of_run(s);
193 b = self.index_up(b);
194
195 b = self.skip_empty_slots(b);
196 }
197
198 let mut clear_bucket_occupied = true;
201 let mut clear_head = false;
202 let mut head_of_run_index: usize = 0;
203 while let Some(bucket) = self.table.get(s) {
204 if bucket.remainder != remainder {
205 if !clear_head { head_of_run_index = s; }
206 clear_head = true;
207 s = self.index_up(s);
208 clear_bucket_occupied = false;
209 if !self.table[s].get_metadata(MetadataType::RunContinued) { return; }
210 } else {
211 if self.table[self.index_up(s)].get_metadata(MetadataType::RunContinued) { clear_head = false; clear_bucket_occupied = false; }
212 break;
213 }
214 }
215
216 if clear_head { self.table[head_of_run_index].clear_metadata(MetadataType::BucketOccupied) }
217
218 self.table[s].set_metadata(MetadataType::Tombstone);
219 self.count -= 1;
220 if clear_bucket_occupied { self.table[s].clear_metadata(MetadataType::BucketOccupied); }
221 }
222
    /// Inserts a raw 64-bit fingerprint and returns the index of the slot that received it.
    ///
    /// The fingerprint is split into a quotient (home slot index) and a remainder (stored
    /// value). When the home slot is empty the remainder is stored there directly. Otherwise
    /// the run belonging to the quotient is located inside its cluster, the position within
    /// the run is chosen by remainder order, and the following slots are shifted up by one
    /// until an empty slot absorbs the displaced entry.
    ///
    /// # Errors
    /// Propagates errors from `resize` and `fingerprint_destruction`, and returns
    /// [`QuotientFilterError::InvalidQuotientAccess`] when the quotient is not a valid index.
    pub fn insert(&mut self, fingerprint: u64) -> Result<usize> {
        // Grow the table first once `count` has reached `size - 1`.
        if self.size - self.count as usize - 1 == 0 { self.resize()?; }
        let (quotient, remainder) = self.fingerprint_destruction(fingerprint)?;
        // NOTE(review): this direct index panics on an out-of-range quotient before the
        // `get_mut` check below can report `InvalidQuotientAccess`.
        let is_quotient_occupied_before = self.table[quotient].is_occupied();
        if let Some(bucket) = self.table.get_mut(quotient) {
            // The home slot is flagged as occupied regardless of where the remainder ends up.
            bucket.set_metadata(MetadataType::BucketOccupied);
            if bucket.is_empty() {
                // Fast path: the home slot is free, store the remainder in place.
                bucket.clear_metadata(MetadataType::Tombstone);
                bucket.set_remainder(remainder);
                self.count += 1;
                return Ok(quotient);
            }

            // `b` walks occupied home slots, `s` walks the matching runs, both starting at
            // the beginning of the cluster, until `b` reaches our quotient.
            let mut b = self.get_start_of_the_cluster(quotient);
            let mut s = b;
            // Distance (with wrap-around) between the cluster start and the home slot.
            let away_from_anchor = if quotient < s { quotient + 1 + self.size - 1 } else { quotient } - s;
            while b != quotient {
                s = self.index_up(s);
                s = self.get_lowest_of_run(s);
                b = self.index_up(b);
                b = self.skip_empty_slots(b);
            }

            // A quotient without a run yet: move on to the first empty slot.
            if !is_quotient_occupied_before {
                loop {
                    if !self.table[s].is_empty() { s = self.index_up(s) } else { break; }
                }
            }

            let is_part_of_existing_run = !self.table[s].is_empty();
            // Advance past entries with a smaller remainder to keep the run ordered.
            while let Some(bucket) = self.table.get(s) {
                if !bucket.is_empty() && remainder > bucket.remainder { s = self.index_up(s) }
                else { break; }
            }
            let mut insert_index = s;
            let mut extra_shift = false;
            if !is_quotient_occupied_before {
                // Find the next run-continued slot after the home slot and, if the chosen
                // position does not lie `away_from_anchor` slots behind it, move the insert
                // position there and request a second shift pass.
                // NOTE(review): the intent of this correction is not evident from this file
                // alone — verify against the design notes before changing it.
                let mut last_run = self.index_up(quotient);
                while !self.table[last_run].is_run_continued() {
                    last_run = self.index_up(last_run);
                }
                let idx = if last_run > insert_index { insert_index + self.size } else { insert_index };
                if idx - last_run != away_from_anchor {
                    extra_shift = true;
                    for _ in 0..away_from_anchor { last_run = self.index_up(last_run); }
                    insert_index = last_run;
                }
            }
            let mut new_slot = Slot::new_with_remainder(remainder);
            // Anything stored away from its home slot is marked as shifted.
            if quotient != insert_index { new_slot.set_metadata(MetadataType::IsShifted) };
            if is_part_of_existing_run { new_slot.set_metadata(MetadataType::RunContinued); }
            let mut tmp_bucket = Slot::default();
            // Shift every entry from `s` onwards up by one slot until an empty slot is found.
            while let Some(bucket) = self.table.get_mut(s) {
                if bucket.is_empty() { break; }
                // NOTE(review): this sets a flag that is already set, i.e. it has no effect.
                if tmp_bucket.get_metadata(MetadataType::BucketOccupied) { tmp_bucket.set_metadata(MetadataType::BucketOccupied); }
                tmp_bucket = std::mem::replace(bucket, tmp_bucket);
                tmp_bucket.set_metadata(MetadataType::IsShifted);

                if is_part_of_existing_run {
                    if tmp_bucket.is_run_start() {
                        // The new entry takes over as head of the run: it inherits the
                        // occupied flag and loses its run-continued flag.
                        if tmp_bucket.is_occupied() {
                            new_slot.set_metadata(MetadataType::BucketOccupied);
                            tmp_bucket.clear_metadata(MetadataType::BucketOccupied);
                        }
                        new_slot.clear_metadata(MetadataType::RunContinued);
                    }
                    tmp_bucket.set_metadata(MetadataType::RunContinued);
                }
                s = self.index_up(s);
                if self.table[s].is_empty() {
                    self.table[s] = tmp_bucket;
                    break;
                }
            }

            // Second shift pass, starting at the corrected insert position.
            if extra_shift {
                let mut tmp_bucket = Slot::default();
                let mut shift_index = insert_index;
                while let Some(bucket) = self.table.get_mut(shift_index) {
                    if bucket.is_empty() { break; }
                    tmp_bucket = std::mem::replace(bucket, tmp_bucket);
                    tmp_bucket.set_metadata(MetadataType::IsShifted);
                    shift_index = self.index_up(shift_index);
                    if self.table[shift_index].is_empty() {
                        self.table[shift_index] = tmp_bucket;
                        break;
                    }
                }
            }

            self.table[insert_index] = new_slot;
            self.count += 1;

            return Ok(insert_index)

        }

        Err(anyhow::Error::new(QuotientFilterError::InvalidQuotientAccess(quotient)))
    }
343
344 pub fn get_index(&self, fingerprint: u64) -> Option<usize> {
345 let (quotient, remainder) = self.fingerprint_destruction(fingerprint).unwrap_or_default();
346 if quotient == usize::default() && remainder == u64::default() { return None; }
347
348 if let Some(bucket) = self.table.get(quotient) {
350 if !bucket.get_metadata(MetadataType::BucketOccupied) { return None; }
351 } else { return None; }
352
353 let mut b = self.get_start_of_the_cluster(quotient);
355
356 let mut s = b;
357
358 while b != quotient {
361 s = self.index_up(s);
363 s = self.get_lowest_of_run(s);
364 b = self.index_up(b);
365
366 b = self.skip_empty_slots(b);
368 }
369
370 while let Some(bucket) = self.table.get(s) {
372 if bucket.remainder != remainder {
373 s = self.index_up(s);
374 if !self.table[s].get_metadata(MetadataType::RunContinued) { return None; }
375 } else {
376 break;
377 }
378 }
379 Some(s)
380 }
381
382 fn fingerprint_destruction(&self, fingerprint: u64) -> Result<(usize, u64)> {
384 let quotient = fingerprint / u64::pow(2, self.remainder as u32);
385 let remainder = fingerprint % u64::pow(2, self.remainder as u32);
386 let quotient_usize = usize::try_from(quotient)?;
387 Ok((quotient_usize, remainder))
388 }
389
390 fn get_start_of_the_cluster(&self, start_index: usize) -> usize {
391 let mut index = start_index;
392 while let Some(slot) = self.table.get(index) {
393 if slot.is_shifted() { index = self.index_down(index); }
394 else { break; }
395 }
396 index
397 }
398
399 fn get_lowest_of_run(&self, start_index: usize) -> usize {
400 let mut index = start_index;
401 while let Some(slot) = self.table.get(index) {
402 if slot.get_metadata(MetadataType::RunContinued) { index = self.index_up(index) }
403 else { break; }
404 }
405 index
406 }
407
408 fn skip_empty_slots(&self, start_index: usize) -> usize {
409 let mut index = start_index;
410 while let Some(bucket) = self.table.get(index) {
411 if !bucket.get_metadata(MetadataType::BucketOccupied) { index = self.index_up(index) }
412 else { break; }
413 }
414 index
415 }
416
417 fn get_next_anchor(&self, index: usize) -> Option<usize> {
418 for i in index..self.size {
419 if self.table[i].is_cluster_start() { return Some(i); }
420 }
421 None
422 }
423
424 fn get_next_occupied(&self, cache: usize) -> Option<usize> {
425 let mut index = self.index_up(cache);
426 while let Some(slot) = self.table.get(index) {
427 if index == cache { return None; }
429 else if slot.is_occupied() { return Some(index); }
431 else { index = self.index_up(cache); }
432 }
433 None
434 }
435
436 fn collect_fingerprint_map(&self) -> Result<BTreeMap<usize, Vec<u64>>> {
438 let mut map: BTreeMap<usize, Vec<u64>> = BTreeMap::new();
439 let mut is_first = false;
440 let mut first_anchor = usize::default();
441 let mut index: usize = 0;
442
443 let mut insertion = |index: usize, fingerprint: u64| {
444 if let Some(value) = map.get_mut(&index) { value.push(fingerprint); } else { map.insert(index, vec![fingerprint]); }
445 };
446
447 while let Some(anchor_idx) = self.get_next_anchor(index) {
448 if anchor_idx == first_anchor { break; }
449 if !is_first { first_anchor = anchor_idx; is_first = true; }
450 let mut quotient_cache = anchor_idx;
451 let mut slot_idx = anchor_idx;
452 let mut fingerprint = self.table[anchor_idx].reconstruct_fingerprint(anchor_idx, self.remainder);
454 insertion(quotient_cache, fingerprint);
455 slot_idx = self.index_up(slot_idx);
456 while !self.table[slot_idx].is_empty() {
457 while self.table[slot_idx].is_run_continued() {
458 fingerprint = self.table[slot_idx].reconstruct_fingerprint(quotient_cache, self.remainder);
459 insertion(quotient_cache, fingerprint);
460 slot_idx = self.index_up(slot_idx);
461 }
462 if !self.table[slot_idx].is_empty() {
463 quotient_cache = self.get_next_occupied(quotient_cache).ok_or(anyhow::Error::new(QuotientFilterError::NotAbleToFindOccupied))?;
464 if self.table[slot_idx].is_run_start() {
465 fingerprint = self.table[slot_idx].reconstruct_fingerprint(quotient_cache, self.remainder);
466 insertion(quotient_cache, fingerprint);
467 slot_idx = self.index_up(slot_idx);
468 }
469 } else {
470 break;
471 }
472 }
473 index = anchor_idx;
474 }
475 for value in map.iter_mut() {
476 value.1.sort_unstable();
477 }
478 Ok(map)
479 }
480
481 #[inline(always)]
482 fn index_up(&self, old_index: usize) -> usize {
483 (old_index + 1) % (self.size)
484 }
485
486 #[inline(always)]
487 fn index_down(&self, old_index: usize) -> usize {
488 if old_index == 0 { return self.size - 1; }
489 old_index - 1
490 }
491
492}
493
494
// Unit tests for `QuotientFilter`.
//
// Values are hashed with FNV-1a by `insert_value`, so the slot indices asserted below follow
// from the hash of the inserted bytes and the chosen quotient size.
#[cfg(test)]
mod tests {
    use super::*;

    /// A single value lands in its home slot, which starts both a run and a cluster.
    #[test]
    fn insert_one() {
        let mut filter = QuotientFilter::new(2).unwrap();
        let idx = filter.insert_value(&1_u8.to_be_bytes()).unwrap();
        assert_eq!(idx, 2);
        assert!(filter.table[2].is_run_start());
        assert!(filter.table[2].is_cluster_start());
    }

    /// A second value is placed in the slot after the first one, as a shifted run continuation.
    #[test]
    fn insert_two_same_quotient() {
        let mut filter = QuotientFilter::new(2).unwrap();
        let idx1 = filter.insert_value(&1_u8.to_be_bytes()).unwrap();
        let idx2 = filter.insert_value(&2_u8.to_be_bytes()).unwrap();
        assert_eq!(idx1, 2);
        assert_eq!(idx2, 3);
        assert!(!filter.table[3].is_occupied());
        assert!(filter.table[3].is_run_continued());
        assert!(filter.table[3].is_shifted());
    }

    /// A third value is stored at index 0 (wrapping around the table) as a shifted,
    /// non-continued slot.
    #[test]
    fn insert_second_run_on_different_quotient() {
        let mut filter = QuotientFilter::new(2).unwrap();
        let idx1 = filter.insert_value(&1_u8.to_be_bytes()).unwrap();
        let idx2 = filter.insert_value(&2_u8.to_be_bytes()).unwrap();
        let idx3 = filter.insert_value(&567889965_u64.to_be_bytes()).unwrap();
        assert_eq!(idx1, 2);
        assert_eq!(idx2, 3);
        assert_eq!(idx3, 0);
        assert!(!filter.table[0].is_occupied());
        assert!(!filter.table[0].is_run_continued());
        assert!(filter.table[0].is_shifted());
    }

    /// Six values in an 8-slot table: checks the index returned by each insertion and the
    /// metadata of slots 0, 1, 2, 5, 6 and 7 afterwards.
    #[test]
    fn insert_multiple_runs_different_quotients_sequentially() {
        let mut filter = QuotientFilter::new(3).unwrap();
        let idx1 = filter.insert_value(&1_u8.to_be_bytes()).unwrap();
        let idx2 = filter.insert_value(&2_u8.to_be_bytes()).unwrap();
        let idx3 = filter.insert_value(&3_u8.to_be_bytes()).unwrap();
        let idx4 = filter.insert_value(&75324433_u32.to_be_bytes()).unwrap();
        let idx5 = filter.insert_value(&75324434_u32.to_be_bytes()).unwrap();
        let idx6 = filter.insert_value(&567889965_u64.to_be_bytes()).unwrap();
        // Indices as returned at insertion time.
        assert_eq!(idx1, 5);
        assert_eq!(idx2, 6);
        assert_eq!(idx3, 6);
        assert_eq!(idx4, 0);
        assert_eq!(idx5, 0);
        assert_eq!(idx6, 0);
        assert!(!filter.table[0].is_occupied());
        assert!(!filter.table[0].is_run_continued());
        assert!(filter.table[0].is_shifted());
        assert!(!filter.table[1].is_occupied());
        assert!(!filter.table[1].is_run_continued());
        assert!(filter.table[1].is_shifted());
        assert!(!filter.table[2].is_occupied());
        assert!(filter.table[2].is_run_continued());
        assert!(filter.table[2].is_shifted());
        assert!(filter.table[5].is_occupied());
        assert!(!filter.table[5].is_run_continued());
        assert!(!filter.table[5].is_shifted());
        assert!(filter.table[6].is_occupied());
        assert!(filter.table[6].is_run_continued());
        assert!(filter.table[6].is_shifted());
        assert!(filter.table[7].is_occupied());
        assert!(filter.table[7].is_run_continued());
        assert!(filter.table[7].is_shifted());
    }

    /// An inserted value is found again.
    #[test]
    fn insert_and_read_one_success() {
        let mut filter = QuotientFilter::new(5).unwrap();
        _ = filter.insert_value(&1_u8.to_be_bytes());
        let result = filter.lookup_value(&1_u8.to_be_bytes());

        assert!(result);
    }

    /// One of several inserted values is found again.
    #[test]
    fn insert_and_read_multiple_success() {
        let mut filter = QuotientFilter::new(5).unwrap();
        _ = filter.insert_value(&1_u8.to_be_bytes());
        _ = filter.insert_value(&2_u8.to_be_bytes());
        _ = filter.insert_value(&3_u8.to_be_bytes());
        let result = filter.lookup_value(&2_u8.to_be_bytes());

        assert!(result);
    }

    /// A value that was never inserted is not found.
    #[test]
    fn insert_and_read_one_failure() {
        let mut filter = QuotientFilter::new(5).unwrap();
        _ = filter.insert_value(&1_u8.to_be_bytes());
        let result = filter.lookup_value(&2_u8.to_be_bytes());

        assert!(!result);
    }

    /// A value that was never inserted is not found among several stored values.
    #[test]
    fn insert_and_read_multiple_failure() {
        let mut filter = QuotientFilter::new(5).unwrap();
        _ = filter.insert_value(&1_u8.to_be_bytes());
        _ = filter.insert_value(&2_u8.to_be_bytes());
        _ = filter.insert_value(&3_u8.to_be_bytes());
        let result = filter.lookup_value(&4_u8.to_be_bytes());

        assert!(!result);
    }

    /// A deleted value is no longer found.
    #[test]
    fn delete_read_one_success() {
        let mut filter = QuotientFilter::new(5).unwrap();
        _ = filter.insert_value(&1_u8.to_be_bytes());
        filter.delete_value(&1_u8.to_be_bytes());
        let result = filter.lookup_value(&1_u8.to_be_bytes());

        assert!(!result);
    }

    /// A value deleted from a filter holding several values is no longer found.
    #[test]
    fn delete_read_multiple_success() {
        let mut filter = QuotientFilter::new(5).unwrap();
        _ = filter.insert_value(&1_u8.to_be_bytes());
        _ = filter.insert_value(&2_u8.to_be_bytes());
        _ = filter.insert_value(&3_u8.to_be_bytes());
        _ = filter.insert_value(&4_u8.to_be_bytes());
        filter.delete_value(&2_u8.to_be_bytes());
        let result = filter.lookup_value(&2_u8.to_be_bytes());

        assert!(!result);
    }

    /// Several deleted values are all no longer found.
    #[test]
    fn delete_read_multiple_value_multiple_success() {
        let mut filter = QuotientFilter::new(10).unwrap();
        _ = filter.insert_value(&1_u8.to_be_bytes());
        _ = filter.insert_value(&2_u8.to_be_bytes());
        _ = filter.insert_value(&3_u8.to_be_bytes());
        _ = filter.insert_value(&4_u8.to_be_bytes());
        _ = filter.insert_value(&5_u8.to_be_bytes());
        _ = filter.insert_value(&6_u8.to_be_bytes());
        filter.delete_value(&2_u8.to_be_bytes());
        filter.delete_value(&3_u8.to_be_bytes());
        filter.delete_value(&6_u8.to_be_bytes());
        let result1 = filter.lookup_value(&2_u8.to_be_bytes());
        let result2 = filter.lookup_value(&3_u8.to_be_bytes());
        let result3 = filter.lookup_value(&6_u8.to_be_bytes());

        assert!(!result1);
        assert!(!result2);
        assert!(!result3);
    }

    /// After doubling a 4-slot table, the single stored value sits at index 5 as a cluster start.
    #[test]
    fn resize_one_element() {
        let mut filter = QuotientFilter::new(2).unwrap();
        _ = filter.insert_value(&1_u8.to_be_bytes());
        _ = filter.resize();
        assert!(!filter.table[5].is_empty());
        assert!(filter.table[5].is_cluster_start());
    }

    /// After doubling a 4-slot table, both stored values sit at indices 5 and 6 as cluster starts.
    #[test]
    fn resize_two_different_quotient_element() {
        let mut filter = QuotientFilter::new(2).unwrap();
        _ = filter.insert_value(&1_u8.to_be_bytes());
        _ = filter.insert_value(&567889965_u64.to_be_bytes()).unwrap();
        _ = filter.resize();
        assert!(!filter.table[5].is_empty());
        assert!(filter.table[5].is_cluster_start());
        assert!(!filter.table[6].is_empty());
        assert!(filter.table[6].is_cluster_start());
    }

    /// A value inserted before a resize is still found afterwards.
    #[test]
    fn read_after_resize_one_element() {
        let mut filter = QuotientFilter::new(2).unwrap();
        _ = filter.insert_value(&1_u8.to_be_bytes());
        _ = filter.resize();
        assert!(filter.lookup_value(&1_u8.to_be_bytes()));
    }

}