atomic_cuckoo_filter/lib.rs
// Lock-Free Concurrent Cuckoo Filter Implementation
// A high-performance probabilistic data structure for efficient set membership testing
// with better space efficiency than Bloom filters, support for deletions, and
// fully concurrent operations using atomic operations and lock-free algorithms.

use std::collections::HashSet;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::hint;
use std::marker::PhantomData;
use std::sync::atomic::{AtomicUsize, Ordering};

use derive_builder::Builder;
use rand::Rng;
use serde::{Deserialize, Serialize};

/// Maximum number of spin-loop iterations before parking a thread.
/// This balances CPU usage vs. latency: spinning avoids kernel calls for
/// short waits, while parking avoids wasting CPU on long waits.
const MAX_SPIN: usize = 100;

/// Magic bytes used to identify serialized filters
const SERIALIZATION_MAGIC: &[u8; 4] = b"ACF1";

/// Current serialization format version
const SERIALIZATION_VERSION: u8 = 1;

/// Error type for Cuckoo Filter insert operations
#[derive(Debug, thiserror::Error, PartialEq)]
pub enum Error {
    /// Returned when the filter is full and cannot accommodate more elements
    #[error("Not enough space to store this item.")]
    NotEnoughSpace,
}

/// Error type for deserializing a Cuckoo Filter from bytes
#[derive(Debug, thiserror::Error, PartialEq)]
pub enum DeserializeError {
    /// The provided byte slice does not start with the expected magic prefix
    #[error("Invalid serialized data header")]
    InvalidHeader,
    /// The serialized data does not contain the expected number of bytes
    #[error("Serialized data has an unexpected length")]
    InvalidLength,
    /// The serialized data encodes an unsupported version
    #[error("Unsupported serialization version: {0}")]
    UnsupportedVersion(u8),
    /// The serialized data encodes an invalid or unsupported configuration
    #[error("Invalid configuration: {0}")]
    InvalidConfiguration(String),
    /// A numeric value in the serialized data cannot be represented on this platform
    #[error("Serialized value is out of range for this platform")]
    ValueOutOfRange,
}

#[derive(Serialize, Deserialize)]
struct SerializableFilter {
    magic: [u8; 4],
    version: u8,
    fingerprint_size: u32,
    bucket_size: u32,
    max_evictions: u32,
    capacity: u64,
    counter: u64,
    buckets: Vec<u64>,
}

/// Types of locks that can be acquired on the filter
#[derive(PartialEq)]
pub enum LockKind {
    /// Optimistic version tracking - does not block other operations but captures
    /// a version number to detect if data changed during the operation
    Optimistic,
    /// Exclusive among writers only - prevents other writers but allows concurrent readers
    WriterExclusive,
    /// Fully exclusive access - blocks all other operations (used only during evictions)
    FullyExclusive,
}

/// A specialized lock implementation designed for concurrent cuckoo filter operations.
///
/// This is NOT a traditional mutex but an atomic-based synchronization mechanism that
/// enables three distinct concurrency modes:
///
/// 1. **Optimistic locks**: Allow maximum concurrency - multiple readers and writers
///    can proceed simultaneously. Used for optimistic reads that detect data races.
///
/// 2. **WriterExclusive locks**: Mutual exclusion among writers only - prevents
///    concurrent modifications but allows concurrent reads.
///
/// 3. **FullyExclusive locks**: Complete mutual exclusion - blocks all operations.
///    Only used during complex eviction chains to ensure consistency.
///
/// ## Version Encoding Scheme
/// The atomic usize encodes both lock state and version information:
/// - **Bits 0-1**: Lock kind (0=Optimistic, 1=WriterExclusive, 2=FullyExclusive)
/// - **Bits 2 and up**: Version counter (incremented on FullyExclusive release)
///
/// This allows optimistic readers to detect when their read might be stale by
/// comparing version numbers before and after the operation.
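///
/// For illustration, a sketch of the encoding arithmetic with made-up numbers
/// (plain integers, not the internal type):
///
/// ```
/// // Hypothetical value: version counter 7, WriterExclusive kind bits.
/// let version: usize = (7 << 2) | 0b01;
/// assert_eq!(version & 0b11, 1); // the lock kind lives in the low two bits
/// assert_eq!(version >> 2, 7);   // the remaining bits hold the version counter
/// ```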
pub struct Lock<'a> {
    /// Reference to the shared atomic value encoding lock state and version
    atomic: &'a AtomicUsize,
    /// Snapshot of the atomic value when this lock was acquired.
    /// Used for optimistic concurrency control and version tracking.
    /// The lower 2 bits indicate lock type, upper bits track version changes.
    version: usize,
    /// The type of lock held by this instance
    kind: LockKind,
    /// Counter for spin attempts before transitioning to thread parking.
    /// Implements adaptive spinning to balance latency vs. CPU usage.
    retry: usize,
}

impl<'a> Lock<'a> {
    /// Create a new lock of the specified kind.
    /// Blocks until the lock can be acquired.
    fn new(atomic: &'a AtomicUsize, kind: LockKind) -> Self {
        let mut lock = Self {
            atomic,
            version: 0,
            kind,
            retry: 0,
        };
        match lock.kind {
            LockKind::Optimistic => loop {
                // For optimistic locks, we can proceed as long as there's no FullyExclusive lock.
                // Acquire ordering ensures the data reads that follow cannot be reordered
                // before this version snapshot.
                lock.version = atomic.load(Ordering::Acquire);
                if Self::kind(lock.version) != LockKind::FullyExclusive {
                    return lock;
                }
                lock.spin_or_park()
            },
            _ => loop {
                // For WriterExclusive and FullyExclusive locks, wait until no lock of
                // either exclusive kind is currently held
                lock.version = atomic.load(Ordering::Relaxed);
                if Self::kind(lock.version) != LockKind::Optimistic {
                    lock.spin_or_park();
                    continue;
                }
                // Update the lower bits of the version: 1 for WriterExclusive, 2 for FullyExclusive
                let new_version = if lock.kind == LockKind::WriterExclusive {
                    lock.version + 1
                } else {
                    lock.version + 2
                };
                // Acquire ordering on success synchronizes with the Release store
                // performed by the previous holder's unlock
                if atomic
                    .compare_exchange_weak(
                        lock.version,
                        new_version,
                        Ordering::Acquire,
                        Ordering::Relaxed,
                    )
                    .is_ok()
                {
                    return lock;
                }
            },
        }
    }

    /// Upgrade a WriterExclusive lock to a FullyExclusive lock.
    /// This assumes the current thread holds the WriterExclusive lock.
    fn upgrade(&mut self) {
        self.atomic.store(self.version + 2, Ordering::Release);
        self.kind = LockKind::FullyExclusive;
    }

    /// Check if the lock is outdated (version changed) or a FullyExclusive lock is held.
    /// Used for optimistic concurrency control.
    fn is_outdated(&self) -> bool {
        let version = self.atomic.load(Ordering::Acquire);
        Self::kind(version) == LockKind::FullyExclusive || version >> 2 != self.version >> 2
    }

    /// Get the key for parking a thread.
    /// Different keys are used for optimistic and exclusive locks.
    fn park_key(&self) -> usize {
        let key = self.atomic.as_ptr() as usize;
        match self.kind {
            LockKind::Optimistic => key,
            _ => key + 1,
        }
    }

    /// Spin or park the thread while waiting for a lock
    fn spin_or_park(&mut self) {
        if self.retry > MAX_SPIN {
            // After MAX_SPIN attempts, park the thread
            self.retry = 0;
            unsafe {
                parking_lot_core::park(
                    self.park_key(),
                    || self.atomic.load(Ordering::Acquire) == self.version,
                    || (),
                    |_, _| (),
                    parking_lot_core::DEFAULT_PARK_TOKEN,
                    None,
                );
            }
        } else {
            // Otherwise, spin
            self.retry += 1;
            hint::spin_loop();
        }
    }

    /// Extract the lock kind from the lower 2 bits of a version value
    fn kind(version: usize) -> LockKind {
        match version & 0b11 {
            0 => LockKind::Optimistic,
            1 => LockKind::WriterExclusive,
            2 => LockKind::FullyExclusive,
            _ => panic!("Invalid lock kind encoding"),
        }
    }
}

impl Drop for Lock<'_> {
    /// Release the lock when it goes out of scope
    fn drop(&mut self) {
        match self.kind {
            LockKind::Optimistic => return, // Nothing to release for Optimistic locks
            LockKind::WriterExclusive => {
                // For WriterExclusive locks, release the lock without incrementing the version
                self.atomic.store(self.version, Ordering::Release);
            }
            LockKind::FullyExclusive => {
                // For FullyExclusive locks, increment the version to invalidate Optimistic locks
                self.atomic.store(self.version + 4, Ordering::Release);
            }
        }

        // Unpark waiting threads
        let optimistic_key = self.atomic.as_ptr() as usize;
        let exclusive_key = optimistic_key + 1;
        unsafe {
            // Unpark all waiting optimistic locks
            parking_lot_core::unpark_all(optimistic_key, parking_lot_core::DEFAULT_UNPARK_TOKEN);
            // Unpark one waiting exclusive lock (either WriterExclusive or FullyExclusive)
            parking_lot_core::unpark_one(exclusive_key, |_| parking_lot_core::DEFAULT_UNPARK_TOKEN);
        }
    }
}

/// A highly concurrent, lock-free probabilistic data structure for set membership testing.
///
/// ## What Makes It "Cuckoo"
///
/// Named after the cuckoo bird's behavior of displacing other birds' eggs, this filter
/// uses **cuckoo hashing**, where each item can be stored in one of two possible locations.
/// When both locations are full, existing items are "evicted" (like cuckoo eggs) and
/// relocated to their alternate position, creating eviction chains.
///
/// ## Algorithm Overview
///
/// 1. **Fingerprints**: Items are reduced to small fingerprints (4-32 bits) instead of
///    storing full keys, providing excellent space efficiency.
///
/// 2. **Dual Hashing**: Each item has two possible bucket locations computed from its hash.
///    This provides better space efficiency and flexibility when inserting and removing items.
///
/// 3. **Eviction Chains**: When both buckets are full, a random item is evicted from one
///    bucket and moved to its alternate location, potentially triggering a chain of evictions.
///
/// 4. **Lock-Free Concurrency**: All operations use atomic compare-exchange loops instead
///    of traditional locks, with optimistic concurrency control for read operations.
///    The only exception is insertion with evictions, where a FullyExclusive lock is used
///    to ensure consistency.
///
/// ## Key Advantages Over Bloom Filters
///
/// - **Deletions supported**: Items can be removed without false negatives
/// - **Better space efficiency**: ~20-30% less memory for the same false positive rate
/// - **Bounded lookup time**: At most 2 bucket checks per lookup
/// - **High concurrency**: Lock-free design enables excellent parallel performance
///
/// ## Concurrency Model
///
/// - **Reads**: Optimistic; can proceed concurrently with most operations
/// - **Simple writes**: Use atomic compare-exchange loops without blocking other operations
/// - **WriterExclusive locks**: Used for removing items and for unique insertions
/// - **Complex evictions**: Use FullyExclusive locks to ensure consistency
///
/// ## Time Complexity
///
/// - **Lookup**: O(1)
/// - **Deletion**: O(1)
/// - **Insertion**: Amortized O(1); eviction chains are bounded by `max_evictions`
#[derive(Debug, Builder)]
#[builder(
    pattern = "owned",
    build_fn(private, name = "base_build", validate = "Self::validate")
)]
pub struct CuckooFilter<H = DefaultHasher>
where
    H: Hasher + Default,
{
    // Configuration parameters
    /// Maximum number of elements the filter can store
    #[builder(default = "1048576")]
    capacity: usize,

    /// Size of fingerprints in bits (must be 4, 8, 16, or 32)
    #[builder(default = "16")]
    fingerprint_size: usize,

    /// Number of fingerprints per bucket
    #[builder(default = "4")]
    bucket_size: usize,

    /// Maximum number of evictions to try before giving up
    #[builder(default = "500")]
    max_evictions: usize,

    // Internal values - automatically derived from the configuration
    /// Number of fingerprints that can be stored in a single atomic value
    #[builder(setter(skip))]
    fingerprints_per_atomic: usize,

    /// Number of buckets in the filter (power of 2)
    #[builder(setter(skip))]
    num_buckets: usize,

    /// Bit mask for extracting fingerprints
    #[builder(setter(skip))]
    fingerprint_mask: usize,

    /// Storage for buckets, implemented as a vector of atomic values
    #[builder(setter(skip))]
    buckets: Vec<AtomicUsize>,

    /// Atomic value used for locking
    #[builder(setter(skip))]
    lock: AtomicUsize,

    /// Counter for the number of elements in the filter
    #[builder(setter(skip))]
    counter: AtomicUsize,

    /// Phantom data for the hasher type
    #[builder(setter(skip))]
    _hasher: PhantomData<H>,
}

impl<H: Hasher + Default> CuckooFilter<H> {
    /// Insert an item into the filter.
    ///
    /// This operation first attempts a direct insertion without acquiring a lock.
    /// If that fails due to bucket collisions, it falls back to the eviction-based
    /// insertion algorithm, which acquires a write lock when evictions are enabled.
    ///
    /// Concurrent operations are safely handled through atomic operations.
    ///
    /// Returns `Ok(())` if the item was inserted, or `Error::NotEnoughSpace` if the filter is full.
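    ///
    /// # Example
    ///
    /// A minimal usage sketch; the crate name `atomic_cuckoo_filter` is assumed from
    /// the file path:
    ///
    /// ```
    /// use atomic_cuckoo_filter::CuckooFilter;
    ///
    /// let filter = CuckooFilter::new();
    /// filter.insert("apple").unwrap();
    /// assert!(filter.contains("apple"));
    /// ```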
    pub fn insert<T: ?Sized + Hash>(&self, item: &T) -> Result<(), Error> {
        let (index, fingerprint) = self.index_and_fingerprint(item);
        self.try_insert(index, fingerprint).or_else(|error| {
            let lock = self.lock(LockKind::WriterExclusive).ok_or(error)?;
            self.insert_with_evictions(index, fingerprint, lock)
        })
    }

    /// Check if an item is in the filter and insert it if it is not present (atomically).
    ///
    /// This method combines lookup and insert into a single atomic operation,
    /// ensuring thread safety and consistency even with concurrent operations.
    ///
    /// Returns `Ok(true)` if the item was inserted, `Ok(false)` if it was already present,
    /// or `Error::NotEnoughSpace` if the filter is full.
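    ///
    /// # Example
    ///
    /// A sketch of the check-then-insert semantics (crate name assumed as above):
    ///
    /// ```
    /// use atomic_cuckoo_filter::CuckooFilter;
    ///
    /// let filter = CuckooFilter::new();
    /// assert_eq!(filter.insert_unique("apple"), Ok(true));  // newly inserted
    /// assert_eq!(filter.insert_unique("apple"), Ok(false)); // already present
    /// ```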
    pub fn insert_unique<T: ?Sized + Hash>(&self, item: &T) -> Result<bool, Error> {
        let (index, fingerprint) = self.index_and_fingerprint(item);
        if self.lookup_fingerprint(index, fingerprint).is_some() {
            return Ok(false);
        }
        let lock = Lock::new(&self.lock, LockKind::WriterExclusive);
        if self.lookup_fingerprint(index, fingerprint).is_some() {
            return Ok(false);
        }
        self.try_insert(index, fingerprint)
            .or_else(|error| {
                if self.max_evictions == 0 {
                    return Err(error);
                }
                self.insert_with_evictions(index, fingerprint, lock)
            })
            .map(|_| true)
    }

    /// Counts the number of occurrences of an item in the filter.
    ///
    /// # Notes
    /// - This is not a counting filter; it simply counts matching fingerprints in both candidate buckets.
    /// - Useful for detecting duplicates or hash collisions, not for precise multiset membership.
    /// - The count is limited by the filter's structure: at most `bucket_size * 2` per item.
    /// - This method may over-count due to hash collisions (false positives).
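    ///
    /// # Example
    ///
    /// A sketch showing that duplicate insertions are counted (crate name assumed):
    ///
    /// ```
    /// use atomic_cuckoo_filter::CuckooFilter;
    ///
    /// let filter = CuckooFilter::new();
    /// filter.insert("pear").unwrap();
    /// filter.insert("pear").unwrap();
    /// assert!(filter.count("pear") >= 2);
    /// ```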
    pub fn count<T: ?Sized + Hash>(&self, item: &T) -> usize {
        let (index, fingerprint) = self.index_and_fingerprint(item);
        let alt_index = self.alt_index(index, fingerprint);
        self.atomic_read(
            || {
                self.read_bucket(index, Ordering::Acquire)
                    .filter(|&f| f == fingerprint)
                    .count()
                    + self
                        .read_bucket(alt_index, Ordering::Acquire)
                        .filter(|&f| f == fingerprint)
                        .count()
            },
            None,
        )
    }

    /// Attempts to remove an item from the filter.
    ///
    /// Returns `true` if the item was successfully removed, or `false` if it was not found.
    ///
    /// Note:
    /// - An item should only be removed if it was previously added. Removing a non-existent
    ///   item may inadvertently remove a different item due to hash collisions inherent to
    ///   cuckoo filters.
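    ///
    /// # Example
    ///
    /// A minimal sketch (crate name assumed):
    ///
    /// ```
    /// use atomic_cuckoo_filter::CuckooFilter;
    ///
    /// let filter = CuckooFilter::new();
    /// filter.insert("plum").unwrap();
    /// assert!(filter.remove("plum"));  // found and removed
    /// assert!(!filter.remove("plum")); // no longer present
    /// ```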
    pub fn remove<T: ?Sized + Hash>(&self, item: &T) -> bool {
        let (index, fingerprint) = self.index_and_fingerprint(item);
        while let Some((index, sub_index)) = self.lookup_fingerprint(index, fingerprint) {
            let _lock = self.lock(LockKind::WriterExclusive);
            if self.update_bucket(index, sub_index, fingerprint, 0, Ordering::Release) {
                return true;
            }
        }
        false
    }

    /// Check if an item is in the filter.
    ///
    /// Returns `true` if the item is possibly in the filter (may have false positives),
    /// `false` if it is definitely not in the filter.
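    ///
    /// # Example
    ///
    /// A minimal sketch (crate name assumed; a false positive for the missing key is
    /// possible in principle, though vanishingly unlikely in this tiny example):
    ///
    /// ```
    /// use atomic_cuckoo_filter::CuckooFilter;
    ///
    /// let filter = CuckooFilter::new();
    /// filter.insert("kiwi").unwrap();
    /// assert!(filter.contains("kiwi"));
    /// assert!(!filter.contains("mango"));
    /// ```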
    pub fn contains<T: ?Sized + Hash>(&self, item: &T) -> bool {
        let (index, fingerprint) = self.index_and_fingerprint(item);
        self.atomic_read(
            || self.lookup_fingerprint(index, fingerprint).is_some(),
            Some(true),
        )
    }

    /// Get the number of elements in the filter
    pub fn len(&self) -> usize {
        self.counter.load(Ordering::Acquire)
    }

    /// Check if the filter is empty
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Get the capacity of the filter
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Serialize the filter into a byte vector.
    ///
    /// The serialized format is versioned and includes the filter configuration,
    /// element count, and all bucket contents. A FullyExclusive lock is acquired
    /// (when locking is enabled, i.e. `max_evictions > 0`) to guarantee a consistent snapshot.
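    ///
    /// # Example
    ///
    /// A roundtrip sketch (crate name assumed; the explicit type annotation selects the
    /// default hasher when deserializing):
    ///
    /// ```
    /// use atomic_cuckoo_filter::CuckooFilter;
    ///
    /// let filter = CuckooFilter::new();
    /// filter.insert("grape").unwrap();
    /// let bytes = filter.to_bytes();
    /// let restored: CuckooFilter = CuckooFilter::from_bytes(&bytes).unwrap();
    /// assert!(restored.contains("grape"));
    /// ```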
    pub fn to_bytes(&self) -> Vec<u8> {
        let serializable = self.to_serializable();

        bincode::serialize(&serializable).expect("Serialization should not fail")
    }

    fn to_serializable(&self) -> SerializableFilter {
        let _lock = self.lock(LockKind::FullyExclusive);
        SerializableFilter {
            magic: *SERIALIZATION_MAGIC,
            version: SERIALIZATION_VERSION,
            fingerprint_size: self.fingerprint_size as u32,
            bucket_size: self.bucket_size as u32,
            max_evictions: self.max_evictions as u32,
            capacity: self.capacity as u64,
            counter: self.counter.load(Ordering::Acquire) as u64,
            buckets: self
                .buckets
                .iter()
                .map(|atomic| atomic.load(Ordering::Acquire) as u64)
                .collect(),
        }
    }

    fn from_serializable(serialized: SerializableFilter) -> Result<Self, DeserializeError> {
        if serialized.magic != *SERIALIZATION_MAGIC {
            return Err(DeserializeError::InvalidHeader);
        }

        if serialized.version != SERIALIZATION_VERSION {
            return Err(DeserializeError::UnsupportedVersion(serialized.version));
        }

        let fingerprint_size: usize = serialized
            .fingerprint_size
            .try_into()
            .map_err(|_| DeserializeError::ValueOutOfRange)?;
        let bucket_size: usize = serialized
            .bucket_size
            .try_into()
            .map_err(|_| DeserializeError::ValueOutOfRange)?;
        let max_evictions: usize = serialized
            .max_evictions
            .try_into()
            .map_err(|_| DeserializeError::ValueOutOfRange)?;
        let capacity: usize = serialized
            .capacity
            .try_into()
            .map_err(|_| DeserializeError::ValueOutOfRange)?;
        let counter: usize = serialized
            .counter
            .try_into()
            .map_err(|_| DeserializeError::ValueOutOfRange)?;
        let atomic_len = serialized.buckets.len();

        if counter > capacity {
            return Err(DeserializeError::InvalidConfiguration(
                "Serialized element count exceeds capacity".into(),
            ));
        }

        let filter = CuckooFilterBuilder::<H>::default()
            .capacity(capacity)
            .fingerprint_size(fingerprint_size)
            .bucket_size(bucket_size)
            .max_evictions(max_evictions)
            .build()
            .map_err(|err| DeserializeError::InvalidConfiguration(err.to_string()))?;

        if filter.buckets.len() != atomic_len {
            return Err(DeserializeError::InvalidLength);
        }

        filter.counter.store(counter, Ordering::Release);

        for (atomic, value) in filter.buckets.iter().zip(serialized.buckets.into_iter()) {
            let value: usize = value
                .try_into()
                .map_err(|_| DeserializeError::ValueOutOfRange)?;
            atomic.store(value, Ordering::Release);
        }

        Ok(filter)
    }

    /// Deserialize a filter from the provided byte slice.
    pub fn from_bytes(bytes: &[u8]) -> Result<Self, DeserializeError> {
        let serialized: SerializableFilter =
            bincode::deserialize(bytes).map_err(|_| DeserializeError::InvalidLength)?;

        Self::from_serializable(serialized)
    }

    /// Clear the filter, removing all elements.
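    ///
    /// # Example
    ///
    /// A minimal sketch (crate name assumed):
    ///
    /// ```
    /// use atomic_cuckoo_filter::CuckooFilter;
    ///
    /// let filter = CuckooFilter::new();
    /// filter.insert("fig").unwrap();
    /// filter.clear();
    /// assert!(filter.is_empty());
    /// ```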
    pub fn clear(&self) {
        let _lock = self.lock(LockKind::WriterExclusive);
        for atomic in &self.buckets {
            let old_value = atomic.swap(0, Ordering::Release);
            let removed = (0..self.fingerprints_per_atomic)
                .filter(|i| (old_value >> (i * self.fingerprint_size)) & self.fingerprint_mask != 0)
                .count();
            if removed > 0 {
                self.counter.fetch_sub(removed, Ordering::Release);
            }
        }
    }

    /// Compute the hash of an item.
    /// Uses the generic hasher H for flexibility and performance.
    fn hash<T: ?Sized + Hash>(&self, data: &T) -> u64 {
        let mut hasher = <H as Default>::default();
        data.hash(&mut hasher);
        hasher.finish()
    }

    /// Compute the bucket index and fingerprint for an item.
    ///
    /// 1. **Hash the item**: Use the configured hasher to get a 64-bit hash.
    /// 2. **Extract fingerprint**: Use multiplication + shift for high-quality
    ///    distribution across the fingerprint space, then add 1 to avoid zero.
    /// 3. **Extract index**: Use bitwise AND with (num_buckets - 1); since num_buckets
    ///    is always a power of 2, this is equivalent to a modulo.
    ///
    /// ## Why This Design
    ///
    /// - **Non-zero fingerprints**: Adding 1 ensures fingerprints are never 0,
    ///   so 0 can represent empty slots without ambiguity
    /// - **Independent bits**: Index uses lower hash bits, fingerprint uses
    ///   different bits via multiplication, avoiding correlation
    /// - **Uniform distribution**: Both index and fingerprint are uniformly
    ///   distributed across their respective ranges
    ///
    /// Returns (index, fingerprint) where:
    /// - index is the primary bucket index (0 to num_buckets - 1)
    /// - fingerprint is a compact hash of the item (1 to fingerprint_mask)
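    ///
    /// For illustration, a sketch of the multiply-shift step with a 16-bit mask
    /// (plain arithmetic on a hypothetical hash value):
    ///
    /// ```
    /// let hash: u64 = 0x0123_4567_89AB_CDEF;
    /// let mask: u64 = 0xFFFF; // fingerprint_mask for fingerprint_size = 16
    /// let fingerprint = ((hash as u128 * mask as u128) >> 64) + 1;
    /// // The product is below 2^64 * mask, so the shifted value is below mask,
    /// // and the +1 keeps the result in 1..=mask.
    /// assert!((1..=mask as u128).contains(&fingerprint));
    /// ```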
    fn index_and_fingerprint<T: ?Sized + Hash>(&self, item: &T) -> (usize, usize) {
        let hash = self.hash(item);
        // Compute the fingerprint using multiplication and shift for better distribution
        let fingerprint = ((hash as u128 * self.fingerprint_mask as u128) >> 64) + 1;
        // Compute the index modulo num_buckets (optimized to a bitwise AND since num_buckets is a power of 2)
        let index = hash as usize & (self.num_buckets - 1);
        (index, fingerprint as usize)
    }

    /// Computes the alternative bucket index for a given fingerprint using cuckoo hashing.
    ///
    /// In cuckoo hashing, each item can reside in one of two possible buckets. This function
    /// deterministically computes the alternate bucket index from the current index and fingerprint.
    ///
    /// Properties:
    /// 1. Symmetry: `alt_index(alt_index(i, f), f) == i` for any index `i` and fingerprint `f`.
    /// 2. Distinctness: The two indices differ whenever the fingerprint's masked hash is
    ///    non-zero; if it happens to be zero, both candidate buckets coincide.
    /// 3. Uniformity: The mapping distributes fingerprints evenly across all buckets.
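    ///
    /// A sketch of the XOR involution behind the symmetry property (made-up numbers):
    ///
    /// ```
    /// let num_buckets = 1024usize;
    /// let index = 37usize;
    /// // Stand-in for `hash(&fingerprint) as usize & (num_buckets - 1)`:
    /// let fp_hash = 0x5A5Ausize & (num_buckets - 1);
    /// let alt = index ^ fp_hash;
    /// assert_eq!(alt ^ fp_hash, index); // applying the mapping twice returns the original
    /// ```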
    fn alt_index(&self, index: usize, fingerprint: usize) -> usize {
        index ^ (self.hash(&fingerprint) as usize & (self.num_buckets - 1))
    }

    /// Look up a fingerprint at its primary or alternative index.
    /// Returns `Some((index, sub_index))` if found, `None` otherwise.
    fn lookup_fingerprint(&self, index: usize, fingerprint: usize) -> Option<(usize, usize)> {
        // First check the primary bucket
        self.read_bucket(index, Ordering::Acquire)
            .position(|fp| fp == fingerprint)
            .map(|sub_index| (index, sub_index))
            .or_else(|| {
                // Then check the alternative bucket
                let alt_index = self.alt_index(index, fingerprint);
                self.read_bucket(alt_index, Ordering::Acquire)
                    .position(|fp| fp == fingerprint)
                    .map(|sub_index| (alt_index, sub_index))
            })
    }

    /// Try to insert a fingerprint at its primary or alternative index.
    /// Returns `Ok(())` if successful, `Error::NotEnoughSpace` if both buckets are full.
    fn try_insert(&self, index: usize, fingerprint: usize) -> Result<(), Error> {
        self.insert_at_index(index, fingerprint).or_else(|_| {
            let alt_index = self.alt_index(index, fingerprint);
            self.insert_at_index(alt_index, fingerprint)
        })
    }

    /// Try to insert a fingerprint at a specific index.
    /// Returns `Ok(())` if successful, `Err(Error::NotEnoughSpace)` if the bucket is full.
    fn insert_at_index(&self, index: usize, fingerprint: usize) -> Result<(), Error> {
        loop {
            let sub_index = self
                .read_bucket(index, Ordering::Relaxed)
                .position(|i| i == 0)
                .ok_or(Error::NotEnoughSpace)?;

            if self.update_bucket(index, sub_index, 0, fingerprint, Ordering::Release) {
                return Ok(());
            }
        }
    }

    /// Insert a fingerprint using cuckoo eviction chains when both buckets are full.
    ///
    /// This method is invoked only as a fallback when direct insertion fails, preserving
    /// the optimistic, lock-free fast path for the common case.
    ///
    /// # Cuckoo Eviction Algorithm
    ///
    /// When both possible locations for an item are full:
    /// 1. **Randomly select** an existing item from one of the full buckets
    /// 2. **Evict** that item and insert our new item in its place
    /// 3. **Relocate** the evicted item to its alternate location
    /// 4. **Repeat** if the alternate location is also full (eviction chain)
    /// 5. **Succeed** when we find an empty slot, or **fail** after max_evictions
    ///
    /// # Implementation Details
    ///
    /// - **Eviction tracking**: Collects a sequence of planned evictions, which are
    ///   atomically applied only if the chain succeeds, ensuring atomicity and consistency.
    /// - **Lock upgrading**: Starts with a `WriterExclusive` lock, upgrading to
    ///   `FullyExclusive` only when actually applying the eviction chain, maximizing
    ///   read concurrency during planning.
    /// - **Loop prevention**: Uses a set to track which slots have already been claimed
    ///   by the plan, ensuring early detection of loops in eviction chains.
    fn insert_with_evictions(
        &self,
        mut index: usize,
        mut fingerprint: usize,
        mut lock: Lock,
    ) -> Result<(), Error> {
        let mut rng = rand::rng();
        let mut insertions = Vec::with_capacity(self.max_evictions.min(32));
        let mut used_slots = HashSet::with_capacity(self.max_evictions.min(32));
        while insertions.len() <= self.max_evictions {
            // Choose a sub-index in this bucket whose global slot has not been used yet in the plan
            let base_slot = index * self.bucket_size;
            let mut sub_index = rng.random_range(0..self.bucket_size);
            if used_slots.contains(&(base_slot + sub_index)) {
                sub_index = (0..self.bucket_size)
                    .find(|&i| !used_slots.contains(&(base_slot + i)))
                    .ok_or(Error::NotEnoughSpace)?;
            }
            used_slots.insert(base_slot + sub_index);
            insertions.push((index, sub_index, fingerprint));

            // Evict the fingerprint at the chosen sub-index
            fingerprint = self
                .read_bucket(index, Ordering::Relaxed)
                .nth(sub_index)
                .unwrap();
            // Find the alternative index for the evicted fingerprint
            index = self.alt_index(index, fingerprint);

            if self.insert_at_index(index, fingerprint).is_ok() {
                // Successfully inserted the evicted fingerprint; now apply all planned evictions
                lock.upgrade();
                let mut evicted = fingerprint;
                while let Some((index, sub_index, fingerprint)) = insertions.pop() {
                    self.update_bucket(index, sub_index, evicted, fingerprint, Ordering::Relaxed);
                    evicted = fingerprint;
                }
                return Ok(());
            }
        }
        // Reached the maximum number of evictions, give up
        Err(Error::NotEnoughSpace)
    }

    /// Atomically read all fingerprints from a bucket using lock-free bit manipulation.
    ///
    /// ## Memory Layout
    ///
    /// Fingerprints are tightly packed in memory across multiple atomic usize values:
    /// - Each bucket contains `bucket_size` fingerprints
    /// - Each fingerprint is `fingerprint_size` bits
    /// - Multiple fingerprints are packed into each atomic usize
    /// - Buckets may span multiple atomic values
    ///
    /// ## Algorithm Steps
    ///
    /// 1. Calculate which atomic values contain this bucket's data
    /// 2. Atomically load each relevant atomic value (using the given ordering)
    /// 3. Extract fingerprints using bit manipulation and masking
    /// 4. Handle boundary cases where buckets span multiple atomics
    /// 5. Skip any leading fingerprints and return exactly `bucket_size` of them
    ///
    /// This is completely lock-free - multiple threads can read concurrently,
    /// and reads can proceed even during writes (though they might see
    /// intermediate states that get resolved by retry logic).
    ///
    /// Returns an iterator over the fingerprints in the bucket (0 = empty slot).
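    ///
    /// A sketch of the packing/extraction arithmetic on a 64-bit word (made-up values;
    /// highest-order slot first, matching the shift below):
    ///
    /// ```
    /// let fingerprint_size = 16;
    /// let per_atomic = 64 / fingerprint_size;
    /// let mask = 0xFFFFu64;
    /// let word = (0xAAAAu64 << 48) | (0xBBBBu64 << 32);
    /// let slot0 = (word >> (fingerprint_size * (per_atomic - 1))) & mask;
    /// let slot1 = (word >> (fingerprint_size * (per_atomic - 2))) & mask;
    /// assert_eq!((slot0, slot1), (0xAAAA, 0xBBBB));
    /// ```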
    fn read_bucket(&self, index: usize, ordering: Ordering) -> impl Iterator<Item = usize> {
        let fingerprint_index = index * self.bucket_size;
        let bit_index = fingerprint_index * self.fingerprint_size;
        let start_index = bit_index / usize::BITS as usize;
        let skip_bits = bit_index % usize::BITS as usize;
        let skip_fingerprints = skip_bits >> self.fingerprint_size.trailing_zeros();
        // No need to calculate an end index; just iterate from start_index to the end of the bucket
        self.buckets[start_index..]
            .iter()
            .flat_map(move |atomic| {
                let atomic_value = atomic.load(ordering);
                (0..self.fingerprints_per_atomic).map(move |i| {
                    (atomic_value
                        >> (self.fingerprint_size * (self.fingerprints_per_atomic - i - 1)))
                        & self.fingerprint_mask
                })
            })
            .skip(skip_fingerprints)
            .take(self.bucket_size)
    }

    /// Atomically update a single fingerprint using lock-free compare-exchange.
    ///
    /// ## Lock-Free Update Algorithm
    ///
    /// 1. **Locate the target**: Calculate which atomic usize contains the fingerprint
    ///    and the exact bit position within that atomic value
    /// 2. **Read current state**: Load the current atomic value
    /// 3. **Verify expectation**: Check that the target position contains `old_value`
    /// 4. **Atomic update**: Use compare_exchange_weak to atomically replace `old_value`
    ///    with `new_value`, but only if the atomic hasn't changed since step 2
    /// 5. **Retry on conflict**: If another thread modified the atomic concurrently,
    ///    restart from step 2
    ///
    /// ## Concurrency Safety
    ///
    /// - Uses `compare_exchange_weak`, which can fail spuriously on some architectures
    ///   but is more efficient than the strong version
    /// - Uses the caller-supplied success ordering (e.g. Release) so other threads see the change
    /// - Updates the global counter atomically to maintain consistency
    /// - Returns false if the expected `old_value` is no longer present (indicating
    ///   another thread already modified this slot)
    ///
    /// Returns `true` if the update succeeded, `false` if the slot no longer contains
    /// the expected `old_value` due to concurrent modification.
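    ///
    /// A sketch of the mask-out / or-in step used inside the CAS loop (made-up values):
    ///
    /// ```
    /// let shift = 32;
    /// let mask = 0xFFFFu64 << shift;
    /// let word = 0x1111_2222_3333_4444u64;
    /// let updated = (word & !mask) | (0xBEEFu64 << shift);
    /// assert_eq!(updated, 0x1111_BEEF_3333_4444);
    /// ```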
    fn update_bucket(
        &self,
        index: usize,
        sub_index: usize,
        old_value: usize,
        new_value: usize,
        ordering: Ordering,
    ) -> bool {
        let bit_index = (index * self.bucket_size + sub_index) * self.fingerprint_size;
        let atomic_index = bit_index / usize::BITS as usize;
        let skip_bits = bit_index % usize::BITS as usize;
        let shift = usize::BITS as usize - self.fingerprint_size - skip_bits;
        let fingerprint_mask = self.fingerprint_mask << shift;
        let atomic = &self.buckets[atomic_index];

        loop {
            let atomic_value = atomic.load(Ordering::Relaxed);
            if (atomic_value & fingerprint_mask) >> shift != old_value {
                // The expected fingerprint is not present in the atomic value
                return false;
            }
            let new_atomic_value = (atomic_value & !fingerprint_mask) | (new_value << shift);
            if atomic
                .compare_exchange_weak(atomic_value, new_atomic_value, ordering, Ordering::Relaxed)
                .is_ok()
            {
                // Update the counter based on the change
                match (old_value, new_value) {
                    (0, _) => self.counter.fetch_add(1, Ordering::Release),
                    (_, 0) => self.counter.fetch_sub(1, Ordering::Release),
                    (_, _) => 0,
                };
                return true;
            }
        }
    }

    /// Acquires a lock on the filter, if necessary.
    ///
    /// A lock is only required when evictions are enabled (i.e., `max_evictions > 0`).
    /// If `max_evictions` is set to 0, no lock is acquired.
    ///
    /// Returns `Some(Lock)` if a lock is needed, or `None` if no locking is required.
    pub fn lock(&self, kind: LockKind) -> Option<Lock<'_>> {
        if self.max_evictions == 0 {
            None
        } else {
            Some(Lock::new(&self.lock, kind))
        }
    }

    /// Execute a read operation with optimistic concurrency control and automatic retry.
    ///
    /// This is the cornerstone of the lock-free design: an optimistic concurrency
    /// protocol that allows reads to proceed concurrently with most write operations.
    ///
    /// ## Optimistic Concurrency Protocol
    ///
    /// 1. **Snapshot version**: Acquire an Optimistic lock (capturing the version number)
    /// 2. **Execute read**: Run the provided function without any blocking
    /// 3. **Validate consistency**: Check whether the version changed or a FullyExclusive
    ///    lock was acquired
    /// 4. **Retry or return**: If the data may be stale, retry; otherwise return the result
    ///
    /// ## How It Works
    ///
    /// - **WriterExclusive operations**: Don't invalidate optimistic reads because they
    ///   coordinate through atomic compare-exchange operations that are linearizable
    /// - **FullyExclusive operations**: Do invalidate optimistic reads because they
    ///   perform complex multi-step updates that require consistency
    /// - **Early return optimization**: For operations that can short-circuit (like
    ///   `contains()` returning true), we skip version validation as an optimization
    ///
    /// This pattern is essential for achieving lock-free performance while maintaining
    /// correctness in the presence of concurrent modifications.
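    ///
    /// A simplified sketch of the protocol's shape, using a single version word and a
    /// single data word (illustrative only, not the internal types):
    ///
    /// ```
    /// use std::sync::atomic::{AtomicUsize, Ordering};
    ///
    /// let version = AtomicUsize::new(0);
    /// let data = AtomicUsize::new(42);
    /// let result = loop {
    ///     let before = version.load(Ordering::Acquire); // snapshot the version
    ///     let value = data.load(Ordering::Acquire);     // read without blocking
    ///     if version.load(Ordering::Acquire) == before {
    ///         break value; // no concurrent invalidation observed
    ///     }
    /// };
    /// assert_eq!(result, 42);
    /// ```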
    fn atomic_read<T, F>(&self, fun: F, early_return: Option<T>) -> T
    where
        F: Fn() -> T,
        T: PartialEq,
    {
        if self.max_evictions == 0 {
            return fun();
        }
        loop {
            let lock = Lock::new(&self.lock, LockKind::Optimistic);
            let result = fun();
            if Some(&result) == early_return.as_ref() || !lock.is_outdated() {
                return result;
            }
        }
    }
}

impl<H: Hasher + Default> Serialize for CuckooFilter<H> {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        self.to_serializable().serialize(serializer)
    }
}

impl<'de, H> Deserialize<'de> for CuckooFilter<H>
where
    H: Hasher + Default,
{
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let serialized = SerializableFilter::deserialize(deserializer)?;
        Self::from_serializable(serialized).map_err(|err| serde::de::Error::custom(err.to_string()))
    }
}

impl CuckooFilter<DefaultHasher> {
    /// Create a new CuckooFilterBuilder with default settings
    pub fn builder() -> CuckooFilterBuilder<DefaultHasher> {
        CuckooFilterBuilder::default()
    }

    /// Create a new CuckooFilter with default settings
    pub fn new() -> CuckooFilter<DefaultHasher> {
        Self::builder().build().unwrap()
    }

    /// Create a new CuckooFilter with the specified capacity
    pub fn with_capacity(capacity: usize) -> CuckooFilter<DefaultHasher> {
        Self::builder().capacity(capacity).build().unwrap()
    }
}

impl Default for CuckooFilter<DefaultHasher> {
    /// Create a new CuckooFilter with default settings
    fn default() -> Self {
        Self::new()
    }
}

impl<H: Hasher + Default> CuckooFilterBuilder<H> {
    /// Validate the builder configuration
    fn validate(&self) -> Result<(), String> {
        if let Some(fingerprint_size) = self.fingerprint_size
            && ![4, 8, 16, 32].contains(&fingerprint_size)
        {
            return Err("Invalid fingerprint_size".into());
        }
        if self.bucket_size == Some(0) {
            return Err("bucket_size must be greater than zero".into());
        }
        if self.capacity == Some(0) {
            return Err("capacity must be greater than zero".into());
        }
        Ok(())
    }

    /// Build a CuckooFilter with the specified configuration.
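    ///
    /// # Example
    ///
    /// A configuration sketch (crate name assumed; note that the capacity is rounded up
    /// to fill a power-of-two number of buckets):
    ///
    /// ```
    /// use atomic_cuckoo_filter::CuckooFilter;
    ///
    /// let filter = CuckooFilter::builder()
    ///     .capacity(65536)
    ///     .fingerprint_size(8)
    ///     .bucket_size(4)
    ///     .max_evictions(100)
    ///     .build()
    ///     .unwrap();
    /// assert!(filter.capacity() >= 65536);
    /// ```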
    pub fn build(self) -> Result<CuckooFilter<H>, CuckooFilterBuilderError> {
        let mut cuckoo_filter = self.base_build()?;
        // Calculate the number of buckets (power of 2)
        cuckoo_filter.num_buckets = cuckoo_filter
            .capacity
            .div_ceil(cuckoo_filter.bucket_size)
            .next_power_of_two();
        // Adjust the capacity to match the actual number of buckets
        cuckoo_filter.capacity = cuckoo_filter.num_buckets * cuckoo_filter.bucket_size;
        // Calculate the fingerprint mask
        cuckoo_filter.fingerprint_mask = ((1u64 << cuckoo_filter.fingerprint_size) - 1) as usize;
        // Calculate the number of fingerprints per atomic value
        cuckoo_filter.fingerprints_per_atomic =
            usize::BITS as usize / cuckoo_filter.fingerprint_size;
        // Calculate the total number of atomic values needed
        let bit_size = cuckoo_filter.capacity * cuckoo_filter.fingerprint_size;
        let atomic_size = bit_size.div_ceil(usize::BITS as usize);
        // Initialize the buckets
        cuckoo_filter.buckets = (0..atomic_size).map(|_| AtomicUsize::new(0)).collect();
        Ok(cuckoo_filter)
    }
}
975}