atomic_cuckoo_filter/
lib.rs

1// Lock-Free Concurrent Cuckoo Filter Implementation
2// A high-performance probabilistic data structure for efficient set membership testing
3// with better space efficiency than Bloom filters, support for deletions, and
4// fully concurrent operations using atomic operations and lock-free algorithms.
5
6use derive_builder::Builder;
7use rand::Rng;
8use std::collections::HashMap;
9use std::collections::hash_map::DefaultHasher;
10use std::hash::{Hash, Hasher};
11use std::hint;
12use std::marker::PhantomData;
13use std::sync::atomic::{AtomicUsize, Ordering};
14
15/// Maximum number of spin-loop iterations before parking a thread.
16/// This balances CPU usage vs. latency - spinning avoids kernel calls for
17/// short waits, but we park threads to avoid wasting CPU on long waits.
18const MAX_SPIN: usize = 100;
19
20/// Error type for Cuckoo Filter insert operation
21#[derive(Debug, thiserror::Error, PartialEq)]
22pub enum Error {
23    /// Returned when the filter is full and cannot accommodate more elements
24    #[error("Not enough space to store this item.")]
25    NotEnoughSpace,
26}
27
28/// Types of locks that can be acquired on the filter
29#[derive(PartialEq)]
30pub enum LockKind {
31    /// Optimistic version tracking - does not block other operations but captures
32    /// a version number to detect if data changed during the operation
33    Optimistic,
34    /// Exclusive among writers only - prevents other writers but allows concurrent readers
35    WriterExclusive,
36    /// Fully exclusive access - blocks all other operations (used only during evictions)
37    FullyExclusive,
38}
39
40/// A sophisticated lock implementation designed for concurrent cuckoo filter operations.
41///
42/// This is NOT a traditional mutex but an atomic-based synchronization mechanism that
43/// enables three distinct concurrency modes:
44///
45/// 1. **Optimistic locks**: Allow maximum concurrency - multiple readers and writers
46///    can proceed simultaneously. Used for optimistic reads that detect data races.
47///
48/// 2. **WriterExclusive locks**: Mutual exclusion among writers only - prevents
49///    concurrent modifications but allows concurrent reads.
50///
51/// 3. **FullyExclusive locks**: Complete mutual exclusion - blocks all operations.
52///    Only used during complex eviction chains to ensure consistency.
53///
54/// ## Version Encoding Scheme
55/// The atomic usize encodes both lock state and version information:
56/// - **Bits 0-1**: Lock kind (0=Optimistic, 1=WriterExclusive, 2=FullyExclusive)
57/// - **Bits 2-63**: Version counter (incremented on FullyExclusive release)
58///
59/// This allows optimistic readers to detect when their read might be stale by
60/// comparing version numbers before and after the operation.
61pub struct Lock<'a> {
62    /// Reference to the shared atomic value encoding lock state and version
63    atomic: &'a AtomicUsize,
64    /// Snapshot of the atomic value when this lock was acquired.
65    /// Used for optimistic concurrency control and version tracking.
66    /// The lower 2 bits indicate lock type, upper bits track version changes.
67    version: usize,
68    /// The type of lock held by this instance
69    kind: LockKind,
70    /// Counter for spin attempts before transitioning to thread parking.
71    /// Implements adaptive spinning to balance latency vs CPU usage.
72    retry: usize,
73}
74
75impl<'a> Lock<'a> {
76    /// Create a new lock of the specified kind
77    /// Blocks until the lock can be acquired
78    fn new(atomic: &'a AtomicUsize, kind: LockKind) -> Self {
79        let mut lock = Self {
80            atomic,
81            version: 0,
82            kind,
83            retry: 0,
84        };
85        match lock.kind {
86            LockKind::Optimistic => loop {
87                // For optimistic locks, we can proceed as long as there's no FullyExclusive lock
88                lock.version = atomic.load(Ordering::Relaxed);
89                if Self::kind(lock.version) != LockKind::FullyExclusive {
90                    return lock;
91                }
92                lock.spin_or_park()
93            },
94            _ => loop {
95                // For writer exclusive and fully exclusive locks, we need to ensure no exclusive lock is acquired
96                lock.version = atomic.load(Ordering::Relaxed);
97                if Self::kind(lock.version) != LockKind::Optimistic {
98                    lock.spin_or_park();
99                    continue;
100                }
101                // Update lower bits of the version: 1 for WriterExclusive, 2 for FullyExclusive
102                let new_version = if lock.kind == LockKind::WriterExclusive {
103                    lock.version + 1
104                } else {
105                    lock.version + 2
106                };
107                if atomic
108                    .compare_exchange_weak(
109                        lock.version,
110                        new_version,
111                        Ordering::Release,
112                        Ordering::Relaxed,
113                    )
114                    .is_ok()
115                {
116                    return lock;
117                }
118            },
119        }
120    }
121
122    /// Upgrade a WriterExclusive lock to a FullyExclusive lock
123    /// This assumes the current thread holds the writer exclusive lock.
124    fn upgrade(&mut self) {
125        self.atomic.store(self.version + 2, Ordering::Release);
126        self.kind = LockKind::FullyExclusive;
127    }
128
129    /// Check if the lock is outdated (version changed) or a FullyExclusive lock is acquired
130    /// Used for optimistic concurrency control
131    fn is_outdated(&self) -> bool {
132        let version = self.atomic.load(Ordering::Acquire);
133        Self::kind(version) == LockKind::FullyExclusive || version >> 2 != self.version >> 2
134    }
135
136    /// Get the key for parking a thread
137    /// Different keys are used for optimistic and exclusive locks
138    fn park_key(&self) -> usize {
139        let key = self.atomic.as_ptr() as usize;
140        match self.kind {
141            LockKind::Optimistic => key,
142            _ => key + 1,
143        }
144    }
145
146    /// Spin or park the thread when waiting for a lock
147    fn spin_or_park(&mut self) {
148        if self.retry > MAX_SPIN {
149            // After MAX_SPIN attempts, park the thread
150            self.retry = 0;
151            unsafe {
152                parking_lot_core::park(
153                    self.park_key(),
154                    || self.atomic.load(Ordering::Acquire) == self.version,
155                    || (),
156                    |_, _| (),
157                    parking_lot_core::DEFAULT_PARK_TOKEN,
158                    None,
159                );
160            }
161        } else {
162            // Otherwise, spin
163            self.retry += 1;
164            hint::spin_loop();
165        }
166    }
167
168    /// Extract the lock kind from the lower 2 bits of a version value
169    fn kind(version: usize) -> LockKind {
170        match version & 0b11 {
171            0 => LockKind::Optimistic,
172            1 => LockKind::WriterExclusive,
173            2 => LockKind::FullyExclusive,
174            _ => panic!("Invalid Lock"),
175        }
176    }
177}
178
179impl Drop for Lock<'_> {
180    /// Release the lock when it goes out of scope
181    fn drop(&mut self) {
182        match self.kind {
183            LockKind::Optimistic => return, // No need to do anything for Optimistic locks
184            LockKind::WriterExclusive => {
185                // For WriterExclusive locks, release the lock without incrementing the version
186                self.atomic.store(self.version, Ordering::Release);
187            }
188            LockKind::FullyExclusive => {
189                // For FullyExclusive locks, increment the version to invalidate Optimistic locks
190                self.atomic.store(self.version + 4, Ordering::Release);
191            }
192        }
193
194        // Unpark waiting threads
195        let optimistic_key = self.atomic.as_ptr() as usize;
196        let exclusive_key = optimistic_key + 1;
197        unsafe {
198            // Unpark all waiting optimistic locks
199            parking_lot_core::unpark_all(optimistic_key, parking_lot_core::DEFAULT_UNPARK_TOKEN);
200            // Unpark one waiting exclusive lock (either WriterExclusive or FullyExclusive)
201            parking_lot_core::unpark_one(exclusive_key, |_| parking_lot_core::DEFAULT_UNPARK_TOKEN);
202        }
203    }
204}
205
206/// A highly concurrent lock-free probabilistic data structure for set membership testing.
207///
208/// ## What Makes It "Cuckoo"
209///
210/// Named after the cuckoo bird's behavior of displacing other birds' eggs, this filter
211/// uses **cuckoo hashing** where each item can be stored in one of two possible locations.
212/// When both locations are full, existing items are "evicted" (like cuckoo eggs) and
213/// relocated to their alternate position, creating eviction chains.
214///
215/// ## Algorithm Overview
216///
217/// 1. **Fingerprints**: Items are reduced to small fingerprints (4-32 bits) instead of
218///    storing full keys, providing excellent space efficiency.
219///
220/// 2. **Dual Hashing**: Each item has two possible bucket locations computed from its hash.
221///    This provides better space efficiency and flexibility when inserting and removing items.
222///
223/// 3. **Eviction Chains**: When both buckets are full, a random item is evicted from one
224///    bucket and moved to its alternate location, potentially triggering a chain of evictions.
225///
226/// 4. **Lock-Free Concurrency**: All operations use atomic compare-exchange loops instead
227///    of traditional locks, with optimistic concurrency control for read operations.
228///    The only exception is when inserting with evictions, where a FullyExclusive lock is used
229///    to ensure consistency.
230///
231/// ## Key Advantages Over Bloom Filters
232///
233/// - **Deletions supported**: Items can be removed without false negatives
234/// - **Better space efficiency**: ~20-30% less memory for same false positive rate
235/// - **Bounded lookup time**: Always at most 2 bucket checks, never more
236/// - **High concurrency**: Lock-free design enables excellent parallel performance
237///
238/// ## Concurrency Model
239///
240/// - **Reads**: Optimistic, can proceed concurrently with most operations
241/// - **Simple writes**: Use atomic compare-exchange loops without blocking other operations
242/// - **WriterExclusive locks**: Used for removing items, and for unique insertions
243/// - **Complex evictions**: Use FullyExclusive locks to ensure consistency
244///
245/// ## Time Complexity
246///
247/// - **Lookup**: O(1)
248/// - **Deletion**: O(1)
249/// - **Insertion**: Amortized O(1) due to eviction chains, but the number of evictions is bounded
250#[derive(Debug, Builder)]
251#[builder(
252    pattern = "owned",
253    build_fn(private, name = "base_build", validate = "Self::validate")
254)]
255pub struct CuckooFilter<H = DefaultHasher>
256where
257    H: Hasher + Default,
258{
259    // Configuration parameters
260    /// Maximum number of elements the filter can store
261    #[builder(default = "1048576")]
262    capacity: usize,
263
264    /// Size of fingerprints in bits (must be 4, 8, 16, or 32)
265    #[builder(default = "16")]
266    fingerprint_size: usize,
267
268    /// Number of fingerprints per bucket
269    #[builder(default = "4")]
270    bucket_size: usize,
271
272    /// Maximum number of evictions to try before giving up
273    #[builder(default = "500")]
274    max_evictions: usize,
275
276    // Internal values - automatically derived from the configuration
277    /// Number of fingerprints that can be stored in a single atomic value
278    #[builder(setter(skip))]
279    fingerprints_per_atomic: usize,
280
281    /// Number of buckets in the filter (power of 2)
282    #[builder(setter(skip))]
283    num_buckets: usize,
284
285    /// Number of atomic values per bucket
286    #[builder(setter(skip))]
287    atomics_per_bucket: usize,
288
289    /// Bit mask for extracting fingerprints
290    #[builder(setter(skip))]
291    fingerprint_mask: usize,
292
293    /// Storage for buckets, implemented as a vector of atomic values
294    #[builder(setter(skip))]
295    buckets: Vec<AtomicUsize>,
296
297    /// Atomic value used for locking
298    #[builder(setter(skip))]
299    lock: AtomicUsize,
300
301    /// Counter for the number of elements in the filter
302    #[builder(setter(skip))]
303    counter: AtomicUsize,
304
305    /// Phantom data for the hasher type
306    #[builder(setter(skip))]
307    _hasher: PhantomData<H>,
308}
309
310impl<H: Hasher + Default> CuckooFilter<H> {
311    /// Insert an item into the filter
312    ///
313    /// This operation first attempts a direct insertion without acquiring a lock.
314    /// If that fails due to bucket collisions, it falls back to the eviction-based
315    /// insertion algorithm which may require a write lock.
316    ///
317    /// Concurrent operations are safely handled through atomic operations.
318    ///
319    /// Returns Ok(()) if the item was inserted, or Error::NotEnoughSpace if the filter is full
320    pub fn insert<T: ?Sized + Hash>(&self, item: &T) -> Result<(), Error> {
321        let (index, fingerprint) = self.index_and_fingerprint(item);
322        self.try_insert(index, fingerprint).or_else(|error| {
323            if let Some(lock) = self.lock(LockKind::WriterExclusive) {
324                self.insert_with_evictions(index, fingerprint, lock)
325            } else {
326                Err(error)
327            }
328        })
329    }
330
331    /// Check if an item is in the filter and insert it if is not present (atomically)
332    ///
333    /// This method combines lookup and insert into a single atomic operation,
334    /// ensuring thread safety and consistency even with concurrent operations.
335    ///
336    /// Returns Ok(true) if the item was inserted, Ok(false) if it was already present,
337    /// or Error::NotEnoughSpace if the filter is full
338    pub fn insert_unique<T: ?Sized + Hash>(&self, item: &T) -> Result<bool, Error> {
339        let (index, fingerprint) = self.index_and_fingerprint(item);
340        if self.lookup_fingerprint(index, fingerprint).is_some() {
341            return Ok(false);
342        }
343        let lock = Lock::new(&self.lock, LockKind::WriterExclusive);
344        if self.lookup_fingerprint(index, fingerprint).is_some() {
345            return Ok(false);
346        }
347        self.try_insert(index, fingerprint)
348            .or_else(|error| {
349                if self.max_evictions == 0 {
350                    return Err(error);
351                }
352                self.insert_with_evictions(index, fingerprint, lock)
353            })
354            .map(|_| true)
355    }
356
357    /// Counts the number of occurrences of an item in the filter.
358    ///
359    /// # Notes
360    /// - This is not a counting filter; it simply counts matching fingerprints in both candidate buckets.
361    /// - Useful for detecting duplicates or hash collisions, not for precise multiset membership.
362    /// - The count is limited by the filter's structure: at most `bucket_size * 2` per item.
363    /// - This method may count false positives due to hash collisions.
364    pub fn count<T: ?Sized + Hash>(&self, item: &T) -> usize {
365        let (index, fingerprint) = self.index_and_fingerprint(item);
366        let alt_index = self.alt_index(index, fingerprint);
367        self.atomic_read(
368            || {
369                self.read_bucket(index, Ordering::Acquire)
370                    .into_iter()
371                    .filter(|&f| f == fingerprint)
372                    .count()
373                    + self
374                        .read_bucket(alt_index, Ordering::Acquire)
375                        .into_iter()
376                        .filter(|&f| f == fingerprint)
377                        .count()
378            },
379            None,
380        )
381    }
382
383    /// Attempts to remove an item from the filter.
384    ///
385    /// Returns `true` if the item was successfully removed, or `false` if it was not found.
386    ///
387    /// Note:
388    /// - An item should only be removed if it was previously added. Removing a non-existent
389    ///   item may inadvertently remove a different item due to hash collisions inherent to
390    ///   cuckoo filters.
391    pub fn remove<T: ?Sized + Hash>(&self, item: &T) -> bool {
392        let (index, fingerprint) = self.index_and_fingerprint(item);
393        while let Some((index, sub_index)) = self.lookup_fingerprint(index, fingerprint) {
394            let _lock = self.lock(LockKind::WriterExclusive);
395            if self.update_bucket(index, sub_index, fingerprint, 0, Ordering::Release) {
396                return true;
397            }
398        }
399        false
400    }
401
402    /// Check if an item is in the filter
403    ///
404    /// Returns `true` if the item is possibly in the filter (may have false positives),
405    /// `false` if it is definitely not in the filter
406    pub fn contains<T: ?Sized + Hash>(&self, item: &T) -> bool {
407        let (index, fingerprint) = self.index_and_fingerprint(item);
408        self.atomic_read(
409            || self.lookup_fingerprint(index, fingerprint).is_some(),
410            Some(true),
411        )
412    }
413
414    /// Get the number of elements in the filter
415    pub fn len(&self) -> usize {
416        self.counter.load(Ordering::Acquire)
417    }
418
419    /// Check if the filter is empty
420    pub fn is_empty(&self) -> bool {
421        self.len() == 0
422    }
423
424    /// Get the capacity of the filter
425    pub fn capacity(&self) -> usize {
426        self.capacity
427    }
428
429    /// Clear the filter, removing all elements
430    pub fn clear(&self) {
431        let _lock = self.lock(LockKind::WriterExclusive);
432        for atomic in &self.buckets {
433            let old_value = atomic.swap(0, Ordering::Release);
434            let removed = (0..self.fingerprints_per_atomic)
435                .filter(|i| (old_value >> (i * self.fingerprint_size)) & self.fingerprint_mask != 0)
436                .count();
437            if removed > 0 {
438                self.counter.fetch_sub(removed, Ordering::Release);
439            }
440        }
441    }
442
443    /// Compute the hash of an item
444    /// Uses the generic hasher H for flexibility and performance
445    fn hash<T: ?Sized + Hash>(&self, data: &T) -> u64 {
446        let mut hasher = <H as Default>::default();
447        data.hash(&mut hasher);
448        hasher.finish()
449    }
450
451    /// Compute the bucket index and fingerprint for an item.
452    ///
453    /// 1. **Hash the item**: Use the configured hasher to get a 64-bit hash
454    /// 2. **Extract fingerprint**: Use multiplication + shift for high-quality
455    ///    distribution across the fingerprint space, then add 1 to avoid zero.
456    /// 3. **Extract index**: Use bitwise AND with (num_buckets-1) since num_buckets
457    ///    is always a power of 2, providing perfect hash distribution.
458    ///
459    /// ## Why This Design
460    ///
461    /// - **Non-zero fingerprints**: Adding 1 ensures fingerprints are never 0,
462    ///   so 0 can represent empty slots without ambiguity
463    /// - **Independent bits**: Index uses lower hash bits, fingerprint uses
464    ///   different bits via multiplication, avoiding correlation
465    /// - **Uniform distribution**: Both index and fingerprint are uniformly
466    ///   distributed across their respective ranges
467    ///
468    /// Returns (index, fingerprint) where:
469    /// - index is the primary bucket index (0 to num_buckets-1)  
470    /// - fingerprint is a compact hash of the item (1 to fingerprint_mask)
471    fn index_and_fingerprint<T: ?Sized + Hash>(&self, item: &T) -> (usize, usize) {
472        let hash = self.hash(item);
473        // Compute fingerprint using multiplication and shift for better distribution
474        let fingerprint = ((hash as u128 * self.fingerprint_mask as u128) >> 64) + 1;
475        // Compute index using modulo num_buckets (optimized with bitwise AND since num_buckets is a power of 2)
476        let index = hash as usize & (self.num_buckets - 1);
477        (index, fingerprint as usize)
478    }
479
480    /// Computes the alternative bucket index for a given fingerprint using cuckoo hashing.
481    ///
482    /// In cuckoo hashing, each item can reside in one of two possible buckets. This function
483    /// deterministically computes the alternate bucket index from the current index and fingerprint.
484    ///
485    /// Properties:
486    /// 1. Symmetry: `alt_index(alt_index(i, f), f) == i` for any index `i` and fingerprint `f`.
487    /// 2. Distinctness: For any fingerprint, the two indices are always different.
488    /// 3. Uniformity: The mapping distributes fingerprints evenly across all buckets.
489    fn alt_index(&self, index: usize, fingerprint: usize) -> usize {
490        index ^ (self.hash(&fingerprint) as usize & (self.num_buckets - 1))
491    }
492
493    /// Look up a fingerprint at its primary or alternative index
494    /// Returns `Some((index, sub_index))` if found, None otherwise
495    fn lookup_fingerprint(&self, index: usize, fingerprint: usize) -> Option<(usize, usize)> {
496        // First check the primary bucket
497        self.read_bucket(index, Ordering::Acquire)
498            .iter()
499            .position(|fp| fp == &fingerprint)
500            .map(|sub_index| (index, sub_index))
501            .or_else(|| {
502                // Then check the alternative bucket
503                let alt_index = self.alt_index(index, fingerprint);
504                self.read_bucket(alt_index, Ordering::Acquire)
505                    .iter()
506                    .position(|fp| fp == &fingerprint)
507                    .map(|sub_index| (alt_index, sub_index))
508            })
509    }
510
511    /// Try to insert a fingerprint at its primary or alternative index
512    /// Returns `Ok(())` if successful, `Error::NotEnoughSpace` if both buckets are full
513    fn try_insert(&self, index: usize, fingerprint: usize) -> Result<(), Error> {
514        self.insert_at_index(index, fingerprint).or_else(|_| {
515            let alt_index = self.alt_index(index, fingerprint);
516            self.insert_at_index(alt_index, fingerprint)
517                .map_err(|_| Error::NotEnoughSpace)
518        })
519    }
520
521    /// Try to insert a fingerprint at a specific index
522    /// Returns `Ok(())` if successful, `Err(bucket)` if the bucket is full
523    fn insert_at_index(&self, index: usize, fingerprint: usize) -> Result<(), Vec<usize>> {
524        loop {
525            let bucket = self.read_bucket(index, Ordering::Relaxed);
526            if let Some(sub_index) = bucket.iter().position(|&i| i == 0) {
527                if self.update_bucket(index, sub_index, 0, fingerprint, Ordering::Release) {
528                    return Ok(());
529                } else {
530                    continue;
531                }
532            } else {
533                return Err(bucket);
534            }
535        }
536    }
537
538    /// Insert a fingerprint using cuckoo eviction chains when both buckets are full.
539    ///
540    /// This method is invoked only as a fallback when direct insertion fails, preserving
541    /// the optimistic, lock-free fast path for the common case.
542    ///
543    /// # Cuckoo Eviction Algorithm
544    ///
545    /// When both possible locations for an item are full:
546    /// 1. **Randomly select** an existing item from one of the full buckets
547    /// 2. **Evict** that item and insert our new item in its place
548    /// 3. **Relocate** the evicted item to its alternate location
549    /// 4. **Repeat** if the alternate location is also full (eviction chain)
550    /// 5. **Succeed** when we find an empty slot, or **fail** after max_evictions
551    ///
552    /// # Implementation Details
553    ///
554    /// - **Eviction tracking**: Collects a sequence of planned evictions, which are
555    ///   atomically applied only if the chain succeeds, ensuring atomicity and consistency.
556    /// - **Lock upgrading**: Starts with a `WriterExclusive` lock, upgrading to
557    ///   `FullyExclusive` only when actually applying the eviction chain, maximizing
558    ///   read concurrency during planning.
559    /// - **Loop prevention**: Uses a map to track which sub-indices have been tried
560    ///   in each bucket, to ensure early detection of loops in eviction chains.
561    fn insert_with_evictions(
562        &self,
563        mut index: usize,
564        mut fingerprint: usize,
565        mut lock: Lock,
566    ) -> Result<(), Error> {
567        let mut rng = rand::rng();
568        let mut evictions = Vec::with_capacity(self.max_evictions.min(32));
569        let mut used_indices = HashMap::with_capacity(self.max_evictions.min(32));
570        while evictions.len() <= self.max_evictions {
571            if let Err(bucket) = self.insert_at_index(index, fingerprint) {
572                let sub_index = match used_indices.entry(index).or_insert(0usize) {
573                    sub_indices if *sub_indices == 0 => {
574                        // First time seeing this index, randomly choose a sub-index
575                        let sub_index = rng.random_range(0..self.bucket_size);
576                        *sub_indices = 1 << sub_index;
577                        sub_index
578                    }
579                    sub_indices => {
580                        // Find an unused sub-index
581                        if let Some(sub_index) =
582                            (0..self.bucket_size).find(|shift| (*sub_indices >> shift) & 1 == 0)
583                        {
584                            *sub_indices |= 1 << sub_index;
585                            sub_index
586                        } else {
587                            return Err(Error::NotEnoughSpace);
588                        }
589                    }
590                };
591                // Evict the fingerprint at the chosen sub-index
592                let evicted = bucket[sub_index];
593                evictions.push((index, sub_index, fingerprint));
594                // Find the alternative index for the evicted fingerprint
595                index = self.alt_index(index, evicted);
596                fingerprint = evicted;
597            } else {
598                // Successfully inserted the fingerprint, now apply all evictions
599                lock.upgrade();
600                while let Some((index, sub_index, evicted)) = evictions.pop() {
601                    self.update_bucket(index, sub_index, fingerprint, evicted, Ordering::Relaxed);
602                    fingerprint = evicted;
603                }
604                return Ok(());
605            }
606        }
607        // Reached the maximum number of evictions, give up
608        Err(Error::NotEnoughSpace)
609    }
610
611    /// Atomically read all fingerprints from a bucket using lock-free bit manipulation.
612    ///
613    /// ## Memory Layout Complexity
614    ///
615    /// Fingerprints are tightly packed in memory across multiple atomic usize values:
616    /// - Each bucket contains `bucket_size` fingerprints
617    /// - Each fingerprint is `fingerprint_size` bits
618    /// - Multiple fingerprints are packed into each atomic usize
619    /// - Buckets may span across multiple atomic values
620    ///
621    /// ## Algorithm Steps
622    ///
623    /// 1. Calculate which atomic values contain this bucket's data
624    /// 2. Atomically load each relevant atomic value (using Acquire ordering)
625    /// 3. Extract fingerprints using bit manipulation and masking
626    /// 4. Handle boundary cases where buckets span multiple atomics
627    /// 5. Skip any padding bits and return exactly `bucket_size` fingerprints
628    ///
629    /// This is completely lock-free - multiple threads can read concurrently,
630    /// and reads can proceed even during writes (though they might see
631    /// intermediate states that get resolved by retry logic).
632    ///
633    /// Returns a vector containing all fingerprints in the bucket (0 = empty slot)
634    fn read_bucket(&self, index: usize, ordering: Ordering) -> Vec<usize> {
635        let fingerprint_index = index * self.bucket_size;
636        let bit_index = fingerprint_index * self.fingerprint_size;
637        let start_index = bit_index / usize::BITS as usize;
638        let skip_bits = bit_index % usize::BITS as usize;
639        let skip_fingerprints = skip_bits >> self.fingerprint_size.trailing_zeros();
640        let end_index = start_index + self.atomics_per_bucket;
641
642        self.buckets[start_index..end_index]
643            .iter()
644            .flat_map(|atomic| {
645                let atomic_value = atomic.load(ordering);
646                (0..self.fingerprints_per_atomic).map(move |i| {
647                    (atomic_value
648                        >> (self.fingerprint_size * (self.fingerprints_per_atomic - i - 1)))
649                        & self.fingerprint_mask
650                })
651            })
652            .skip(skip_fingerprints)
653            .take(self.bucket_size)
654            .collect()
655    }
656
657    /// Atomically update a single fingerprint using lock-free compare-exchange.
658    ///
659    /// ## Lock-Free Update Algorithm
660    ///
661    /// 1. **Locate the target**: Calculate which atomic usize contains the fingerprint
662    ///    and the exact bit position within that atomic value
663    /// 2. **Read current state**: Load the current atomic value
664    /// 3. **Verify expectation**: Check that the target position contains `old_value`
665    /// 4. **Atomic update**: Use compare_exchange_weak to atomically replace `old_value`
666    ///    with `new_value`, but only if the atomic hasn't changed since step 2
667    /// 5. **Retry on conflict**: If another thread modified the atomic concurrently,
668    ///    restart from step 2
669    ///
670    /// ## Concurrency Safety
671    ///
672    /// - Uses `compare_exchange_weak` which can fail spuriously on some architectures
673    ///   but is more efficient than the strong version
674    /// - Employs Release ordering on success to ensure other threads see the change
675    /// - Updates the global counter atomically to maintain consistency
676    /// - Returns false if the expected `old_value` is no longer present (indicating
677    ///   another thread already modified this slot)
678    ///
679    /// Returns `true` if update succeeded, `false` if the slot no longer contains
680    /// the expected `old_value` due to concurrent modification.
681    fn update_bucket(
682        &self,
683        index: usize,
684        sub_index: usize,
685        old_value: usize,
686        new_value: usize,
687        ordering: Ordering,
688    ) -> bool {
689        let bit_index = (index * self.bucket_size + sub_index) * self.fingerprint_size;
690        let atomic_index = bit_index / usize::BITS as usize;
691        let skip_bits = bit_index % usize::BITS as usize;
692        let shift = usize::BITS as usize - self.fingerprint_size - skip_bits;
693        let fingerprint_mask = self.fingerprint_mask << shift;
694        let atomic = &self.buckets[atomic_index];
695
696        loop {
697            let atomic_value = atomic.load(Ordering::Relaxed);
698            if (atomic_value & fingerprint_mask) >> shift != old_value {
699                // The expected fingerprint is not present in the atomic value
700                return false;
701            }
702            let new_atomic_value = (atomic_value & !fingerprint_mask) | (new_value << shift);
703            if atomic
704                .compare_exchange_weak(atomic_value, new_atomic_value, ordering, Ordering::Relaxed)
705                .is_ok()
706            {
707                // Update the counter based on the change
708                match (old_value, new_value) {
709                    (0, _) => self.counter.fetch_add(1, Ordering::Release),
710                    (_, 0) => self.counter.fetch_sub(1, Ordering::Release),
711                    (_, _) => 0,
712                };
713                return true;
714            }
715        }
716    }
717
718    /// Acquires a lock on the filter, if necessary.
719    ///
720    /// A lock is only required when evictions are enabled (i.e., `max_evictions > 0`).
721    /// If `max_evictions` is set to 0, no lock is acquired.
722    ///
723    /// Returns `Some(Lock)` if a lock is needed, or `None` if no locking is required.
724    pub fn lock(&self, kind: LockKind) -> Option<Lock<'_>> {
725        if self.max_evictions == 0 {
726            None
727        } else {
728            Some(Lock::new(&self.lock, kind))
729        }
730    }
731
732    /// Execute a read operation with optimistic concurrency control and automatic retry.
733    ///
734    /// This is the cornerstone of the lock-free design, implementing a sophisticated
735    /// optimistic concurrency protocol that allows reads to proceed concurrently with
736    /// most write operations.
737    ///
738    /// ## Optimistic Concurrency Protocol
739    ///
740    /// 1. **Snapshot version**: Acquire an Optimistic lock (capturing version number)
741    /// 2. **Execute read**: Run the provided function without any blocking
742    /// 3. **Validate consistency**: Check if version changed or FullyExclusive lock acquired
743    /// 4. **Retry or return**: If data may be stale, retry; otherwise return result
744    ///
745    /// ## How It Works
746    ///
747    /// - **WriterExclusive operations**: Don't invalidate optimistic reads because they
748    ///   coordinate through atomic compare-exchange operations that are linearizable
749    /// - **FullyExclusive operations**: Do invalidate optimistic reads because they
750    ///   perform complex multi-step updates that require consistency
751    /// - **Early return optimization**: For operations that can short-circuit (like
752    ///   `contains()` returning true), we skip version validation as an optimization
753    ///
754    /// This pattern is essential for achieving lock-free performance while maintaining
755    /// correctness in the presence of concurrent modifications.
756    fn atomic_read<T, F>(&self, fun: F, early_return: Option<T>) -> T
757    where
758        F: Fn() -> T,
759        T: PartialEq,
760    {
761        if self.max_evictions == 0 {
762            return fun();
763        }
764        loop {
765            let lock = Lock::new(&self.lock, LockKind::Optimistic);
766            let result = fun();
767            if Some(&result) == early_return.as_ref() || !lock.is_outdated() {
768                return result;
769            }
770        }
771    }
772}
773
774impl CuckooFilter<DefaultHasher> {
775    /// Create a new CuckooFilterBuilder with default settings
776    pub fn builder() -> CuckooFilterBuilder<DefaultHasher> {
777        CuckooFilterBuilder::default()
778    }
779
780    /// Create a new CuckooFilter with default settings
781    pub fn new() -> CuckooFilter<DefaultHasher> {
782        Self::builder().build().unwrap()
783    }
784
785    /// Create a new CuckooFilter with the specified capacity
786    pub fn with_capacity(capacity: usize) -> CuckooFilter<DefaultHasher> {
787        Self::builder().capacity(capacity).build().unwrap()
788    }
789}
790
791impl Default for CuckooFilter<DefaultHasher> {
792    /// Create a new CuckooFilter with default settings
793    fn default() -> Self {
794        Self::new()
795    }
796}
797
798impl<H: Hasher + Default> CuckooFilterBuilder<H> {
799    /// Validate the builder configuration
800    fn validate(&self) -> Result<(), String> {
801        if let Some(fingerprint_size) = self.fingerprint_size
802            && ![4, 8, 16, 32].contains(&fingerprint_size)
803        {
804            return Err("Invalid fingerprint_size".into());
805        }
806        if self.bucket_size == Some(0) {
807            return Err("bucket_size must be greater than zero".into());
808        }
809        if self.capacity == Some(0) {
810            return Err("capacity must be greater than zero".into());
811        }
812        Ok(())
813    }
814
815    /// Build a CuckooFilter with the specified configuration
816    pub fn build(self) -> Result<CuckooFilter<H>, CuckooFilterBuilderError> {
817        let mut cuckoo_filter = self.base_build()?;
818        // Calculate the number of buckets (power of 2)
819        cuckoo_filter.num_buckets = cuckoo_filter
820            .capacity
821            .div_ceil(cuckoo_filter.bucket_size)
822            .next_power_of_two();
823        // Adjust the capacity to match the actual number of buckets
824        cuckoo_filter.capacity = cuckoo_filter.num_buckets * cuckoo_filter.bucket_size;
825        // Calculate the fingerprint mask
826        cuckoo_filter.fingerprint_mask = ((1u64 << cuckoo_filter.fingerprint_size) - 1) as usize;
827        // Calculate the size of a bucket in bits
828        let bucket_bit_size = cuckoo_filter.bucket_size * cuckoo_filter.fingerprint_size;
829        // Calculate the number of atomic values per bucket
830        cuckoo_filter.atomics_per_bucket = bucket_bit_size.div_ceil(usize::BITS as usize);
831        // Calculate the number of fingerprints per atomic value
832        cuckoo_filter.fingerprints_per_atomic =
833            usize::BITS as usize / cuckoo_filter.fingerprint_size;
834        // Calculate the total number of atomic values needed
835        let bit_size = cuckoo_filter.capacity * cuckoo_filter.fingerprint_size;
836        let atomic_size = bit_size.div_ceil(usize::BITS as usize);
837        // Initialize the buckets
838        cuckoo_filter.buckets = (0..atomic_size).map(|_| AtomicUsize::new(0)).collect();
839        Ok(cuckoo_filter)
840    }
841}