atomic_cuckoo_filter/lib.rs

// Lock-Free Concurrent Cuckoo Filter Implementation
// A high-performance probabilistic data structure for efficient set membership testing
// with better space efficiency than Bloom filters, support for deletions, and
// fully concurrent operations using atomic operations and lock-free algorithms.

use std::collections::HashSet;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::hint;
use std::marker::PhantomData;
use std::sync::atomic::{AtomicUsize, Ordering};

use derive_builder::Builder;
use rand::Rng;
use serde::{Deserialize, Serialize};

/// Maximum number of spin-loop iterations before parking a thread.
/// This balances CPU usage vs. latency - spinning avoids kernel calls for
/// short waits, but we park threads to avoid wasting CPU on long waits.
const MAX_SPIN: usize = 100;

/// Magic bytes used to identify serialized filters
const SERIALIZATION_MAGIC: &[u8; 4] = b"ACF1";

/// Current serialization format version
const SERIALIZATION_VERSION: u8 = 1;

/// Error type for Cuckoo Filter insert operation
#[derive(Debug, thiserror::Error, PartialEq)]
pub enum Error {
    /// Returned when the filter is full and cannot accommodate more elements
    #[error("Not enough space to store this item.")]
    NotEnoughSpace,
}

/// Error type for deserializing a Cuckoo Filter from bytes
#[derive(Debug, thiserror::Error, PartialEq)]
pub enum DeserializeError {
    /// The provided byte slice does not start with the expected magic prefix
    #[error("Invalid serialized data header")]
    InvalidHeader,
    /// The serialized data does not contain the expected number of bytes
    #[error("Serialized data has an unexpected length")]
    InvalidLength,
    /// The serialized data encodes an unsupported version
    #[error("Unsupported serialization version: {0}")]
    UnsupportedVersion(u8),
    /// The serialized data encodes an invalid or unsupported configuration
    #[error("Invalid configuration: {0}")]
    InvalidConfiguration(String),
    /// A numeric value in the serialized data cannot be represented on this platform
    #[error("Serialized value is out of range for this platform")]
    ValueOutOfRange,
}

#[derive(Serialize, Deserialize)]
struct SerializableFilter {
    magic: [u8; 4],
    version: u8,
    fingerprint_size: u32,
    bucket_size: u32,
    max_evictions: u32,
    capacity: u64,
    counter: u64,
    buckets: Vec<u64>,
}

/// Types of locks that can be acquired on the filter
#[derive(PartialEq)]
pub enum LockKind {
    /// Optimistic version tracking - does not block other operations but captures
    /// a version number to detect if data changed during the operation
    Optimistic,
    /// Exclusive among writers only - prevents other writers but allows concurrent readers
    WriterExclusive,
    /// Fully exclusive access - blocks all other operations (used only during evictions)
    FullyExclusive,
}

/// A sophisticated lock implementation designed for concurrent cuckoo filter operations.
///
/// This is NOT a traditional mutex but an atomic-based synchronization mechanism that
/// enables three distinct concurrency modes:
///
/// 1. **Optimistic locks**: Allow maximum concurrency - multiple readers and writers
///    can proceed simultaneously. Used for optimistic reads that detect data races.
///
/// 2. **WriterExclusive locks**: Mutual exclusion among writers only - prevents
///    concurrent modifications but allows concurrent reads.
///
/// 3. **FullyExclusive locks**: Complete mutual exclusion - blocks all operations.
///    Only used during complex eviction chains to ensure consistency.
///
/// ## Version Encoding Scheme
/// The atomic usize encodes both lock state and version information:
/// - **Bits 0-1**: Lock kind (0=Optimistic, 1=WriterExclusive, 2=FullyExclusive)
/// - **Bits 2-63**: Version counter (incremented on FullyExclusive release)
///
/// This allows optimistic readers to detect when their read might be stale by
/// comparing version numbers before and after the operation.
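///
/// For illustration only, decoding two possible raw values under the scheme above:
///
/// ```text
/// value = 0b1001  ->  kind bits = 0b01 (WriterExclusive), version = 0b10 (2)
/// value = 0b1010  ->  kind bits = 0b10 (FullyExclusive),  version = 0b10 (2)
/// ```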
pub struct Lock<'a> {
    /// Reference to the shared atomic value encoding lock state and version
    atomic: &'a AtomicUsize,
    /// Snapshot of the atomic value when this lock was acquired.
    /// Used for optimistic concurrency control and version tracking.
    /// The lower 2 bits indicate lock type, upper bits track version changes.
    version: usize,
    /// The type of lock held by this instance
    kind: LockKind,
    /// Counter for spin attempts before transitioning to thread parking.
    /// Implements adaptive spinning to balance latency vs CPU usage.
    retry: usize,
}

impl<'a> Lock<'a> {
    /// Create a new lock of the specified kind
    /// Blocks until the lock can be acquired
    fn new(atomic: &'a AtomicUsize, kind: LockKind) -> Self {
        let mut lock = Self {
            atomic,
            version: 0,
            kind,
            retry: 0,
        };
        match lock.kind {
            LockKind::Optimistic => loop {
                // For optimistic locks, we can proceed as long as there's no FullyExclusive lock
                lock.version = atomic.load(Ordering::Relaxed);
                if Self::kind(lock.version) != LockKind::FullyExclusive {
                    return lock;
                }
                lock.spin_or_park()
            },
            _ => loop {
                // For writer exclusive and fully exclusive locks, we need to ensure no exclusive lock is acquired
                lock.version = atomic.load(Ordering::Relaxed);
                if Self::kind(lock.version) != LockKind::Optimistic {
                    lock.spin_or_park();
                    continue;
                }
                // Update lower bits of the version: 1 for WriterExclusive, 2 for FullyExclusive
                let new_version = if lock.kind == LockKind::WriterExclusive {
                    lock.version + 1
                } else {
                    lock.version + 2
                };
                if atomic
                    .compare_exchange_weak(
                        lock.version,
                        new_version,
                        Ordering::Release,
                        Ordering::Relaxed,
                    )
                    .is_ok()
                {
                    return lock;
                }
            },
        }
    }

    /// Upgrade a WriterExclusive lock to a FullyExclusive lock
    /// This assumes the current thread holds the writer exclusive lock.
    fn upgrade(&mut self) {
        self.atomic.store(self.version + 2, Ordering::Release);
        self.kind = LockKind::FullyExclusive;
    }

    /// Check if the lock is outdated (version changed) or a FullyExclusive lock is acquired
    /// Used for optimistic concurrency control
    fn is_outdated(&self) -> bool {
        let version = self.atomic.load(Ordering::Acquire);
        Self::kind(version) == LockKind::FullyExclusive || version >> 2 != self.version >> 2
    }

    /// Get the key for parking a thread
    /// Different keys are used for optimistic and exclusive locks
    fn park_key(&self) -> usize {
        let key = self.atomic.as_ptr() as usize;
        match self.kind {
            LockKind::Optimistic => key,
            _ => key + 1,
        }
    }

    /// Spin or park the thread when waiting for a lock
    fn spin_or_park(&mut self) {
        if self.retry > MAX_SPIN {
            // After MAX_SPIN attempts, park the thread
            self.retry = 0;
            unsafe {
                parking_lot_core::park(
                    self.park_key(),
                    || self.atomic.load(Ordering::Acquire) == self.version,
                    || (),
                    |_, _| (),
                    parking_lot_core::DEFAULT_PARK_TOKEN,
                    None,
                );
            }
        } else {
            // Otherwise, spin
            self.retry += 1;
            hint::spin_loop();
        }
    }

    /// Extract the lock kind from the lower 2 bits of a version value
    fn kind(version: usize) -> LockKind {
        match version & 0b11 {
            0 => LockKind::Optimistic,
            1 => LockKind::WriterExclusive,
            2 => LockKind::FullyExclusive,
            _ => panic!("Invalid Lock"),
        }
    }
}

impl Drop for Lock<'_> {
    /// Release the lock when it goes out of scope
    fn drop(&mut self) {
        match self.kind {
            LockKind::Optimistic => return, // No need to do anything for Optimistic locks
            LockKind::WriterExclusive => {
                // For WriterExclusive locks, release the lock without incrementing the version
                self.atomic.store(self.version, Ordering::Release);
            }
            LockKind::FullyExclusive => {
                // For FullyExclusive locks, increment the version to invalidate Optimistic locks
                self.atomic.store(self.version + 4, Ordering::Release);
            }
        }

        // Unpark waiting threads
        let optimistic_key = self.atomic.as_ptr() as usize;
        let exclusive_key = optimistic_key + 1;
        unsafe {
            // Unpark all waiting optimistic locks
            parking_lot_core::unpark_all(optimistic_key, parking_lot_core::DEFAULT_UNPARK_TOKEN);
            // Unpark one waiting exclusive lock (either WriterExclusive or FullyExclusive)
            parking_lot_core::unpark_one(exclusive_key, |_| parking_lot_core::DEFAULT_UNPARK_TOKEN);
        }
    }
}

/// A highly concurrent lock-free probabilistic data structure for set membership testing.
///
/// ## What Makes It "Cuckoo"
///
/// Named after the cuckoo bird's behavior of displacing other birds' eggs, this filter
/// uses **cuckoo hashing** where each item can be stored in one of two possible locations.
/// When both locations are full, existing items are "evicted" (like cuckoo eggs) and
/// relocated to their alternate position, creating eviction chains.
///
/// ## Algorithm Overview
///
/// 1. **Fingerprints**: Items are reduced to small fingerprints (4-32 bits) instead of
///    storing full keys, providing excellent space efficiency.
///
/// 2. **Dual Hashing**: Each item has two possible bucket locations computed from its hash.
///    This provides better space efficiency and flexibility when inserting and removing items.
///
/// 3. **Eviction Chains**: When both buckets are full, a random item is evicted from one
///    bucket and moved to its alternate location, potentially triggering a chain of evictions.
///
/// 4. **Lock-Free Concurrency**: All operations use atomic compare-exchange loops instead
///    of traditional locks, with optimistic concurrency control for read operations.
///    The only exception is when inserting with evictions, where a FullyExclusive lock is used
///    to ensure consistency.
///
/// ## Key Advantages Over Bloom Filters
///
/// - **Deletions supported**: Items can be removed without false negatives
/// - **Better space efficiency**: ~20-30% less memory for same false positive rate
/// - **Bounded lookup time**: Always at most 2 bucket checks, never more
/// - **High concurrency**: Lock-free design enables excellent parallel performance
///
/// ## Concurrency Model
///
/// - **Reads**: Optimistic, can proceed concurrently with most operations
/// - **Simple writes**: Use atomic compare-exchange loops without blocking other operations
/// - **WriterExclusive locks**: Used for removing items, and for unique insertions
/// - **Complex evictions**: Use FullyExclusive locks to ensure consistency
///
/// ## Time Complexity
///
/// - **Lookup**: O(1)
/// - **Deletion**: O(1)
/// - **Insertion**: Amortized O(1); eviction chains can occur, but their length is bounded by `max_evictions`
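///
/// ## Example
///
/// A minimal usage sketch (assuming this crate is built under the name `atomic_cuckoo_filter`):
///
/// ```
/// use atomic_cuckoo_filter::CuckooFilter;
///
/// let filter = CuckooFilter::new();
/// filter.insert("apple").unwrap();
/// assert!(filter.contains("apple"));
/// assert_eq!(filter.len(), 1);
/// filter.remove("apple");
/// assert!(filter.is_empty());
/// ```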
#[derive(Debug, Builder)]
#[builder(
    pattern = "owned",
    build_fn(private, name = "base_build", validate = "Self::validate")
)]
pub struct CuckooFilter<H = DefaultHasher>
where
    H: Hasher + Default,
{
    // Configuration parameters
    /// Maximum number of elements the filter can store
    #[builder(default = "1048576")]
    capacity: usize,

    /// Size of fingerprints in bits (must be 4, 8, 16, or 32)
    #[builder(default = "16")]
    fingerprint_size: usize,

    /// Number of fingerprints per bucket
    #[builder(default = "4")]
    bucket_size: usize,

    /// Maximum number of evictions to try before giving up
    #[builder(default = "500")]
    max_evictions: usize,

    // Internal values - automatically derived from the configuration
    /// Number of fingerprints that can be stored in a single atomic value
    #[builder(setter(skip))]
    fingerprints_per_atomic: usize,

    /// Number of buckets in the filter (power of 2)
    #[builder(setter(skip))]
    num_buckets: usize,

    /// Bit mask for extracting fingerprints
    #[builder(setter(skip))]
    fingerprint_mask: usize,

    /// Storage for buckets, implemented as a vector of atomic values
    #[builder(setter(skip))]
    buckets: Vec<AtomicUsize>,

    /// Atomic value used for locking
    #[builder(setter(skip))]
    lock: AtomicUsize,

    /// Counter for the number of elements in the filter
    #[builder(setter(skip))]
    counter: AtomicUsize,

    /// Phantom data for the hasher type
    #[builder(setter(skip))]
    _hasher: PhantomData<H>,
}

impl<H: Hasher + Default> CuckooFilter<H> {
    /// Insert an item into the filter
    ///
    /// This operation first attempts a direct insertion without acquiring a lock.
    /// If that fails due to bucket collisions, it falls back to the eviction-based
    /// insertion algorithm which may require a write lock.
    ///
    /// Concurrent operations are safely handled through atomic operations.
    ///
    /// Returns Ok(()) if the item was inserted, or Error::NotEnoughSpace if the filter is full
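    ///
    /// A minimal usage sketch, handling the possible `NotEnoughSpace` error explicitly:
    ///
    /// ```
    /// use atomic_cuckoo_filter::{CuckooFilter, Error};
    ///
    /// let filter = CuckooFilter::with_capacity(1024);
    /// match filter.insert("apple") {
    ///     Ok(()) => assert!(filter.contains("apple")),
    ///     Err(Error::NotEnoughSpace) => eprintln!("filter is full"),
    /// }
    /// ```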
    pub fn insert<T: ?Sized + Hash>(&self, item: &T) -> Result<(), Error> {
        let (index, fingerprint) = self.index_and_fingerprint(item);
        self.try_insert(index, fingerprint).or_else(|error| {
            let lock = self.lock(LockKind::WriterExclusive).ok_or(error)?;
            self.insert_with_evictions(index, fingerprint, lock)
        })
    }

    /// Check if an item is in the filter and insert it if it is not present (atomically)
    ///
    /// This method combines lookup and insert into a single atomic operation,
    /// ensuring thread safety and consistency even with concurrent operations.
    ///
    /// Returns Ok(true) if the item was inserted, Ok(false) if it was already present,
    /// or Error::NotEnoughSpace if the filter is full
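    ///
    /// A minimal usage sketch:
    ///
    /// ```
    /// use atomic_cuckoo_filter::CuckooFilter;
    ///
    /// let filter = CuckooFilter::new();
    /// assert_eq!(filter.insert_unique("apple"), Ok(true));  // inserted
    /// assert_eq!(filter.insert_unique("apple"), Ok(false)); // already present
    /// ```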
    pub fn insert_unique<T: ?Sized + Hash>(&self, item: &T) -> Result<bool, Error> {
        let (index, fingerprint) = self.index_and_fingerprint(item);
        if self.lookup_fingerprint(index, fingerprint).is_some() {
            return Ok(false);
        }
        let lock = Lock::new(&self.lock, LockKind::WriterExclusive);
        if self.lookup_fingerprint(index, fingerprint).is_some() {
            return Ok(false);
        }
        self.try_insert(index, fingerprint)
            .or_else(|error| {
                if self.max_evictions == 0 {
                    return Err(error);
                }
                self.insert_with_evictions(index, fingerprint, lock)
            })
            .map(|_| true)
    }

    /// Counts the number of occurrences of an item in the filter.
    ///
    /// # Notes
    /// - This is not a counting filter; it simply counts matching fingerprints in both candidate buckets.
    /// - Useful for detecting duplicates or hash collisions, not for precise multiset membership.
    /// - The count is limited by the filter's structure: at most `bucket_size * 2` per item.
    /// - This method may count false positives due to hash collisions.
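    ///
    /// A minimal usage sketch (asserting with `>=` because collisions can only inflate the count):
    ///
    /// ```
    /// use atomic_cuckoo_filter::CuckooFilter;
    ///
    /// let filter = CuckooFilter::new();
    /// filter.insert("apple").unwrap();
    /// filter.insert("apple").unwrap();
    /// assert!(filter.count("apple") >= 2);
    /// ```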
    pub fn count<T: ?Sized + Hash>(&self, item: &T) -> usize {
        let (index, fingerprint) = self.index_and_fingerprint(item);
        let alt_index = self.alt_index(index, fingerprint);
        self.atomic_read(
            || {
                self.read_bucket(index, Ordering::Acquire)
                    .filter(|&f| f == fingerprint)
                    .count()
                    + self
                        .read_bucket(alt_index, Ordering::Acquire)
                        .filter(|&f| f == fingerprint)
                        .count()
            },
            None,
        )
    }

    /// Attempts to remove an item from the filter.
    ///
    /// Returns `true` if the item was successfully removed, or `false` if it was not found.
    ///
    /// Note:
    /// - An item should only be removed if it was previously added. Removing a non-existent
    ///   item may inadvertently remove a different item due to hash collisions inherent to
    ///   cuckoo filters.
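    ///
    /// A minimal usage sketch:
    ///
    /// ```
    /// use atomic_cuckoo_filter::CuckooFilter;
    ///
    /// let filter = CuckooFilter::new();
    /// filter.insert("apple").unwrap();
    /// assert!(filter.remove("apple"));
    /// assert!(!filter.remove("apple")); // already removed
    /// ```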
    pub fn remove<T: ?Sized + Hash>(&self, item: &T) -> bool {
        let (index, fingerprint) = self.index_and_fingerprint(item);
        while let Some((index, sub_index)) = self.lookup_fingerprint(index, fingerprint) {
            let _lock = self.lock(LockKind::WriterExclusive);
            if self.update_bucket(index, sub_index, fingerprint, 0, Ordering::Release) {
                return true;
            }
        }
        false
    }

    /// Check if an item is in the filter
    ///
    /// Returns `true` if the item is possibly in the filter (may have false positives),
    /// `false` if it is definitely not in the filter
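    ///
    /// A minimal usage sketch (only the positive case is asserted, since a lookup of an
    /// absent item can still yield a false positive):
    ///
    /// ```
    /// use atomic_cuckoo_filter::CuckooFilter;
    ///
    /// let filter = CuckooFilter::new();
    /// filter.insert(&42_u64).unwrap();
    /// assert!(filter.contains(&42_u64));
    /// ```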
    pub fn contains<T: ?Sized + Hash>(&self, item: &T) -> bool {
        let (index, fingerprint) = self.index_and_fingerprint(item);
        self.atomic_read(
            || self.lookup_fingerprint(index, fingerprint).is_some(),
            Some(true),
        )
    }

    /// Get the number of elements in the filter
    pub fn len(&self) -> usize {
        self.counter.load(Ordering::Acquire)
    }

    /// Check if the filter is empty
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Get the capacity of the filter
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Serialize the filter into a byte vector.
    ///
    /// The serialized format is versioned and includes the filter configuration,
    /// element count, and all bucket contents. A FullyExclusive lock is acquired
    /// (when evictions are enabled, i.e. `max_evictions > 0`) to guarantee a consistent snapshot.
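    ///
    /// A minimal round-trip sketch:
    ///
    /// ```
    /// use atomic_cuckoo_filter::CuckooFilter;
    ///
    /// let filter = CuckooFilter::new();
    /// filter.insert("apple").unwrap();
    /// let bytes = filter.to_bytes();
    /// let restored: CuckooFilter = CuckooFilter::from_bytes(&bytes).unwrap();
    /// assert!(restored.contains("apple"));
    /// assert_eq!(restored.len(), filter.len());
    /// ```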
    pub fn to_bytes(&self) -> Vec<u8> {
        let serializable = self.to_serializable();

        bincode::serialize(&serializable).expect("Serialization should not fail")
    }

    fn to_serializable(&self) -> SerializableFilter {
        let _lock = self.lock(LockKind::FullyExclusive);
        SerializableFilter {
            magic: *SERIALIZATION_MAGIC,
            version: SERIALIZATION_VERSION,
            fingerprint_size: self.fingerprint_size as u32,
            bucket_size: self.bucket_size as u32,
            max_evictions: self.max_evictions as u32,
            capacity: self.capacity as u64,
            counter: self.counter.load(Ordering::Acquire) as u64,
            buckets: self
                .buckets
                .iter()
                .map(|atomic| atomic.load(Ordering::Acquire) as u64)
                .collect(),
        }
    }

    fn from_serializable(serialized: SerializableFilter) -> Result<Self, DeserializeError> {
        if serialized.magic != *SERIALIZATION_MAGIC {
            return Err(DeserializeError::InvalidHeader);
        }

        if serialized.version != SERIALIZATION_VERSION {
            return Err(DeserializeError::UnsupportedVersion(serialized.version));
        }

        let fingerprint_size: usize = serialized
            .fingerprint_size
            .try_into()
            .map_err(|_| DeserializeError::ValueOutOfRange)?;
        let bucket_size: usize = serialized
            .bucket_size
            .try_into()
            .map_err(|_| DeserializeError::ValueOutOfRange)?;
        let max_evictions: usize = serialized
            .max_evictions
            .try_into()
            .map_err(|_| DeserializeError::ValueOutOfRange)?;
        let capacity: usize = serialized
            .capacity
            .try_into()
            .map_err(|_| DeserializeError::ValueOutOfRange)?;
        let counter: usize = serialized
            .counter
            .try_into()
            .map_err(|_| DeserializeError::ValueOutOfRange)?;
        let atomic_len = serialized.buckets.len();

        if counter > capacity {
            return Err(DeserializeError::InvalidConfiguration(
                "Serialized element count exceeds capacity".into(),
            ));
        }

        let filter = CuckooFilterBuilder::<H>::default()
            .capacity(capacity)
            .fingerprint_size(fingerprint_size)
            .bucket_size(bucket_size)
            .max_evictions(max_evictions)
            .build()
            .map_err(|err| DeserializeError::InvalidConfiguration(err.to_string()))?;

        if filter.buckets.len() != atomic_len {
            return Err(DeserializeError::InvalidLength);
        }

        filter.counter.store(counter, Ordering::Release);

        for (atomic, value) in filter.buckets.iter().zip(serialized.buckets.into_iter()) {
            let value: usize = value
                .try_into()
                .map_err(|_| DeserializeError::ValueOutOfRange)?;
            atomic.store(value, Ordering::Release);
        }

        Ok(filter)
    }

    /// Deserialize a filter from the provided byte slice.
    pub fn from_bytes(bytes: &[u8]) -> Result<Self, DeserializeError> {
        let serialized: SerializableFilter =
            bincode::deserialize(bytes).map_err(|_| DeserializeError::InvalidLength)?;

        Self::from_serializable(serialized)
    }

    /// Clear the filter, removing all elements
    pub fn clear(&self) {
        let _lock = self.lock(LockKind::WriterExclusive);
        for atomic in &self.buckets {
            let old_value = atomic.swap(0, Ordering::Release);
            let removed = (0..self.fingerprints_per_atomic)
                .filter(|i| (old_value >> (i * self.fingerprint_size)) & self.fingerprint_mask != 0)
                .count();
            if removed > 0 {
                self.counter.fetch_sub(removed, Ordering::Release);
            }
        }
    }

    /// Compute the hash of an item
    /// Uses the generic hasher H for flexibility and performance
    fn hash<T: ?Sized + Hash>(&self, data: &T) -> u64 {
        let mut hasher = <H as Default>::default();
        data.hash(&mut hasher);
        hasher.finish()
    }

    /// Compute the bucket index and fingerprint for an item.
    ///
    /// 1. **Hash the item**: Use the configured hasher to get a 64-bit hash
    /// 2. **Extract fingerprint**: Use multiplication + shift for high-quality
    ///    distribution across the fingerprint space, then add 1 to avoid zero.
    /// 3. **Extract index**: Use bitwise AND with (num_buckets-1) since num_buckets
    ///    is always a power of 2, providing perfect hash distribution.
    ///
    /// ## Why This Design
    ///
    /// - **Non-zero fingerprints**: Adding 1 ensures fingerprints are never 0,
    ///   so 0 can represent empty slots without ambiguity
    /// - **Independent bits**: Index uses lower hash bits, fingerprint uses
    ///   different bits via multiplication, avoiding correlation
    /// - **Uniform distribution**: Both index and fingerprint are uniformly
    ///   distributed across their respective ranges
    ///
    /// Returns (index, fingerprint) where:
    /// - index is the primary bucket index (0 to num_buckets-1)
    /// - fingerprint is a compact hash of the item (1 to fingerprint_mask)
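    ///
    /// A worked example, assuming the default configuration (`fingerprint_size` = 16, so
    /// `fingerprint_mask` = 0xFFFF, and `num_buckets` = 2^18):
    ///
    /// ```text
    /// hash        = u64::MAX
    /// fingerprint = ((hash as u128 * 0xFFFF) >> 64) + 1 = 0xFFFE + 1 = 0xFFFF
    /// index       = hash as usize & (2^18 - 1)          = 0x3FFFF
    /// ```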
    fn index_and_fingerprint<T: ?Sized + Hash>(&self, item: &T) -> (usize, usize) {
        let hash = self.hash(item);
        // Compute fingerprint using multiplication and shift for better distribution
        let fingerprint = ((hash as u128 * self.fingerprint_mask as u128) >> 64) + 1;
        // Compute index using modulo num_buckets (optimized with bitwise AND since num_buckets is a power of 2)
        let index = hash as usize & (self.num_buckets - 1);
        (index, fingerprint as usize)
    }

    /// Computes the alternative bucket index for a given fingerprint using cuckoo hashing.
    ///
    /// In cuckoo hashing, each item can reside in one of two possible buckets. This function
    /// deterministically computes the alternate bucket index from the current index and fingerprint.
    ///
    /// Properties:
    /// 1. Symmetry: `alt_index(alt_index(i, f), f) == i` for any index `i` and fingerprint `f`.
    /// 2. Distinctness: The two indices differ unless `hash(f)` is 0 modulo `num_buckets`,
    ///    in which case both candidates are the same bucket.
    /// 3. Uniformity: The mapping distributes fingerprints evenly across all buckets.
    fn alt_index(&self, index: usize, fingerprint: usize) -> usize {
        index ^ (self.hash(&fingerprint) as usize & (self.num_buckets - 1))
    }

    /// Look up a fingerprint at its primary or alternative index
    /// Returns `Some((index, sub_index))` if found, None otherwise
    fn lookup_fingerprint(&self, index: usize, fingerprint: usize) -> Option<(usize, usize)> {
        // First check the primary bucket
        self.read_bucket(index, Ordering::Acquire)
            .position(|fp| fp == fingerprint)
            .map(|sub_index| (index, sub_index))
            .or_else(|| {
                // Then check the alternative bucket
                let alt_index = self.alt_index(index, fingerprint);
                self.read_bucket(alt_index, Ordering::Acquire)
                    .position(|fp| fp == fingerprint)
                    .map(|sub_index| (alt_index, sub_index))
            })
    }

    /// Try to insert a fingerprint at its primary or alternative index
    /// Returns `Ok(())` if successful, `Error::NotEnoughSpace` if both buckets are full
    fn try_insert(&self, index: usize, fingerprint: usize) -> Result<(), Error> {
        self.insert_at_index(index, fingerprint).or_else(|_| {
            let alt_index = self.alt_index(index, fingerprint);
            self.insert_at_index(alt_index, fingerprint)
        })
    }

    /// Try to insert a fingerprint at a specific index
    /// Returns Ok(()) if successful, Err(Error::NotEnoughSpace) if the bucket is full
    fn insert_at_index(&self, index: usize, fingerprint: usize) -> Result<(), Error> {
        loop {
            let sub_index = self
                .read_bucket(index, Ordering::Relaxed)
                .position(|i| i == 0)
                .ok_or(Error::NotEnoughSpace)?;

            if self.update_bucket(index, sub_index, 0, fingerprint, Ordering::Release) {
                return Ok(());
            }
        }
    }

    /// Insert a fingerprint using cuckoo eviction chains when both buckets are full.
    ///
    /// This method is invoked only as a fallback when direct insertion fails, preserving
    /// the optimistic, lock-free fast path for the common case.
    ///
    /// # Cuckoo Eviction Algorithm
    ///
    /// When both possible locations for an item are full:
    /// 1. **Randomly select** an existing item from one of the full buckets
    /// 2. **Evict** that item and insert our new item in its place
    /// 3. **Relocate** the evicted item to its alternate location
    /// 4. **Repeat** if the alternate location is also full (eviction chain)
    /// 5. **Succeed** when we find an empty slot, or **fail** after max_evictions
    ///
    /// # Implementation Details
    ///
    /// - **Eviction tracking**: Collects a sequence of planned evictions, which are
    ///   atomically applied only if the chain succeeds, ensuring atomicity and consistency.
    /// - **Lock upgrading**: Starts with a `WriterExclusive` lock, upgrading to
    ///   `FullyExclusive` only when actually applying the eviction chain, maximizing
    ///   read concurrency during planning.
    /// - **Loop prevention**: Uses a set of already-claimed slot positions so that each
    ///   slot is planned at most once, ensuring early detection of loops in eviction chains.
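    ///
    /// An illustrative (not normative) trace of a short eviction chain:
    ///
    /// ```text
    /// insert f: bucket A and its alternate bucket B are both full
    ///   1. pick a random victim g in A and plan the write "A[i] <- f"
    ///   2. g's alternate bucket is C; if C has a free slot, write g there
    ///   3. otherwise pick a victim in C and continue (up to max_evictions steps)
    ///   4. once the last displaced fingerprint lands in a free slot, upgrade the lock
    ///      to FullyExclusive and apply the planned writes in reverse order
    /// ```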
    fn insert_with_evictions(
        &self,
        mut index: usize,
        mut fingerprint: usize,
        mut lock: Lock,
    ) -> Result<(), Error> {
        let mut rng = rand::rng();
        let mut insertions = Vec::with_capacity(self.max_evictions.min(32));
        let mut used_slots = HashSet::with_capacity(self.max_evictions.min(32));
        while insertions.len() <= self.max_evictions {
            // Choose a sub-index in this bucket whose global slot has not been used yet in the plan
            let base_slot = index * self.bucket_size;
            let mut sub_index = rng.random_range(0..self.bucket_size);
            if used_slots.contains(&(base_slot + sub_index)) {
                sub_index = (0..self.bucket_size)
                    .find(|&i| !used_slots.contains(&(base_slot + i)))
                    .ok_or(Error::NotEnoughSpace)?;
            }
            used_slots.insert(base_slot + sub_index);
            insertions.push((index, sub_index, fingerprint));

            // Evict the fingerprint at the chosen sub-index
            fingerprint = self
                .read_bucket(index, Ordering::Relaxed)
                .nth(sub_index)
                .unwrap();
            // Find the alternative index for the evicted fingerprint
            index = self.alt_index(index, fingerprint);

            if self.insert_at_index(index, fingerprint).is_ok() {
                // Successfully inserted the fingerprint, now apply all evictions
                lock.upgrade();
                let mut evicted = fingerprint;
                while let Some((index, sub_index, fingerprint)) = insertions.pop() {
                    self.update_bucket(index, sub_index, evicted, fingerprint, Ordering::Relaxed);
                    evicted = fingerprint;
                }
                return Ok(());
            }
        }
        // Reached the maximum number of evictions, give up
        Err(Error::NotEnoughSpace)
    }

    /// Atomically read all fingerprints from a bucket using lock-free bit manipulation.
    ///
    /// ## Memory Layout Complexity
    ///
    /// Fingerprints are tightly packed in memory across multiple atomic usize values:
    /// - Each bucket contains `bucket_size` fingerprints
    /// - Each fingerprint is `fingerprint_size` bits
    /// - Multiple fingerprints are packed into each atomic usize
    /// - Buckets may span across multiple atomic values
    ///
    /// ## Algorithm Steps
    ///
    /// 1. Calculate which atomic values contain this bucket's data
    /// 2. Atomically load each relevant atomic value (using Acquire ordering)
    /// 3. Extract fingerprints using bit manipulation and masking
    /// 4. Handle boundary cases where buckets span multiple atomics
    /// 5. Skip any padding bits and return exactly `bucket_size` fingerprints
    ///
    /// This is completely lock-free - multiple threads can read concurrently,
    /// and reads can proceed even during writes (though they might see
    /// intermediate states that get resolved by retry logic).
    /// Returns an Iterator over the fingerprints in the bucket (0 = empty slot).
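    ///
    /// An illustrative layout, assuming the default configuration on a 64-bit platform
    /// (16-bit fingerprints, 4 fingerprints per atomic, 4 slots per bucket, so each
    /// bucket happens to map onto exactly one atomic):
    ///
    /// ```text
    /// buckets[k] (64 bits): [ slot 0 | slot 1 | slot 2 | slot 3 ]
    ///                         most significant ... least significant
    /// ```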
    fn read_bucket(&self, index: usize, ordering: Ordering) -> impl Iterator<Item = usize> {
        let fingerprint_index = index * self.bucket_size;
        let bit_index = fingerprint_index * self.fingerprint_size;
        let start_index = bit_index / usize::BITS as usize;
        let skip_bits = bit_index % usize::BITS as usize;
        let skip_fingerprints = skip_bits >> self.fingerprint_size.trailing_zeros();
        // No need to calculate end_index; just iterate from start_index to the end of the bucket
        self.buckets[start_index..]
            .iter()
            .flat_map(move |atomic| {
                let atomic_value = atomic.load(ordering);
                (0..self.fingerprints_per_atomic).map(move |i| {
                    (atomic_value
                        >> (self.fingerprint_size * (self.fingerprints_per_atomic - i - 1)))
                        & self.fingerprint_mask
                })
            })
            .skip(skip_fingerprints)
            .take(self.bucket_size)
    }

    /// Atomically update a single fingerprint using lock-free compare-exchange.
    ///
    /// ## Lock-Free Update Algorithm
    ///
    /// 1. **Locate the target**: Calculate which atomic usize contains the fingerprint
    ///    and the exact bit position within that atomic value
    /// 2. **Read current state**: Load the current atomic value
    /// 3. **Verify expectation**: Check that the target position contains `old_value`
    /// 4. **Atomic update**: Use compare_exchange_weak to atomically replace `old_value`
    ///    with `new_value`, but only if the atomic hasn't changed since step 2
    /// 5. **Retry on conflict**: If another thread modified the atomic concurrently,
    ///    restart from step 2
    ///
    /// ## Concurrency Safety
    ///
    /// - Uses `compare_exchange_weak` which can fail spuriously on some architectures
    ///   but is more efficient than the strong version
    /// - Employs Release ordering on success to ensure other threads see the change
    /// - Updates the global counter atomically to maintain consistency
    /// - Returns false if the expected `old_value` is no longer present (indicating
    ///   another thread already modified this slot)
    ///
    /// Returns `true` if update succeeded, `false` if the slot no longer contains
    /// the expected `old_value` due to concurrent modification.
    fn update_bucket(
        &self,
        index: usize,
        sub_index: usize,
        old_value: usize,
        new_value: usize,
        ordering: Ordering,
    ) -> bool {
        let bit_index = (index * self.bucket_size + sub_index) * self.fingerprint_size;
        let atomic_index = bit_index / usize::BITS as usize;
        let skip_bits = bit_index % usize::BITS as usize;
        let shift = usize::BITS as usize - self.fingerprint_size - skip_bits;
        let fingerprint_mask = self.fingerprint_mask << shift;
        let atomic = &self.buckets[atomic_index];

        loop {
            let atomic_value = atomic.load(Ordering::Relaxed);
            if (atomic_value & fingerprint_mask) >> shift != old_value {
                // The expected fingerprint is not present in the atomic value
                return false;
            }
            let new_atomic_value = (atomic_value & !fingerprint_mask) | (new_value << shift);
            if atomic
                .compare_exchange_weak(atomic_value, new_atomic_value, ordering, Ordering::Relaxed)
                .is_ok()
            {
                // Update the counter based on the change
                match (old_value, new_value) {
                    (0, _) => self.counter.fetch_add(1, Ordering::Release),
                    (_, 0) => self.counter.fetch_sub(1, Ordering::Release),
                    (_, _) => 0,
                };
                return true;
            }
        }
    }

    /// Acquires a lock on the filter, if necessary.
    ///
    /// A lock is only required when evictions are enabled (i.e., `max_evictions > 0`).
    /// If `max_evictions` is set to 0, no lock is acquired.
    ///
    /// Returns `Some(Lock)` if a lock is needed, or `None` if no locking is required.
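    ///
    /// A minimal usage sketch, holding a `WriterExclusive` guard to pause concurrent
    /// writers while performing a couple of reads:
    ///
    /// ```
    /// use atomic_cuckoo_filter::{CuckooFilter, LockKind};
    ///
    /// let filter = CuckooFilter::new();
    /// filter.insert("apple").unwrap();
    /// {
    ///     let _guard = filter.lock(LockKind::WriterExclusive);
    ///     assert!(filter.contains("apple"));
    ///     assert_eq!(filter.len(), 1);
    /// } // guard dropped here; writers may proceed again
    /// ```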
    pub fn lock(&self, kind: LockKind) -> Option<Lock<'_>> {
        if self.max_evictions == 0 {
            None
        } else {
            Some(Lock::new(&self.lock, kind))
        }
    }

    /// Execute a read operation with optimistic concurrency control and automatic retry.
    ///
    /// This is the cornerstone of the lock-free design, implementing a sophisticated
    /// optimistic concurrency protocol that allows reads to proceed concurrently with
    /// most write operations.
    ///
    /// ## Optimistic Concurrency Protocol
    ///
    /// 1. **Snapshot version**: Acquire an Optimistic lock (capturing version number)
    /// 2. **Execute read**: Run the provided function without any blocking
    /// 3. **Validate consistency**: Check if version changed or FullyExclusive lock acquired
    /// 4. **Retry or return**: If data may be stale, retry; otherwise return result
    ///
    /// ## How It Works
    ///
    /// - **WriterExclusive operations**: Don't invalidate optimistic reads because they
    ///   coordinate through atomic compare-exchange operations that are linearizable
    /// - **FullyExclusive operations**: Do invalidate optimistic reads because they
    ///   perform complex multi-step updates that require consistency
    /// - **Early return optimization**: For operations that can short-circuit (like
    ///   `contains()` returning true), we skip version validation as an optimization
    ///
    /// This pattern is essential for achieving lock-free performance while maintaining
    /// correctness in the presence of concurrent modifications.
    fn atomic_read<T, F>(&self, fun: F, early_return: Option<T>) -> T
    where
        F: Fn() -> T,
        T: PartialEq,
    {
        if self.max_evictions == 0 {
            return fun();
        }
        loop {
            let lock = Lock::new(&self.lock, LockKind::Optimistic);
            let result = fun();
            if Some(&result) == early_return.as_ref() || !lock.is_outdated() {
                return result;
            }
        }
    }
}

impl<H: Hasher + Default> Serialize for CuckooFilter<H> {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        self.to_serializable().serialize(serializer)
    }
}

impl<'de, H> Deserialize<'de> for CuckooFilter<H>
where
    H: Hasher + Default,
{
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let serialized = SerializableFilter::deserialize(deserializer)?;
        Self::from_serializable(serialized).map_err(|err| serde::de::Error::custom(err.to_string()))
    }
}

impl CuckooFilter<DefaultHasher> {
    /// Create a new CuckooFilterBuilder with default settings
    pub fn builder() -> CuckooFilterBuilder<DefaultHasher> {
        CuckooFilterBuilder::default()
    }

    /// Create a new CuckooFilter with default settings
    pub fn new() -> CuckooFilter<DefaultHasher> {
        Self::builder().build().unwrap()
    }

    /// Create a new CuckooFilter with the specified capacity
    pub fn with_capacity(capacity: usize) -> CuckooFilter<DefaultHasher> {
        Self::builder().capacity(capacity).build().unwrap()
    }
}

impl Default for CuckooFilter<DefaultHasher> {
    /// Create a new CuckooFilter with default settings
    fn default() -> Self {
        Self::new()
    }
}

impl<H: Hasher + Default> CuckooFilterBuilder<H> {
    /// Validate the builder configuration
    fn validate(&self) -> Result<(), String> {
        if let Some(fingerprint_size) = self.fingerprint_size
            && ![4, 8, 16, 32].contains(&fingerprint_size)
        {
            return Err("Invalid fingerprint_size".into());
        }
        if self.bucket_size == Some(0) {
            return Err("bucket_size must be greater than zero".into());
        }
        if self.capacity == Some(0) {
            return Err("capacity must be greater than zero".into());
        }
        Ok(())
    }

    /// Build a CuckooFilter with the specified configuration
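    ///
    /// A minimal configuration sketch (the requested capacity is rounded up to a
    /// power-of-two number of buckets, so the effective capacity may be larger):
    ///
    /// ```
    /// use atomic_cuckoo_filter::CuckooFilter;
    ///
    /// let filter = CuckooFilter::builder()
    ///     .capacity(100_000)
    ///     .fingerprint_size(8)
    ///     .bucket_size(4)
    ///     .max_evictions(200)
    ///     .build()
    ///     .expect("valid configuration");
    /// assert!(filter.capacity() >= 100_000);
    /// ```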
    pub fn build(self) -> Result<CuckooFilter<H>, CuckooFilterBuilderError> {
        let mut cuckoo_filter = self.base_build()?;
        // Calculate the number of buckets (power of 2)
        cuckoo_filter.num_buckets = cuckoo_filter
            .capacity
            .div_ceil(cuckoo_filter.bucket_size)
            .next_power_of_two();
        // Adjust the capacity to match the actual number of buckets
        cuckoo_filter.capacity = cuckoo_filter.num_buckets * cuckoo_filter.bucket_size;
        // Calculate the fingerprint mask
        cuckoo_filter.fingerprint_mask = ((1u64 << cuckoo_filter.fingerprint_size) - 1) as usize;
        // Calculate the number of fingerprints per atomic value
        cuckoo_filter.fingerprints_per_atomic =
            usize::BITS as usize / cuckoo_filter.fingerprint_size;
        // Calculate the total number of atomic values needed
        let bit_size = cuckoo_filter.capacity * cuckoo_filter.fingerprint_size;
        let atomic_size = bit_size.div_ceil(usize::BITS as usize);
        // Initialize the buckets
        cuckoo_filter.buckets = (0..atomic_size).map(|_| AtomicUsize::new(0)).collect();
        Ok(cuckoo_filter)
    }
}