atomic_cuckoo_filter/lib.rs
// Lock-Free Concurrent Cuckoo Filter Implementation
// A high-performance probabilistic data structure for efficient set membership testing
// with better space efficiency than Bloom filters, support for deletions, and
// fully concurrent operations using atomic operations and lock-free algorithms.

use derive_builder::Builder;
use rand::Rng;
use std::collections::HashSet;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::hint;
use std::marker::PhantomData;
use std::sync::atomic::{AtomicUsize, Ordering};

/// Maximum number of spin-loop iterations before parking a thread.
/// This balances CPU usage vs. latency - spinning avoids kernel calls for
/// short waits, but we park threads to avoid wasting CPU on long waits.
const MAX_SPIN: usize = 100;

/// Error type for Cuckoo Filter insert operation
#[derive(Debug, thiserror::Error, PartialEq)]
pub enum Error {
    /// Returned when the filter is full and cannot accommodate more elements
    #[error("Not enough space to store this item.")]
    NotEnoughSpace,
}

/// Types of locks that can be acquired on the filter
#[derive(PartialEq)]
pub enum LockKind {
    /// Optimistic version tracking - does not block other operations but captures
    /// a version number to detect if data changed during the operation
    Optimistic,
    /// Exclusive among writers only - prevents other writers but allows concurrent readers
    WriterExclusive,
    /// Fully exclusive access - blocks all other operations (used only during evictions)
    FullyExclusive,
}

/// A sophisticated lock implementation designed for concurrent cuckoo filter operations.
///
/// This is NOT a traditional mutex but an atomic-based synchronization mechanism that
/// enables three distinct concurrency modes:
///
/// 1. **Optimistic locks**: Allow maximum concurrency - multiple readers and writers
///    can proceed simultaneously. Used for optimistic reads that detect data races.
///
/// 2. **WriterExclusive locks**: Mutual exclusion among writers only - prevents
///    concurrent modifications but allows concurrent reads.
///
/// 3. **FullyExclusive locks**: Complete mutual exclusion - blocks all operations.
///    Only used during complex eviction chains to ensure consistency.
///
/// ## Version Encoding Scheme
/// The atomic usize encodes both lock state and version information:
/// - **Bits 0-1**: Lock kind (0=Optimistic, 1=WriterExclusive, 2=FullyExclusive)
/// - **Bits 2-63**: Version counter (incremented on FullyExclusive release)
///
/// This allows optimistic readers to detect when their read might be stale by
/// comparing version numbers before and after the operation.
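///
/// A minimal decoding sketch; the state word below is a made-up illustrative value,
/// not one produced by any particular run:
///
/// ```
/// // Lower two bits = lock kind, remaining bits = version counter.
/// let state: usize = 0b1101;      // hypothetical raw value of the atomic
/// assert_eq!(state & 0b11, 0b01); // kind bits: 1 = WriterExclusive
/// assert_eq!(state >> 2, 3);      // version counter is 3
/// ```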
pub struct Lock<'a> {
    /// Reference to the shared atomic value encoding lock state and version
    atomic: &'a AtomicUsize,
    /// Snapshot of the atomic value when this lock was acquired.
    /// Used for optimistic concurrency control and version tracking.
    /// The lower 2 bits indicate lock type, upper bits track version changes.
    version: usize,
    /// The type of lock held by this instance
    kind: LockKind,
    /// Counter for spin attempts before transitioning to thread parking.
    /// Implements adaptive spinning to balance latency vs CPU usage.
    retry: usize,
}

impl<'a> Lock<'a> {
    /// Create a new lock of the specified kind
    /// Blocks until the lock can be acquired
    fn new(atomic: &'a AtomicUsize, kind: LockKind) -> Self {
        let mut lock = Self {
            atomic,
            version: 0,
            kind,
            retry: 0,
        };
        match lock.kind {
            LockKind::Optimistic => loop {
                // For optimistic locks, we can proceed as long as there's no FullyExclusive lock
                lock.version = atomic.load(Ordering::Relaxed);
                if Self::kind(lock.version) != LockKind::FullyExclusive {
                    return lock;
                }
                lock.spin_or_park()
            },
            _ => loop {
                // For writer exclusive and fully exclusive locks, we need to ensure no exclusive lock is acquired
                lock.version = atomic.load(Ordering::Relaxed);
                if Self::kind(lock.version) != LockKind::Optimistic {
                    lock.spin_or_park();
                    continue;
                }
                // Update lower bits of the version: 1 for WriterExclusive, 2 for FullyExclusive
                let new_version = if lock.kind == LockKind::WriterExclusive {
                    lock.version + 1
                } else {
                    lock.version + 2
                };
                if atomic
                    .compare_exchange_weak(
                        lock.version,
                        new_version,
                        Ordering::Release,
                        Ordering::Relaxed,
                    )
                    .is_ok()
                {
                    return lock;
                }
            },
        }
    }

    /// Upgrade a WriterExclusive lock to a FullyExclusive lock
    /// This assumes the current thread holds the writer exclusive lock.
    fn upgrade(&mut self) {
        self.atomic.store(self.version + 2, Ordering::Release);
        self.kind = LockKind::FullyExclusive;
    }

    /// Check if the lock is outdated (version changed) or a FullyExclusive lock is acquired
    /// Used for optimistic concurrency control
    fn is_outdated(&self) -> bool {
        let version = self.atomic.load(Ordering::Acquire);
        Self::kind(version) == LockKind::FullyExclusive || version >> 2 != self.version >> 2
    }

    /// Get the key for parking a thread
    /// Different keys are used for optimistic and exclusive locks
    fn park_key(&self) -> usize {
        let key = self.atomic.as_ptr() as usize;
        match self.kind {
            LockKind::Optimistic => key,
            _ => key + 1,
        }
    }

    /// Spin or park the thread when waiting for a lock
    fn spin_or_park(&mut self) {
        if self.retry > MAX_SPIN {
            // After MAX_SPIN attempts, park the thread
            self.retry = 0;
            unsafe {
                parking_lot_core::park(
                    self.park_key(),
                    || self.atomic.load(Ordering::Acquire) == self.version,
                    || (),
                    |_, _| (),
                    parking_lot_core::DEFAULT_PARK_TOKEN,
                    None,
                );
            }
        } else {
            // Otherwise, spin
            self.retry += 1;
            hint::spin_loop();
        }
    }

    /// Extract the lock kind from the lower 2 bits of a version value
    fn kind(version: usize) -> LockKind {
        match version & 0b11 {
            0 => LockKind::Optimistic,
            1 => LockKind::WriterExclusive,
            2 => LockKind::FullyExclusive,
            _ => panic!("Invalid Lock"),
        }
    }
}

impl Drop for Lock<'_> {
    /// Release the lock when it goes out of scope
    fn drop(&mut self) {
        match self.kind {
            LockKind::Optimistic => return, // No need to do anything for Optimistic locks
            LockKind::WriterExclusive => {
                // For WriterExclusive locks, release the lock without incrementing the version
                self.atomic.store(self.version, Ordering::Release);
            }
            LockKind::FullyExclusive => {
                // For FullyExclusive locks, increment the version to invalidate Optimistic locks
                self.atomic.store(self.version + 4, Ordering::Release);
            }
        }

        // Unpark waiting threads
        let optimistic_key = self.atomic.as_ptr() as usize;
        let exclusive_key = optimistic_key + 1;
        unsafe {
            // Unpark all waiting optimistic locks
            parking_lot_core::unpark_all(optimistic_key, parking_lot_core::DEFAULT_UNPARK_TOKEN);
            // Unpark one waiting exclusive lock (either WriterExclusive or FullyExclusive)
            parking_lot_core::unpark_one(exclusive_key, |_| parking_lot_core::DEFAULT_UNPARK_TOKEN);
        }
    }
}

/// A highly concurrent lock-free probabilistic data structure for set membership testing.
///
/// ## What Makes It "Cuckoo"
///
/// Named after the cuckoo bird's behavior of displacing other birds' eggs, this filter
/// uses **cuckoo hashing** where each item can be stored in one of two possible locations.
/// When both locations are full, existing items are "evicted" (like cuckoo eggs) and
/// relocated to their alternate position, creating eviction chains.
///
/// ## Algorithm Overview
///
/// 1. **Fingerprints**: Items are reduced to small fingerprints (4-32 bits) instead of
///    storing full keys, providing excellent space efficiency.
///
/// 2. **Dual Hashing**: Each item has two possible bucket locations computed from its hash.
///    This provides better space efficiency and flexibility when inserting and removing items.
///
/// 3. **Eviction Chains**: When both buckets are full, a random item is evicted from one
///    bucket and moved to its alternate location, potentially triggering a chain of evictions.
///
/// 4. **Lock-Free Concurrency**: All operations use atomic compare-exchange loops instead
///    of traditional locks, with optimistic concurrency control for read operations.
///    The only exception is insertion with evictions, where a FullyExclusive lock is used
///    to ensure consistency.
///
/// ## Key Advantages Over Bloom Filters
///
/// - **Deletions supported**: Items can be removed without false negatives
/// - **Better space efficiency**: ~20-30% less memory for the same false positive rate
/// - **Bounded lookup time**: Always at most 2 bucket checks, never more
/// - **High concurrency**: Lock-free design enables excellent parallel performance
///
/// ## Concurrency Model
///
/// - **Reads**: Optimistic, can proceed concurrently with most operations
/// - **Simple writes**: Use atomic compare-exchange loops without blocking other operations
/// - **WriterExclusive locks**: Used for removing items, and for unique insertions
/// - **Complex evictions**: Use FullyExclusive locks to ensure consistency
///
/// ## Time Complexity
///
/// - **Lookup**: O(1)
/// - **Deletion**: O(1)
/// - **Insertion**: Amortized O(1); eviction chains can add work, but their length is bounded by `max_evictions`
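///
/// ## Example
///
/// A minimal usage sketch; it assumes this library is built as a crate named
/// `atomic_cuckoo_filter` (adjust the `use` path if the crate name differs):
///
/// ```
/// use atomic_cuckoo_filter::CuckooFilter;
///
/// let filter = CuckooFilter::with_capacity(1 << 16);
/// filter.insert("alice").unwrap();
/// assert!(filter.contains("alice"));
/// assert!(filter.remove("alice"));
/// assert_eq!(filter.len(), 0);
/// ```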
#[derive(Debug, Builder)]
#[builder(
    pattern = "owned",
    build_fn(private, name = "base_build", validate = "Self::validate")
)]
pub struct CuckooFilter<H = DefaultHasher>
where
    H: Hasher + Default,
{
    // Configuration parameters
    /// Maximum number of elements the filter can store
    #[builder(default = "1048576")]
    capacity: usize,

    /// Size of fingerprints in bits (must be 4, 8, 16, or 32)
    #[builder(default = "16")]
    fingerprint_size: usize,

    /// Number of fingerprints per bucket
    #[builder(default = "4")]
    bucket_size: usize,

    /// Maximum number of evictions to try before giving up
    #[builder(default = "500")]
    max_evictions: usize,

    // Internal values - automatically derived from the configuration
    /// Number of fingerprints that can be stored in a single atomic value
    #[builder(setter(skip))]
    fingerprints_per_atomic: usize,

    /// Number of buckets in the filter (power of 2)
    #[builder(setter(skip))]
    num_buckets: usize,

    /// Bit mask for extracting fingerprints
    #[builder(setter(skip))]
    fingerprint_mask: usize,

    /// Storage for buckets, implemented as a vector of atomic values
    #[builder(setter(skip))]
    buckets: Vec<AtomicUsize>,

    /// Atomic value used for locking
    #[builder(setter(skip))]
    lock: AtomicUsize,

    /// Counter for the number of elements in the filter
    #[builder(setter(skip))]
    counter: AtomicUsize,

    /// Phantom data for the hasher type
    #[builder(setter(skip))]
    _hasher: PhantomData<H>,
}

impl<H: Hasher + Default> CuckooFilter<H> {
    /// Insert an item into the filter
    ///
    /// This operation first attempts a direct insertion without acquiring a lock.
    /// If that fails due to bucket collisions, it falls back to the eviction-based
    /// insertion algorithm which may require a write lock.
    ///
    /// Concurrent operations are safely handled through atomic operations.
    ///
    /// Returns Ok(()) if the item was inserted, or Error::NotEnoughSpace if the filter is full
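    ///
    /// A usage sketch (crate name assumed as in the type-level example):
    ///
    /// ```
    /// use atomic_cuckoo_filter::CuckooFilter;
    ///
    /// let filter = CuckooFilter::new();
    /// match filter.insert(&42u64) {
    ///     Ok(()) => println!("inserted"),
    ///     Err(e) => println!("filter is full: {e}"),
    /// }
    /// assert!(filter.contains(&42u64));
    /// ```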
    pub fn insert<T: ?Sized + Hash>(&self, item: &T) -> Result<(), Error> {
        let (index, fingerprint) = self.index_and_fingerprint(item);
        self.try_insert(index, fingerprint).or_else(|error| {
            let lock = self.lock(LockKind::WriterExclusive).ok_or(error)?;
            self.insert_with_evictions(index, fingerprint, lock)
        })
    }

    /// Check if an item is in the filter and insert it if it is not present (atomically)
    ///
    /// This method combines lookup and insert into a single atomic operation,
    /// ensuring thread safety and consistency even with concurrent operations.
    ///
    /// Returns Ok(true) if the item was inserted, Ok(false) if it was already present,
    /// or Error::NotEnoughSpace if the filter is full
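    ///
    /// For example (crate name assumed as in the type-level example):
    ///
    /// ```
    /// use atomic_cuckoo_filter::CuckooFilter;
    ///
    /// let filter = CuckooFilter::new();
    /// assert_eq!(filter.insert_unique("key"), Ok(true));  // first insertion
    /// assert_eq!(filter.insert_unique("key"), Ok(false)); // already present
    /// ```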
    pub fn insert_unique<T: ?Sized + Hash>(&self, item: &T) -> Result<bool, Error> {
        let (index, fingerprint) = self.index_and_fingerprint(item);
        if self.lookup_fingerprint(index, fingerprint).is_some() {
            return Ok(false);
        }
        let lock = Lock::new(&self.lock, LockKind::WriterExclusive);
        if self.lookup_fingerprint(index, fingerprint).is_some() {
            return Ok(false);
        }
        self.try_insert(index, fingerprint)
            .or_else(|error| {
                if self.max_evictions == 0 {
                    return Err(error);
                }
                self.insert_with_evictions(index, fingerprint, lock)
            })
            .map(|_| true)
    }

    /// Counts the number of occurrences of an item in the filter.
    ///
    /// # Notes
    /// - This is not a counting filter; it simply counts matching fingerprints in both candidate buckets.
    /// - Useful for detecting duplicates or hash collisions, not for precise multiset membership.
    /// - The count is limited by the filter's structure: at most `bucket_size * 2` per item.
    /// - This method may count false positives due to hash collisions.
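    ///
    /// A sketch of the duplicate-detection use (crate name assumed as above):
    ///
    /// ```
    /// use atomic_cuckoo_filter::CuckooFilter;
    ///
    /// let filter = CuckooFilter::new();
    /// filter.insert("dup").unwrap();
    /// filter.insert("dup").unwrap();
    /// // At least 2; unrelated fingerprint collisions could only increase the count.
    /// assert!(filter.count("dup") >= 2);
    /// ```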
    pub fn count<T: ?Sized + Hash>(&self, item: &T) -> usize {
        let (index, fingerprint) = self.index_and_fingerprint(item);
        let alt_index = self.alt_index(index, fingerprint);
        self.atomic_read(
            || {
                self.read_bucket(index, Ordering::Acquire)
                    .filter(|&f| f == fingerprint)
                    .count()
                    + self
                        .read_bucket(alt_index, Ordering::Acquire)
                        .filter(|&f| f == fingerprint)
                        .count()
            },
            None,
        )
    }

    /// Attempts to remove an item from the filter.
    ///
    /// Returns `true` if the item was successfully removed, or `false` if it was not found.
    ///
    /// Note:
    /// - An item should only be removed if it was previously added. Removing a non-existent
    ///   item may inadvertently remove a different item due to hash collisions inherent to
    ///   cuckoo filters.
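    ///
    /// For example (crate name assumed as above):
    ///
    /// ```
    /// use atomic_cuckoo_filter::CuckooFilter;
    ///
    /// let filter = CuckooFilter::new();
    /// filter.insert("temp").unwrap();
    /// assert!(filter.remove("temp"));  // present, removed
    /// assert!(!filter.remove("temp")); // already gone
    /// ```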
    pub fn remove<T: ?Sized + Hash>(&self, item: &T) -> bool {
        let (index, fingerprint) = self.index_and_fingerprint(item);
        while let Some((index, sub_index)) = self.lookup_fingerprint(index, fingerprint) {
            let _lock = self.lock(LockKind::WriterExclusive);
            if self.update_bucket(index, sub_index, fingerprint, 0, Ordering::Release) {
                return true;
            }
        }
        false
    }

    /// Check if an item is in the filter
    ///
    /// Returns `true` if the item is possibly in the filter (may have false positives),
    /// `false` if it is definitely not in the filter
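    ///
    /// A quick sketch (crate name assumed as above):
    ///
    /// ```
    /// use atomic_cuckoo_filter::CuckooFilter;
    ///
    /// let filter = CuckooFilter::new();
    /// filter.insert(&7u32).unwrap();
    /// assert!(filter.contains(&7u32)); // inserted items are always reported present
    /// // A miss is definitive, a hit is only probabilistic:
    /// if !filter.contains(&8u32) {
    ///     println!("8 was certainly never inserted");
    /// }
    /// ```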
    pub fn contains<T: ?Sized + Hash>(&self, item: &T) -> bool {
        let (index, fingerprint) = self.index_and_fingerprint(item);
        self.atomic_read(
            || self.lookup_fingerprint(index, fingerprint).is_some(),
            Some(true),
        )
    }

    /// Get the number of elements in the filter
    pub fn len(&self) -> usize {
        self.counter.load(Ordering::Acquire)
    }

    /// Check if the filter is empty
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Get the capacity of the filter
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Clear the filter, removing all elements
    pub fn clear(&self) {
        let _lock = self.lock(LockKind::WriterExclusive);
        for atomic in &self.buckets {
            let old_value = atomic.swap(0, Ordering::Release);
            let removed = (0..self.fingerprints_per_atomic)
                .filter(|i| (old_value >> (i * self.fingerprint_size)) & self.fingerprint_mask != 0)
                .count();
            if removed > 0 {
                self.counter.fetch_sub(removed, Ordering::Release);
            }
        }
    }

    /// Compute the hash of an item
    /// Uses the generic hasher H for flexibility and performance
    fn hash<T: ?Sized + Hash>(&self, data: &T) -> u64 {
        let mut hasher = <H as Default>::default();
        data.hash(&mut hasher);
        hasher.finish()
    }

    /// Compute the bucket index and fingerprint for an item.
    ///
    /// 1. **Hash the item**: Use the configured hasher to get a 64-bit hash
    /// 2. **Extract fingerprint**: Use multiplication + shift for high-quality
    ///    distribution across the fingerprint space, then add 1 to avoid zero.
    /// 3. **Extract index**: Use bitwise AND with (num_buckets-1) since num_buckets
    ///    is always a power of 2, providing perfect hash distribution.
    ///
    /// ## Why This Design
    ///
    /// - **Non-zero fingerprints**: Adding 1 ensures fingerprints are never 0,
    ///   so 0 can represent empty slots without ambiguity
    /// - **Independent bits**: Index uses lower hash bits, fingerprint uses
    ///   different bits via multiplication, avoiding correlation
    /// - **Uniform distribution**: Both index and fingerprint are uniformly
    ///   distributed across their respective ranges
    ///
    /// Returns (index, fingerprint) where:
    /// - index is the primary bucket index (0 to num_buckets-1)
    /// - fingerprint is a compact hash of the item (1 to fingerprint_mask)
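    ///
    /// Worked example (illustrative arithmetic, not tied to any particular hasher):
    /// with 8-bit fingerprints (`fingerprint_mask = 255`), the mapping
    /// `((hash as u128 * 255) >> 64) + 1` sends `hash = 0` to fingerprint `1` and
    /// `hash = u64::MAX` to fingerprint `255`, so every fingerprint lies in `1..=255`
    /// and `0` stays reserved for empty slots.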
    fn index_and_fingerprint<T: ?Sized + Hash>(&self, item: &T) -> (usize, usize) {
        let hash = self.hash(item);
        // Compute fingerprint using multiplication and shift for better distribution
        let fingerprint = ((hash as u128 * self.fingerprint_mask as u128) >> 64) + 1;
        // Compute index using modulo num_buckets (optimized with bitwise AND since num_buckets is a power of 2)
        let index = hash as usize & (self.num_buckets - 1);
        (index, fingerprint as usize)
    }

    /// Computes the alternative bucket index for a given fingerprint using cuckoo hashing.
    ///
    /// In cuckoo hashing, each item can reside in one of two possible buckets. This function
    /// deterministically computes the alternate bucket index from the current index and fingerprint.
    ///
    /// Properties:
    /// 1. Symmetry: `alt_index(alt_index(i, f), f) == i` for any index `i` and fingerprint `f`.
    /// 2. Distinctness: the two indices differ unless the fingerprint's hash happens to be a
    ///    multiple of `num_buckets`, in which case they coincide.
    /// 3. Uniformity: The mapping distributes fingerprints evenly across all buckets.
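    ///
    /// The symmetry follows directly from XOR being its own inverse: with
    /// `h = hash(f) & (num_buckets - 1)`, we get `alt_index(i, f) = i ^ h` and
    /// `alt_index(i ^ h, f) = (i ^ h) ^ h = i`.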
    fn alt_index(&self, index: usize, fingerprint: usize) -> usize {
        index ^ (self.hash(&fingerprint) as usize & (self.num_buckets - 1))
    }

    /// Look up a fingerprint at its primary or alternative index
    /// Returns `Some((index, sub_index))` if found, None otherwise
    fn lookup_fingerprint(&self, index: usize, fingerprint: usize) -> Option<(usize, usize)> {
        // First check the primary bucket
        self.read_bucket(index, Ordering::Acquire)
            .position(|fp| fp == fingerprint)
            .map(|sub_index| (index, sub_index))
            .or_else(|| {
                // Then check the alternative bucket
                let alt_index = self.alt_index(index, fingerprint);
                self.read_bucket(alt_index, Ordering::Acquire)
                    .position(|fp| fp == fingerprint)
                    .map(|sub_index| (alt_index, sub_index))
            })
    }

    /// Try to insert a fingerprint at its primary or alternative index
    /// Returns `Ok(())` if successful, `Error::NotEnoughSpace` if both buckets are full
    fn try_insert(&self, index: usize, fingerprint: usize) -> Result<(), Error> {
        self.insert_at_index(index, fingerprint).or_else(|_| {
            let alt_index = self.alt_index(index, fingerprint);
            self.insert_at_index(alt_index, fingerprint)
        })
    }

    /// Try to insert a fingerprint at a specific index
    /// Returns Ok(()) if successful, Err(Error::NotEnoughSpace) if the bucket is full
    fn insert_at_index(&self, index: usize, fingerprint: usize) -> Result<(), Error> {
        loop {
            let sub_index = self
                .read_bucket(index, Ordering::Relaxed)
                .position(|i| i == 0)
                .ok_or(Error::NotEnoughSpace)?;

            if self.update_bucket(index, sub_index, 0, fingerprint, Ordering::Release) {
                return Ok(());
            }
        }
    }

    /// Insert a fingerprint using cuckoo eviction chains when both buckets are full.
    ///
    /// This method is invoked only as a fallback when direct insertion fails, preserving
    /// the optimistic, lock-free fast path for the common case.
    ///
    /// # Cuckoo Eviction Algorithm
    ///
    /// When both possible locations for an item are full:
    /// 1. **Randomly select** an existing item from one of the full buckets
    /// 2. **Evict** that item and insert our new item in its place
    /// 3. **Relocate** the evicted item to its alternate location
    /// 4. **Repeat** if the alternate location is also full (eviction chain)
    /// 5. **Succeed** when we find an empty slot, or **fail** after max_evictions
    ///
    /// # Implementation Details
    ///
    /// - **Eviction tracking**: Collects a sequence of planned evictions, which are
    ///   atomically applied only if the chain succeeds, ensuring atomicity and consistency.
    /// - **Lock upgrading**: Starts with a `WriterExclusive` lock, upgrading to
    ///   `FullyExclusive` only when actually applying the eviction chain, maximizing
    ///   read concurrency during planning.
    /// - **Loop prevention**: Tracks the set of slots already chosen for displacement,
    ///   ensuring early detection of loops in eviction chains.
    fn insert_with_evictions(
        &self,
        mut index: usize,
        mut fingerprint: usize,
        mut lock: Lock,
    ) -> Result<(), Error> {
        let mut rng = rand::rng();
        let mut insertions = Vec::with_capacity(self.max_evictions.min(32));
        let mut used_slots = HashSet::with_capacity(self.max_evictions.min(32));
        while insertions.len() <= self.max_evictions {
            // Choose a sub-index in this bucket whose global slot has not been used yet in the plan
            let base_slot = index * self.bucket_size;
            let mut sub_index = rng.random_range(0..self.bucket_size);
            if used_slots.contains(&(base_slot + sub_index)) {
                sub_index = (0..self.bucket_size)
                    .find(|&i| !used_slots.contains(&(base_slot + i)))
                    .ok_or(Error::NotEnoughSpace)?;
            }
            used_slots.insert(base_slot + sub_index);
            insertions.push((index, sub_index, fingerprint));

            // Evict the fingerprint at the chosen sub-index
            fingerprint = self
                .read_bucket(index, Ordering::Relaxed)
                .nth(sub_index)
                .unwrap();
            // Find the alternative index for the evicted fingerprint
            index = self.alt_index(index, fingerprint);

            if self.insert_at_index(index, fingerprint).is_ok() {
                // Successfully inserted the fingerprint, now apply all evictions
                lock.upgrade();
                let mut evicted = fingerprint;
                while let Some((index, sub_index, fingerprint)) = insertions.pop() {
                    self.update_bucket(index, sub_index, evicted, fingerprint, Ordering::Relaxed);
                    evicted = fingerprint;
                }
                return Ok(());
            }
        }
        // Reached the maximum number of evictions, give up
        Err(Error::NotEnoughSpace)
    }

    /// Atomically read all fingerprints from a bucket using lock-free bit manipulation.
    ///
    /// ## Memory Layout Complexity
    ///
    /// Fingerprints are tightly packed in memory across multiple atomic usize values:
    /// - Each bucket contains `bucket_size` fingerprints
    /// - Each fingerprint is `fingerprint_size` bits
    /// - Multiple fingerprints are packed into each atomic usize
    /// - Buckets may span across multiple atomic values
    ///
    /// ## Algorithm Steps
    ///
    /// 1. Calculate which atomic values contain this bucket's data
    /// 2. Atomically load each relevant atomic value (using the caller-provided ordering)
    /// 3. Extract fingerprints using bit manipulation and masking
    /// 4. Handle boundary cases where buckets span multiple atomics
    /// 5. Skip any padding bits and return exactly `bucket_size` fingerprints
    ///
    /// This is completely lock-free - multiple threads can read concurrently,
    /// and reads can proceed even during writes (though they might see
    /// intermediate states that get resolved by retry logic).
    ///
    /// Returns an iterator over the fingerprints in the bucket (0 = empty slot).
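    ///
    /// For example, on a 64-bit target with `fingerprint_size = 16` there are 4 fingerprints
    /// per atomic, so with `bucket_size = 4` each bucket occupies exactly one `AtomicUsize`;
    /// with `fingerprint_size = 8` (8 per atomic) two consecutive buckets share one atomic.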
    fn read_bucket(&self, index: usize, ordering: Ordering) -> impl Iterator<Item = usize> {
        let fingerprint_index = index * self.bucket_size;
        let bit_index = fingerprint_index * self.fingerprint_size;
        let start_index = bit_index / usize::BITS as usize;
        let skip_bits = bit_index % usize::BITS as usize;
        let skip_fingerprints = skip_bits >> self.fingerprint_size.trailing_zeros();
        // No need to calculate end_index; just iterate from start_index to the end of the bucket
        self.buckets[start_index..]
            .iter()
            .flat_map(move |atomic| {
                let atomic_value = atomic.load(ordering);
                (0..self.fingerprints_per_atomic).map(move |i| {
                    (atomic_value
                        >> (self.fingerprint_size * (self.fingerprints_per_atomic - i - 1)))
                        & self.fingerprint_mask
                })
            })
            .skip(skip_fingerprints)
            .take(self.bucket_size)
    }

    /// Atomically update a single fingerprint using lock-free compare-exchange.
    ///
    /// ## Lock-Free Update Algorithm
    ///
    /// 1. **Locate the target**: Calculate which atomic usize contains the fingerprint
    ///    and the exact bit position within that atomic value
    /// 2. **Read current state**: Load the current atomic value
    /// 3. **Verify expectation**: Check that the target position contains `old_value`
    /// 4. **Atomic update**: Use compare_exchange_weak to atomically replace `old_value`
    ///    with `new_value`, but only if the atomic hasn't changed since step 2
    /// 5. **Retry on conflict**: If another thread modified the atomic concurrently,
    ///    restart from step 2
    ///
    /// ## Concurrency Safety
    ///
    /// - Uses `compare_exchange_weak` which can fail spuriously on some architectures
    ///   but is more efficient than the strong version
    /// - Employs the caller-provided success ordering (typically Release) to ensure
    ///   other threads see the change
    /// - Updates the global counter atomically to maintain consistency
    /// - Returns false if the expected `old_value` is no longer present (indicating
    ///   another thread already modified this slot)
    ///
    /// Returns `true` if update succeeded, `false` if the slot no longer contains
    /// the expected `old_value` due to concurrent modification.
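    ///
    /// For example, with `fingerprint_size = 16`, `bucket_size = 4` and a 64-bit `usize`,
    /// the slot `(index = 3, sub_index = 2)` starts at bit `(3 * 4 + 2) * 16 = 224`,
    /// which lands in `buckets[224 / 64] = buckets[3]` at `shift = 64 - 16 - (224 % 64) = 16`,
    /// i.e. bits 16..32 of that atomic word.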
    fn update_bucket(
        &self,
        index: usize,
        sub_index: usize,
        old_value: usize,
        new_value: usize,
        ordering: Ordering,
    ) -> bool {
        let bit_index = (index * self.bucket_size + sub_index) * self.fingerprint_size;
        let atomic_index = bit_index / usize::BITS as usize;
        let skip_bits = bit_index % usize::BITS as usize;
        let shift = usize::BITS as usize - self.fingerprint_size - skip_bits;
        let fingerprint_mask = self.fingerprint_mask << shift;
        let atomic = &self.buckets[atomic_index];

        loop {
            let atomic_value = atomic.load(Ordering::Relaxed);
            if (atomic_value & fingerprint_mask) >> shift != old_value {
                // The expected fingerprint is not present in the atomic value
                return false;
            }
            let new_atomic_value = (atomic_value & !fingerprint_mask) | (new_value << shift);
            if atomic
                .compare_exchange_weak(atomic_value, new_atomic_value, ordering, Ordering::Relaxed)
                .is_ok()
            {
                // Update the counter based on the change
                match (old_value, new_value) {
                    (0, _) => self.counter.fetch_add(1, Ordering::Release),
                    (_, 0) => self.counter.fetch_sub(1, Ordering::Release),
                    (_, _) => 0,
                };
                return true;
            }
        }
    }

    /// Acquires a lock on the filter, if necessary.
    ///
    /// A lock is only required when evictions are enabled (i.e., `max_evictions > 0`).
    /// If `max_evictions` is set to 0, no lock is acquired.
    ///
    /// Returns `Some(Lock)` if a lock is needed, or `None` if no locking is required.
    pub fn lock(&self, kind: LockKind) -> Option<Lock<'_>> {
        if self.max_evictions == 0 {
            None
        } else {
            Some(Lock::new(&self.lock, kind))
        }
    }

    /// Execute a read operation with optimistic concurrency control and automatic retry.
    ///
    /// This is the cornerstone of the lock-free design, implementing a sophisticated
    /// optimistic concurrency protocol that allows reads to proceed concurrently with
    /// most write operations.
    ///
    /// ## Optimistic Concurrency Protocol
    ///
    /// 1. **Snapshot version**: Acquire an Optimistic lock (capturing version number)
    /// 2. **Execute read**: Run the provided function without any blocking
    /// 3. **Validate consistency**: Check if version changed or FullyExclusive lock acquired
    /// 4. **Retry or return**: If data may be stale, retry; otherwise return result
    ///
    /// ## How It Works
    ///
    /// - **WriterExclusive operations**: Don't invalidate optimistic reads because they
    ///   coordinate through atomic compare-exchange operations that are linearizable
    /// - **FullyExclusive operations**: Do invalidate optimistic reads because they
    ///   perform complex multi-step updates that require consistency
    /// - **Early return optimization**: For operations that can short-circuit (like
    ///   `contains()` returning true), we skip version validation as an optimization
    ///
    /// This pattern is essential for achieving lock-free performance while maintaining
    /// correctness in the presence of concurrent modifications.
    fn atomic_read<T, F>(&self, fun: F, early_return: Option<T>) -> T
    where
        F: Fn() -> T,
        T: PartialEq,
    {
        if self.max_evictions == 0 {
            return fun();
        }
        loop {
            let lock = Lock::new(&self.lock, LockKind::Optimistic);
            let result = fun();
            if Some(&result) == early_return.as_ref() || !lock.is_outdated() {
                return result;
            }
        }
    }
}

impl CuckooFilter<DefaultHasher> {
    /// Create a new CuckooFilterBuilder with default settings
    pub fn builder() -> CuckooFilterBuilder<DefaultHasher> {
        CuckooFilterBuilder::default()
    }

    /// Create a new CuckooFilter with default settings
    pub fn new() -> CuckooFilter<DefaultHasher> {
        Self::builder().build().unwrap()
    }

    /// Create a new CuckooFilter with the specified capacity
    pub fn with_capacity(capacity: usize) -> CuckooFilter<DefaultHasher> {
        Self::builder().capacity(capacity).build().unwrap()
    }
}

impl Default for CuckooFilter<DefaultHasher> {
    /// Create a new CuckooFilter with default settings
    fn default() -> Self {
        Self::new()
    }
}

impl<H: Hasher + Default> CuckooFilterBuilder<H> {
    /// Validate the builder configuration
    fn validate(&self) -> Result<(), String> {
        if let Some(fingerprint_size) = self.fingerprint_size
            && ![4, 8, 16, 32].contains(&fingerprint_size)
        {
            return Err("Invalid fingerprint_size".into());
        }
        if self.bucket_size == Some(0) {
            return Err("bucket_size must be greater than zero".into());
        }
        if self.capacity == Some(0) {
            return Err("capacity must be greater than zero".into());
        }
        Ok(())
    }

    /// Build a CuckooFilter with the specified configuration
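    ///
    /// The requested capacity is rounded so that the bucket count is a power of two.
    /// For example (crate name assumed as in the type-level example):
    ///
    /// ```
    /// use atomic_cuckoo_filter::CuckooFilter;
    ///
    /// // 1000 / 4 = 250 buckets requested, rounded up to 256 buckets of 4 slots each.
    /// let filter = CuckooFilter::builder().capacity(1000).build().unwrap();
    /// assert_eq!(filter.capacity(), 1024);
    /// ```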
    pub fn build(self) -> Result<CuckooFilter<H>, CuckooFilterBuilderError> {
        let mut cuckoo_filter = self.base_build()?;
        // Calculate the number of buckets (power of 2)
        cuckoo_filter.num_buckets = cuckoo_filter
            .capacity
            .div_ceil(cuckoo_filter.bucket_size)
            .next_power_of_two();
        // Adjust the capacity to match the actual number of buckets
        cuckoo_filter.capacity = cuckoo_filter.num_buckets * cuckoo_filter.bucket_size;
        // Calculate the fingerprint mask
        cuckoo_filter.fingerprint_mask = ((1u64 << cuckoo_filter.fingerprint_size) - 1) as usize;
        // Calculate the number of fingerprints per atomic value
        cuckoo_filter.fingerprints_per_atomic =
            usize::BITS as usize / cuckoo_filter.fingerprint_size;
        // Calculate the total number of atomic values needed
        let bit_size = cuckoo_filter.capacity * cuckoo_filter.fingerprint_size;
        let atomic_size = bit_size.div_ceil(usize::BITS as usize);
        // Initialize the buckets
        cuckoo_filter.buckets = (0..atomic_size).map(|_| AtomicUsize::new(0)).collect();
        Ok(cuckoo_filter)
    }
}