atomic_cuckoo_filter/lib.rs

// Lock-Free Concurrent Cuckoo Filter Implementation
// A high-performance probabilistic data structure for efficient set membership testing
// with better space efficiency than Bloom filters, support for deletions, and
// fully concurrent operations using atomic operations and lock-free algorithms.

use derive_builder::Builder;
use rand::Rng;
use std::collections::HashSet;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::hint;
use std::marker::PhantomData;
use std::sync::atomic::{AtomicUsize, Ordering};

/// Maximum number of spin-loop iterations before parking a thread.
/// This balances CPU usage vs. latency - spinning avoids kernel calls for
/// short waits, but we park threads to avoid wasting CPU on long waits.
const MAX_SPIN: usize = 100;

/// Error type for Cuckoo Filter insert operation
#[derive(Debug, thiserror::Error, PartialEq)]
pub enum Error {
    /// Returned when the filter is full and cannot accommodate more elements
    #[error("Not enough space to store this item.")]
    NotEnoughSpace,
}

/// Types of locks that can be acquired on the filter
#[derive(PartialEq)]
pub enum LockKind {
    /// Optimistic version tracking - does not block other operations but captures
    /// a version number to detect if data changed during the operation
    Optimistic,
    /// Exclusive among writers only - prevents other writers but allows concurrent readers
    WriterExclusive,
    /// Fully exclusive access - blocks all other operations (used only during evictions)
    FullyExclusive,
}

/// A sophisticated lock implementation designed for concurrent cuckoo filter operations.
///
/// This is NOT a traditional mutex but an atomic-based synchronization mechanism that
/// enables three distinct concurrency modes:
///
/// 1. **Optimistic locks**: Allow maximum concurrency - multiple readers and writers
///    can proceed simultaneously. Used for optimistic reads that detect data races.
///
/// 2. **WriterExclusive locks**: Mutual exclusion among writers only - prevents
///    concurrent modifications but allows concurrent reads.
///
/// 3. **FullyExclusive locks**: Complete mutual exclusion - blocks all operations.
///    Only used during complex eviction chains to ensure consistency.
///
/// ## Version Encoding Scheme
/// The atomic usize encodes both lock state and version information:
/// - **Bits 0-1**: Lock kind (0=Optimistic, 1=WriterExclusive, 2=FullyExclusive)
/// - **Remaining bits (2 and up)**: Version counter (incremented on FullyExclusive release)
///
/// This allows optimistic readers to detect when their read might be stale by
/// comparing version numbers before and after the operation.
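///
/// A sketch of the encoding (values illustrative; the version counter is bumped
/// by adding 4, i.e. incrementing bit 2, when a FullyExclusive lock is released):
///
/// ```text
/// word = 0b...vvvv_vvkk
///   kk = 00 -> no exclusive holder (Optimistic reads proceed)
///   kk = 01 -> WriterExclusive held (reads still proceed)
///   kk = 10 -> FullyExclusive held (everyone waits)
///   v… = version counter, read by is_outdated() to detect stale reads
/// ```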
pub struct Lock<'a> {
    /// Reference to the shared atomic value encoding lock state and version
    atomic: &'a AtomicUsize,
    /// Snapshot of the atomic value when this lock was acquired.
    /// Used for optimistic concurrency control and version tracking.
    /// The lower 2 bits indicate lock type, upper bits track version changes.
    version: usize,
    /// The type of lock held by this instance
    kind: LockKind,
    /// Counter for spin attempts before transitioning to thread parking.
    /// Implements adaptive spinning to balance latency vs CPU usage.
    retry: usize,
}

impl<'a> Lock<'a> {
    /// Create a new lock of the specified kind
    /// Blocks until the lock can be acquired
    fn new(atomic: &'a AtomicUsize, kind: LockKind) -> Self {
        let mut lock = Self {
            atomic,
            version: 0,
            kind,
            retry: 0,
        };
        match lock.kind {
            LockKind::Optimistic => loop {
                // For optimistic locks, we can proceed as long as there's no FullyExclusive lock
                lock.version = atomic.load(Ordering::Relaxed);
                if Self::kind(lock.version) != LockKind::FullyExclusive {
                    return lock;
                }
                lock.spin_or_park()
            },
            _ => loop {
                // For writer exclusive and fully exclusive locks, we need to ensure no exclusive lock is acquired
                lock.version = atomic.load(Ordering::Relaxed);
                if Self::kind(lock.version) != LockKind::Optimistic {
                    lock.spin_or_park();
                    continue;
                }
                // Update lower bits of the version: 1 for WriterExclusive, 2 for FullyExclusive
                let new_version = if lock.kind == LockKind::WriterExclusive {
                    lock.version + 1
                } else {
                    lock.version + 2
                };
                if atomic
                    .compare_exchange_weak(
                        lock.version,
                        new_version,
                        Ordering::Release,
                        Ordering::Relaxed,
                    )
                    .is_ok()
                {
                    return lock;
                }
            },
        }
    }

    /// Upgrade a WriterExclusive lock to a FullyExclusive lock
    /// This assumes the current thread holds the writer exclusive lock.
    fn upgrade(&mut self) {
        self.atomic.store(self.version + 2, Ordering::Release);
        self.kind = LockKind::FullyExclusive;
    }

    /// Check if the lock is outdated (version changed) or a FullyExclusive lock is acquired
    /// Used for optimistic concurrency control
    fn is_outdated(&self) -> bool {
        let version = self.atomic.load(Ordering::Acquire);
        Self::kind(version) == LockKind::FullyExclusive || version >> 2 != self.version >> 2
    }

    /// Get the key for parking a thread
    /// Different keys are used for optimistic and exclusive locks
    fn park_key(&self) -> usize {
        let key = self.atomic.as_ptr() as usize;
        match self.kind {
            LockKind::Optimistic => key,
            _ => key + 1,
        }
    }

    /// Spin or park the thread when waiting for a lock
    fn spin_or_park(&mut self) {
        if self.retry > MAX_SPIN {
            // After MAX_SPIN attempts, park the thread
            self.retry = 0;
            unsafe {
                parking_lot_core::park(
                    self.park_key(),
                    || self.atomic.load(Ordering::Acquire) == self.version,
                    || (),
                    |_, _| (),
                    parking_lot_core::DEFAULT_PARK_TOKEN,
                    None,
                );
            }
        } else {
            // Otherwise, spin
            self.retry += 1;
            hint::spin_loop();
        }
    }

    /// Extract the lock kind from the lower 2 bits of a version value
    fn kind(version: usize) -> LockKind {
        match version & 0b11 {
            0 => LockKind::Optimistic,
            1 => LockKind::WriterExclusive,
            2 => LockKind::FullyExclusive,
            _ => panic!("Invalid Lock"),
        }
    }
}

impl Drop for Lock<'_> {
    /// Release the lock when it goes out of scope
    fn drop(&mut self) {
        match self.kind {
            LockKind::Optimistic => return, // No need to do anything for Optimistic locks
            LockKind::WriterExclusive => {
                // For WriterExclusive locks, release the lock without incrementing the version
                self.atomic.store(self.version, Ordering::Release);
            }
            LockKind::FullyExclusive => {
                // For FullyExclusive locks, increment the version to invalidate Optimistic locks
                self.atomic.store(self.version + 4, Ordering::Release);
            }
        }

        // Unpark waiting threads
        let optimistic_key = self.atomic.as_ptr() as usize;
        let exclusive_key = optimistic_key + 1;
        unsafe {
            // Unpark all waiting optimistic locks
            parking_lot_core::unpark_all(optimistic_key, parking_lot_core::DEFAULT_UNPARK_TOKEN);
            // Unpark one waiting exclusive lock (either WriterExclusive or FullyExclusive)
            parking_lot_core::unpark_one(exclusive_key, |_| parking_lot_core::DEFAULT_UNPARK_TOKEN);
        }
    }
}

/// A highly concurrent lock-free probabilistic data structure for set membership testing.
///
/// ## What Makes It "Cuckoo"
///
/// Named after the cuckoo bird's behavior of displacing other birds' eggs, this filter
/// uses **cuckoo hashing** where each item can be stored in one of two possible locations.
/// When both locations are full, existing items are "evicted" (like cuckoo eggs) and
/// relocated to their alternate position, creating eviction chains.
///
/// ## Algorithm Overview
///
/// 1. **Fingerprints**: Items are reduced to small fingerprints (4-32 bits) instead of
///    storing full keys, providing excellent space efficiency.
///
/// 2. **Dual Hashing**: Each item has two possible bucket locations computed from its hash.
///    This provides better space efficiency and flexibility when inserting and removing items.
///
/// 3. **Eviction Chains**: When both buckets are full, a random item is evicted from one
///    bucket and moved to its alternate location, potentially triggering a chain of evictions.
///
/// 4. **Lock-Free Concurrency**: All operations use atomic compare-exchange loops instead
///    of traditional locks, with optimistic concurrency control for read operations.
///    The only exception is when inserting with evictions, where a FullyExclusive lock is used
///    to ensure consistency.
///
/// ## Key Advantages Over Bloom Filters
///
/// - **Deletions supported**: Items can be removed without false negatives
/// - **Better space efficiency**: ~20-30% less memory for same false positive rate
/// - **Bounded lookup time**: Always at most 2 bucket checks, never more
/// - **High concurrency**: Lock-free design enables excellent parallel performance
///
/// ## Concurrency Model
///
/// - **Reads**: Optimistic, can proceed concurrently with most operations
/// - **Simple writes**: Use atomic compare-exchange loops without blocking other operations
/// - **WriterExclusive locks**: Used for removing items, and for unique insertions
/// - **Complex evictions**: Use FullyExclusive locks to ensure consistency
///
/// ## Time Complexity
///
/// - **Lookup**: O(1)
/// - **Deletion**: O(1)
/// - **Insertion**: Amortized O(1); a single insert may trigger an eviction chain,
///   but its length is bounded by `max_evictions`
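///
/// ## Example
///
/// A minimal concurrent-usage sketch (the doctest assumes this crate is published
/// under the name `atomic_cuckoo_filter`, per the file header):
///
/// ```
/// use atomic_cuckoo_filter::CuckooFilter;
/// use std::thread;
///
/// let filter = CuckooFilter::with_capacity(1 << 16);
/// // `insert` and `contains` take `&self`, so threads can share the filter by reference.
/// thread::scope(|s| {
///     s.spawn(|| filter.insert(&"alpha").unwrap());
///     s.spawn(|| filter.insert(&"beta").unwrap());
/// });
/// assert!(filter.contains(&"alpha") && filter.contains(&"beta"));
/// ```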
#[derive(Debug, Builder)]
#[builder(
    pattern = "owned",
    build_fn(private, name = "base_build", validate = "Self::validate")
)]
pub struct CuckooFilter<H = DefaultHasher>
where
    H: Hasher + Default,
{
    // Configuration parameters
    /// Maximum number of elements the filter can store
    #[builder(default = "1048576")]
    capacity: usize,

    /// Size of fingerprints in bits (must be 4, 8, 16, or 32)
    #[builder(default = "16")]
    fingerprint_size: usize,

    /// Number of fingerprints per bucket
    #[builder(default = "4")]
    bucket_size: usize,

    /// Maximum number of evictions to try before giving up
    #[builder(default = "500")]
    max_evictions: usize,

    // Internal values - automatically derived from the configuration
    /// Number of fingerprints that can be stored in a single atomic value
    #[builder(setter(skip))]
    fingerprints_per_atomic: usize,

    /// Number of buckets in the filter (power of 2)
    #[builder(setter(skip))]
    num_buckets: usize,

    /// Bit mask for extracting fingerprints
    #[builder(setter(skip))]
    fingerprint_mask: usize,

    /// Storage for buckets, implemented as a vector of atomic values
    #[builder(setter(skip))]
    buckets: Vec<AtomicUsize>,

    /// Atomic value used for locking
    #[builder(setter(skip))]
    lock: AtomicUsize,

    /// Counter for the number of elements in the filter
    #[builder(setter(skip))]
    counter: AtomicUsize,

    /// Phantom data for the hasher type
    #[builder(setter(skip))]
    _hasher: PhantomData<H>,
}

impl<H: Hasher + Default> CuckooFilter<H> {
    /// Insert an item into the filter
    ///
    /// This operation first attempts a direct insertion without acquiring a lock.
    /// If that fails due to bucket collisions, it falls back to the eviction-based
    /// insertion algorithm which may require a write lock.
    ///
    /// Concurrent operations are safely handled through atomic operations.
    ///
    /// Returns Ok(()) if the item was inserted, or Error::NotEnoughSpace if the filter is full
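    ///
    /// # Example
    ///
    /// A minimal usage sketch (the doctest assumes this crate is named
    /// `atomic_cuckoo_filter`, per the file header):
    ///
    /// ```
    /// use atomic_cuckoo_filter::CuckooFilter;
    ///
    /// let filter = CuckooFilter::new();
    /// assert!(filter.insert(&42).is_ok());
    /// assert!(filter.contains(&42));
    /// ```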
    pub fn insert<T: ?Sized + Hash>(&self, item: &T) -> Result<(), Error> {
        let (index, fingerprint) = self.index_and_fingerprint(item);
        self.try_insert(index, fingerprint).or_else(|error| {
            let lock = self.lock(LockKind::WriterExclusive).ok_or(error)?;
            self.insert_with_evictions(index, fingerprint, lock)
        })
    }

    /// Check if an item is in the filter and insert it if it is not present (atomically)
    ///
    /// This method combines lookup and insert into a single atomic operation,
    /// ensuring thread safety and consistency even with concurrent operations.
    ///
    /// Returns Ok(true) if the item was inserted, Ok(false) if it was already present,
    /// or Error::NotEnoughSpace if the filter is full
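    ///
    /// # Example
    ///
    /// A minimal sketch (the doctest assumes this crate is named `atomic_cuckoo_filter`;
    /// the second call returning `Ok(false)` relies on the fingerprint still being present):
    ///
    /// ```
    /// use atomic_cuckoo_filter::CuckooFilter;
    ///
    /// let filter = CuckooFilter::new();
    /// assert_eq!(filter.insert_unique(&"key"), Ok(true));  // inserted
    /// assert_eq!(filter.insert_unique(&"key"), Ok(false)); // already present
    /// ```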
    pub fn insert_unique<T: ?Sized + Hash>(&self, item: &T) -> Result<bool, Error> {
        let (index, fingerprint) = self.index_and_fingerprint(item);
        if self.lookup_fingerprint(index, fingerprint).is_some() {
            return Ok(false);
        }
        let lock = Lock::new(&self.lock, LockKind::WriterExclusive);
        if self.lookup_fingerprint(index, fingerprint).is_some() {
            return Ok(false);
        }
        self.try_insert(index, fingerprint)
            .or_else(|error| {
                if self.max_evictions == 0 {
                    return Err(error);
                }
                self.insert_with_evictions(index, fingerprint, lock)
            })
            .map(|_| true)
    }

    /// Counts the number of occurrences of an item in the filter.
    ///
    /// # Notes
    /// - This is not a counting filter; it simply counts matching fingerprints in both candidate buckets.
    /// - Useful for detecting duplicates or hash collisions, not for precise multiset membership.
    /// - The count is limited by the filter's structure: at most `bucket_size * 2` per item.
    /// - This method may count false positives due to hash collisions.
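    ///
    /// # Example
    ///
    /// A minimal sketch (the doctest assumes this crate is named `atomic_cuckoo_filter`;
    /// the exact count can be inflated by fingerprint collisions with other items):
    ///
    /// ```
    /// use atomic_cuckoo_filter::CuckooFilter;
    ///
    /// let filter = CuckooFilter::new();
    /// filter.insert(&"dup").unwrap();
    /// filter.insert(&"dup").unwrap();
    /// assert_eq!(filter.count(&"dup"), 2);
    /// ```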
    pub fn count<T: ?Sized + Hash>(&self, item: &T) -> usize {
        let (index, fingerprint) = self.index_and_fingerprint(item);
        let alt_index = self.alt_index(index, fingerprint);
        self.atomic_read(
            || {
                self.read_bucket(index, Ordering::Acquire)
                    .filter(|&f| f == fingerprint)
                    .count()
                    + self
                        .read_bucket(alt_index, Ordering::Acquire)
                        .filter(|&f| f == fingerprint)
                        .count()
            },
            None,
        )
    }

    /// Attempts to remove an item from the filter.
    ///
    /// Returns `true` if the item was successfully removed, or `false` if it was not found.
    ///
    /// Note:
    /// - An item should only be removed if it was previously added. Removing a non-existent
    ///   item may inadvertently remove a different item due to hash collisions inherent to
    ///   cuckoo filters.
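    ///
    /// # Example
    ///
    /// A minimal sketch (the doctest assumes this crate is named `atomic_cuckoo_filter`):
    ///
    /// ```
    /// use atomic_cuckoo_filter::CuckooFilter;
    ///
    /// let filter = CuckooFilter::new();
    /// filter.insert(&"gone").unwrap();
    /// assert!(filter.remove(&"gone"));
    /// assert!(!filter.remove(&"gone")); // fingerprint no longer present
    /// ```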
    pub fn remove<T: ?Sized + Hash>(&self, item: &T) -> bool {
        let (index, fingerprint) = self.index_and_fingerprint(item);
        while let Some((index, sub_index)) = self.lookup_fingerprint(index, fingerprint) {
            let _lock = self.lock(LockKind::WriterExclusive);
            if self.update_bucket(index, sub_index, fingerprint, 0, Ordering::Release) {
                return true;
            }
        }
        false
    }

    /// Check if an item is in the filter
    ///
    /// Returns `true` if the item is possibly in the filter (may have false positives),
    /// `false` if it is definitely not in the filter
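    ///
    /// # Example
    ///
    /// A minimal sketch (the doctest assumes this crate is named `atomic_cuckoo_filter`):
    ///
    /// ```
    /// use atomic_cuckoo_filter::CuckooFilter;
    ///
    /// let filter = CuckooFilter::new();
    /// filter.insert(&"present").unwrap();
    /// assert!(filter.contains(&"present"));  // no false negatives
    /// assert!(!filter.contains(&"absent"));  // false positives are possible but rare
    /// ```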
    pub fn contains<T: ?Sized + Hash>(&self, item: &T) -> bool {
        let (index, fingerprint) = self.index_and_fingerprint(item);
        self.atomic_read(
            || self.lookup_fingerprint(index, fingerprint).is_some(),
            Some(true),
        )
    }

    /// Get the number of elements in the filter
    pub fn len(&self) -> usize {
        self.counter.load(Ordering::Acquire)
    }

    /// Check if the filter is empty
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Get the capacity of the filter
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Clear the filter, removing all elements
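    ///
    /// # Example
    ///
    /// A minimal sketch (the doctest assumes this crate is named `atomic_cuckoo_filter`):
    ///
    /// ```
    /// use atomic_cuckoo_filter::CuckooFilter;
    ///
    /// let filter = CuckooFilter::new();
    /// filter.insert(&1).unwrap();
    /// filter.clear();
    /// assert!(filter.is_empty());
    /// ```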
    pub fn clear(&self) {
        let _lock = self.lock(LockKind::WriterExclusive);
        for atomic in &self.buckets {
            let old_value = atomic.swap(0, Ordering::Release);
            let removed = (0..self.fingerprints_per_atomic)
                .filter(|i| (old_value >> (i * self.fingerprint_size)) & self.fingerprint_mask != 0)
                .count();
            if removed > 0 {
                self.counter.fetch_sub(removed, Ordering::Release);
            }
        }
    }

    /// Compute the hash of an item
    /// Uses the generic hasher H for flexibility and performance
    fn hash<T: ?Sized + Hash>(&self, data: &T) -> u64 {
        let mut hasher = <H as Default>::default();
        data.hash(&mut hasher);
        hasher.finish()
    }

    /// Compute the bucket index and fingerprint for an item.
    ///
    /// 1. **Hash the item**: Use the configured hasher to get a 64-bit hash
    /// 2. **Extract fingerprint**: Use multiplication + shift for high-quality
    ///    distribution across the fingerprint space, then add 1 to avoid zero.
    /// 3. **Extract index**: Use bitwise AND with (num_buckets-1) since num_buckets
    ///    is always a power of 2, providing perfect hash distribution.
    ///
    /// ## Why This Design
    ///
    /// - **Non-zero fingerprints**: Adding 1 ensures fingerprints are never 0,
    ///   so 0 can represent empty slots without ambiguity
    /// - **Independent bits**: Index uses lower hash bits, fingerprint uses
    ///   different bits via multiplication, avoiding correlation
    /// - **Uniform distribution**: Both index and fingerprint are uniformly
    ///   distributed across their respective ranges
    ///
    /// Returns (index, fingerprint) where:
    /// - index is the primary bucket index (0 to num_buckets-1)
    /// - fingerprint is a compact hash of the item (1 to fingerprint_mask)
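    ///
    /// A worked sketch with illustrative numbers (fingerprint_size = 16, so
    /// fingerprint_mask = 0xFFFF, and num_buckets = 1024):
    ///
    /// ```text
    /// hash        = 0x9E37_79B9_7F4A_7C15                  (example value)
    /// fingerprint = ((hash as u128 * 0xFFFF) >> 64) + 1    = 0x9E36 + 1 = 0x9E37
    /// index       = hash as usize & (1024 - 1)             = 0x015 (lower 10 bits)
    /// ```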
    fn index_and_fingerprint<T: ?Sized + Hash>(&self, item: &T) -> (usize, usize) {
        let hash = self.hash(item);
        // Compute fingerprint using multiplication and shift for better distribution
        let fingerprint = ((hash as u128 * self.fingerprint_mask as u128) >> 64) + 1;
        // Compute index using modulo num_buckets (optimized with bitwise AND since num_buckets is a power of 2)
        let index = hash as usize & (self.num_buckets - 1);
        (index, fingerprint as usize)
    }

    /// Computes the alternative bucket index for a given fingerprint using cuckoo hashing.
    ///
    /// In cuckoo hashing, each item can reside in one of two possible buckets. This function
    /// deterministically computes the alternate bucket index from the current index and fingerprint.
    ///
    /// Properties:
    /// 1. Symmetry: `alt_index(alt_index(i, f), f) == i` for any index `i` and fingerprint `f`.
    /// 2. Distinctness: The two indices differ whenever `hash(f) & (num_buckets - 1)` is
    ///    nonzero; if it is zero, both candidates collapse to the same bucket.
    /// 3. Uniformity: The mapping distributes fingerprints evenly across all buckets.
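    ///
    /// A sketch of the symmetry property (XOR is its own inverse):
    ///
    /// ```text
    /// let h = hash(f) & (num_buckets - 1)
    /// alt  = i ^ h
    /// alt_index(alt, f) = alt ^ h = i ^ h ^ h = i
    /// ```
    ///
    /// Because of this symmetry, the alternate bucket can be computed from either
    /// location without knowing which one is the item's primary bucket.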
    fn alt_index(&self, index: usize, fingerprint: usize) -> usize {
        index ^ (self.hash(&fingerprint) as usize & (self.num_buckets - 1))
    }

    /// Look up a fingerprint at its primary or alternative index
    /// Returns `Some((index, sub_index))` if found, None otherwise
    fn lookup_fingerprint(&self, index: usize, fingerprint: usize) -> Option<(usize, usize)> {
        // First check the primary bucket
        self.read_bucket(index, Ordering::Acquire)
            .position(|fp| fp == fingerprint)
            .map(|sub_index| (index, sub_index))
            .or_else(|| {
                // Then check the alternative bucket
                let alt_index = self.alt_index(index, fingerprint);
                self.read_bucket(alt_index, Ordering::Acquire)
                    .position(|fp| fp == fingerprint)
                    .map(|sub_index| (alt_index, sub_index))
            })
    }

    /// Try to insert a fingerprint at its primary or alternative index
    /// Returns `Ok(())` if successful, `Error::NotEnoughSpace` if both buckets are full
    fn try_insert(&self, index: usize, fingerprint: usize) -> Result<(), Error> {
        self.insert_at_index(index, fingerprint).or_else(|_| {
            let alt_index = self.alt_index(index, fingerprint);
            self.insert_at_index(alt_index, fingerprint)
        })
    }

    /// Try to insert a fingerprint at a specific index
    /// Returns Ok(()) if successful, Err(Error::NotEnoughSpace) if the bucket is full
    fn insert_at_index(&self, index: usize, fingerprint: usize) -> Result<(), Error> {
        loop {
            let sub_index = self
                .read_bucket(index, Ordering::Relaxed)
                .position(|i| i == 0)
                .ok_or(Error::NotEnoughSpace)?;

            if self.update_bucket(index, sub_index, 0, fingerprint, Ordering::Release) {
                return Ok(());
            }
        }
    }

    /// Insert a fingerprint using cuckoo eviction chains when both buckets are full.
    ///
    /// This method is invoked only as a fallback when direct insertion fails, preserving
    /// the optimistic, lock-free fast path for the common case.
    ///
    /// # Cuckoo Eviction Algorithm
    ///
    /// When both possible locations for an item are full:
    /// 1. **Randomly select** an existing item from one of the full buckets
    /// 2. **Evict** that item and insert our new item in its place
    /// 3. **Relocate** the evicted item to its alternate location
    /// 4. **Repeat** if the alternate location is also full (eviction chain)
    /// 5. **Succeed** when we find an empty slot, or **fail** after max_evictions
    ///
    /// # Implementation Details
    ///
    /// - **Eviction tracking**: Collects a sequence of planned evictions, which are
    ///   atomically applied only if the chain succeeds, ensuring atomicity and consistency.
    /// - **Lock upgrading**: Starts with a `WriterExclusive` lock, upgrading to
    ///   `FullyExclusive` only when actually applying the eviction chain, maximizing
    ///   read concurrency during planning.
    /// - **Loop prevention**: Uses a `HashSet` of global slot indices to record which
    ///   slots are already part of the plan, so loops in eviction chains are detected early.
    fn insert_with_evictions(
        &self,
        mut index: usize,
        mut fingerprint: usize,
        mut lock: Lock,
    ) -> Result<(), Error> {
        let mut rng = rand::rng();
        let mut insertions = Vec::with_capacity(self.max_evictions.min(32));
        let mut used_slots = HashSet::with_capacity(self.max_evictions.min(32));
        while insertions.len() <= self.max_evictions {
            // Choose a sub-index in this bucket whose global slot has not been used yet in the plan
            let base_slot = index * self.bucket_size;
            let mut sub_index = rng.random_range(0..self.bucket_size);
            if used_slots.contains(&(base_slot + sub_index)) {
                sub_index = (0..self.bucket_size)
                    .find(|&i| !used_slots.contains(&(base_slot + i)))
                    .ok_or(Error::NotEnoughSpace)?;
            }
            used_slots.insert(base_slot + sub_index);
            insertions.push((index, sub_index, fingerprint));

            // Evict the fingerprint at the chosen sub-index
            fingerprint = self
                .read_bucket(index, Ordering::Relaxed)
                .nth(sub_index)
                .unwrap();
            // Find the alternative index for the evicted fingerprint
            index = self.alt_index(index, fingerprint);

            if self.insert_at_index(index, fingerprint).is_ok() {
                // Successfully inserted the fingerprint, now apply all evictions
                lock.upgrade();
                let mut evicted = fingerprint;
                while let Some((index, sub_index, fingerprint)) = insertions.pop() {
                    self.update_bucket(index, sub_index, evicted, fingerprint, Ordering::Relaxed);
                    evicted = fingerprint;
                }
                return Ok(());
            }
        }
        // Reached the maximum number of evictions, give up
        Err(Error::NotEnoughSpace)
    }

    /// Atomically read all fingerprints from a bucket using lock-free bit manipulation.
    ///
    /// ## Memory Layout Complexity
    ///
    /// Fingerprints are tightly packed in memory across multiple atomic usize values:
    /// - Each bucket contains `bucket_size` fingerprints
    /// - Each fingerprint is `fingerprint_size` bits
    /// - Multiple fingerprints are packed into each atomic usize
    /// - Buckets may span across multiple atomic values
    ///
    /// ## Algorithm Steps
    ///
    /// 1. Calculate which atomic values contain this bucket's data
    /// 2. Atomically load each relevant atomic value (using Acquire ordering)
    /// 3. Extract fingerprints using bit manipulation and masking
    /// 4. Handle boundary cases where buckets span multiple atomics
    /// 5. Skip any padding bits and return exactly `bucket_size` fingerprints
    ///
    /// This is completely lock-free - multiple threads can read concurrently,
    /// and reads can proceed even during writes (though they might see
    /// intermediate states that get resolved by retry logic).
    /// Returns an iterator over the fingerprints in the bucket (0 = empty slot).
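    ///
    /// An illustrative packing (64-bit usize, fingerprint_size = 16, bucket_size = 4;
    /// fingerprints are stored most-significant-first within each atomic):
    ///
    /// ```text
    /// atomic[0]: [ fp0 | fp1 | fp2 | fp3 ]   // 4 x 16-bit fingerprints = 64 bits
    /// bucket i occupies bits starting at i * bucket_size * fingerprint_size
    /// ```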
    fn read_bucket(&self, index: usize, ordering: Ordering) -> impl Iterator<Item = usize> {
        let fingerprint_index = index * self.bucket_size;
        let bit_index = fingerprint_index * self.fingerprint_size;
        let start_index = bit_index / usize::BITS as usize;
        let skip_bits = bit_index % usize::BITS as usize;
        let skip_fingerprints = skip_bits >> self.fingerprint_size.trailing_zeros();
        // No need to calculate end_index; just iterate from start_index to the end of the bucket
        self.buckets[start_index..]
            .iter()
            .flat_map(move |atomic| {
                let atomic_value = atomic.load(ordering);
                (0..self.fingerprints_per_atomic).map(move |i| {
                    (atomic_value
                        >> (self.fingerprint_size * (self.fingerprints_per_atomic - i - 1)))
                        & self.fingerprint_mask
                })
            })
            .skip(skip_fingerprints)
            .take(self.bucket_size)
    }

    /// Atomically update a single fingerprint using lock-free compare-exchange.
    ///
    /// ## Lock-Free Update Algorithm
    ///
    /// 1. **Locate the target**: Calculate which atomic usize contains the fingerprint
    ///    and the exact bit position within that atomic value
    /// 2. **Read current state**: Load the current atomic value
    /// 3. **Verify expectation**: Check that the target position contains `old_value`
    /// 4. **Atomic update**: Use compare_exchange_weak to atomically replace `old_value`
    ///    with `new_value`, but only if the atomic hasn't changed since step 2
    /// 5. **Retry on conflict**: If another thread modified the atomic concurrently,
    ///    restart from step 2
    ///
    /// ## Concurrency Safety
    ///
    /// - Uses `compare_exchange_weak` which can fail spuriously on some architectures
    ///   but is more efficient than the strong version
    /// - Employs Release ordering on success to ensure other threads see the change
    /// - Updates the global counter atomically to maintain consistency
    /// - Returns false if the expected `old_value` is no longer present (indicating
    ///   another thread already modified this slot)
    ///
    /// Returns `true` if update succeeded, `false` if the slot no longer contains
    /// the expected `old_value` due to concurrent modification.
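    ///
    /// A worked sketch of the bit arithmetic (64-bit usize, fingerprint_size = 16,
    /// bucket_size = 4; targeting index = 2, sub_index = 1):
    ///
    /// ```text
    /// bit_index    = (2 * 4 + 1) * 16 = 144
    /// atomic_index = 144 / 64 = 2
    /// skip_bits    = 144 % 64 = 16
    /// shift        = 64 - 16 - 16 = 32
    /// mask         = 0xFFFF << 32
    /// ```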
    fn update_bucket(
        &self,
        index: usize,
        sub_index: usize,
        old_value: usize,
        new_value: usize,
        ordering: Ordering,
    ) -> bool {
        let bit_index = (index * self.bucket_size + sub_index) * self.fingerprint_size;
        let atomic_index = bit_index / usize::BITS as usize;
        let skip_bits = bit_index % usize::BITS as usize;
        let shift = usize::BITS as usize - self.fingerprint_size - skip_bits;
        let fingerprint_mask = self.fingerprint_mask << shift;
        let atomic = &self.buckets[atomic_index];

        loop {
            let atomic_value = atomic.load(Ordering::Relaxed);
            if (atomic_value & fingerprint_mask) >> shift != old_value {
                // The expected fingerprint is not present in the atomic value
                return false;
            }
            let new_atomic_value = (atomic_value & !fingerprint_mask) | (new_value << shift);
            if atomic
                .compare_exchange_weak(atomic_value, new_atomic_value, ordering, Ordering::Relaxed)
                .is_ok()
            {
                // Update the counter based on the change
                match (old_value, new_value) {
                    (0, _) => self.counter.fetch_add(1, Ordering::Release),
                    (_, 0) => self.counter.fetch_sub(1, Ordering::Release),
                    (_, _) => 0,
                };
                return true;
            }
        }
    }

    /// Acquires a lock on the filter, if necessary.
    ///
    /// A lock is only required when evictions are enabled (i.e., `max_evictions > 0`).
    /// If `max_evictions` is set to 0, no lock is acquired.
    ///
    /// Returns `Some(Lock)` if a lock is needed, or `None` if no locking is required.
    pub fn lock(&self, kind: LockKind) -> Option<Lock<'_>> {
        if self.max_evictions == 0 {
            None
        } else {
            Some(Lock::new(&self.lock, kind))
        }
    }

    /// Execute a read operation with optimistic concurrency control and automatic retry.
    ///
    /// This is the cornerstone of the lock-free design, implementing a sophisticated
    /// optimistic concurrency protocol that allows reads to proceed concurrently with
    /// most write operations.
    ///
    /// ## Optimistic Concurrency Protocol
    ///
    /// 1. **Snapshot version**: Acquire an Optimistic lock (capturing version number)
    /// 2. **Execute read**: Run the provided function without any blocking
    /// 3. **Validate consistency**: Check if version changed or FullyExclusive lock acquired
    /// 4. **Retry or return**: If data may be stale, retry; otherwise return result
    ///
    /// ## How It Works
    ///
    /// - **WriterExclusive operations**: Don't invalidate optimistic reads because they
    ///   coordinate through atomic compare-exchange operations that are linearizable
    /// - **FullyExclusive operations**: Do invalidate optimistic reads because they
    ///   perform complex multi-step updates that require consistency
    /// - **Early return optimization**: For operations that can short-circuit (like
    ///   `contains()` returning true), we skip version validation as an optimization
    ///
    /// This pattern is essential for achieving lock-free performance while maintaining
    /// correctness in the presence of concurrent modifications.
    fn atomic_read<T, F>(&self, fun: F, early_return: Option<T>) -> T
    where
        F: Fn() -> T,
        T: PartialEq,
    {
        if self.max_evictions == 0 {
            return fun();
        }
        loop {
            let lock = Lock::new(&self.lock, LockKind::Optimistic);
            let result = fun();
            if Some(&result) == early_return.as_ref() || !lock.is_outdated() {
                return result;
            }
        }
    }
}

impl CuckooFilter<DefaultHasher> {
    /// Create a new CuckooFilterBuilder with default settings
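    ///
    /// # Example
    ///
    /// A configuration sketch (the doctest assumes this crate is named
    /// `atomic_cuckoo_filter`; the requested capacity is rounded up so the
    /// bucket count is a power of two):
    ///
    /// ```
    /// use atomic_cuckoo_filter::CuckooFilter;
    ///
    /// let filter = CuckooFilter::builder()
    ///     .capacity(1 << 20)
    ///     .fingerprint_size(8)
    ///     .bucket_size(4)
    ///     .max_evictions(500)
    ///     .build()
    ///     .unwrap();
    /// assert!(filter.capacity() >= 1 << 20);
    /// ```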
    pub fn builder() -> CuckooFilterBuilder<DefaultHasher> {
        CuckooFilterBuilder::default()
    }

    /// Create a new CuckooFilter with default settings
    pub fn new() -> CuckooFilter<DefaultHasher> {
        Self::builder().build().unwrap()
    }

    /// Create a new CuckooFilter with the specified capacity
    pub fn with_capacity(capacity: usize) -> CuckooFilter<DefaultHasher> {
        Self::builder().capacity(capacity).build().unwrap()
    }
}

impl Default for CuckooFilter<DefaultHasher> {
    /// Create a new CuckooFilter with default settings
    fn default() -> Self {
        Self::new()
    }
}

impl<H: Hasher + Default> CuckooFilterBuilder<H> {
    /// Validate the builder configuration
    fn validate(&self) -> Result<(), String> {
        if let Some(fingerprint_size) = self.fingerprint_size
            && ![4, 8, 16, 32].contains(&fingerprint_size)
        {
            return Err("fingerprint_size must be 4, 8, 16, or 32".into());
        }
        if self.bucket_size == Some(0) {
            return Err("bucket_size must be greater than zero".into());
        }
        if self.capacity == Some(0) {
            return Err("capacity must be greater than zero".into());
        }
        Ok(())
    }

    /// Build a CuckooFilter with the specified configuration
    pub fn build(self) -> Result<CuckooFilter<H>, CuckooFilterBuilderError> {
        let mut cuckoo_filter = self.base_build()?;
        // Calculate the number of buckets (power of 2)
        cuckoo_filter.num_buckets = cuckoo_filter
            .capacity
            .div_ceil(cuckoo_filter.bucket_size)
            .next_power_of_two();
        // Adjust the capacity to match the actual number of buckets
        cuckoo_filter.capacity = cuckoo_filter.num_buckets * cuckoo_filter.bucket_size;
        // Calculate the fingerprint mask
        cuckoo_filter.fingerprint_mask = ((1u64 << cuckoo_filter.fingerprint_size) - 1) as usize;
        // Calculate the number of fingerprints per atomic value
        cuckoo_filter.fingerprints_per_atomic =
            usize::BITS as usize / cuckoo_filter.fingerprint_size;
        // Calculate the total number of atomic values needed
        let bit_size = cuckoo_filter.capacity * cuckoo_filter.fingerprint_size;
        let atomic_size = bit_size.div_ceil(usize::BITS as usize);
        // Initialize the buckets
        cuckoo_filter.buckets = (0..atomic_size).map(|_| AtomicUsize::new(0)).collect();
        Ok(cuckoo_filter)
    }
}