atomic_cuckoo_filter/lib.rs
// Lock-Free Concurrent Cuckoo Filter Implementation
// A high-performance probabilistic data structure for efficient set membership testing
// with better space efficiency than Bloom filters, support for deletions, and
// fully concurrent operations using atomic operations and lock-free algorithms.

use derive_builder::Builder;
use rand::Rng;
use std::collections::HashMap;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::hint;
use std::marker::PhantomData;
use std::sync::atomic::{AtomicUsize, Ordering};

/// Maximum number of spin-loop iterations before parking a thread.
/// This balances CPU usage vs. latency - spinning avoids kernel calls for
/// short waits, but we park threads to avoid wasting CPU on long waits.
const MAX_SPIN: usize = 100;

/// Error type for the Cuckoo Filter insert operation
#[derive(Debug, thiserror::Error, PartialEq)]
pub enum Error {
    /// Returned when the filter is full and cannot accommodate more elements
    #[error("Not enough space to store this item.")]
    NotEnoughSpace,
}

/// Types of locks that can be acquired on the filter
#[derive(PartialEq)]
pub enum LockKind {
    /// Optimistic version tracking - does not block other operations but captures
    /// a version number to detect if data changed during the operation
    Optimistic,
    /// Exclusive among writers only - prevents other writers but allows concurrent readers
    WriterExclusive,
    /// Fully exclusive access - blocks all other operations (used only during evictions)
    FullyExclusive,
}

/// A specialized lock implementation designed for concurrent cuckoo filter operations.
///
/// This is NOT a traditional mutex but an atomic-based synchronization mechanism that
/// enables three distinct concurrency modes:
///
/// 1. **Optimistic locks**: Allow maximum concurrency - multiple readers and writers
///    can proceed simultaneously. Used for optimistic reads that detect data races.
///
/// 2. **WriterExclusive locks**: Mutual exclusion among writers only - prevents
///    concurrent modifications but allows concurrent reads.
///
/// 3. **FullyExclusive locks**: Complete mutual exclusion - blocks all operations.
///    Only used during complex eviction chains to ensure consistency.
///
/// ## Version Encoding Scheme
/// The atomic usize encodes both lock state and version information:
/// - **Bits 0-1**: Lock kind (0=Optimistic, 1=WriterExclusive, 2=FullyExclusive)
/// - **Bits 2 and up**: Version counter (incremented on FullyExclusive release)
///
/// This allows optimistic readers to detect when their read might be stale by
/// comparing version numbers before and after the operation.
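///
/// For instance, decoding a raw value (the value here is made up for illustration):
///
/// ```text
/// value = 0b1110
/// kind  = value & 0b11   // 0b10 => FullyExclusive
/// ver   = value >> 2     // 3: three FullyExclusive releases so far
/// ```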
pub struct Lock<'a> {
    /// Reference to the shared atomic value encoding lock state and version
    atomic: &'a AtomicUsize,
    /// Snapshot of the atomic value when this lock was acquired.
    /// Used for optimistic concurrency control and version tracking.
    /// The lower 2 bits indicate lock type, upper bits track version changes.
    version: usize,
    /// The type of lock held by this instance
    kind: LockKind,
    /// Counter for spin attempts before transitioning to thread parking.
    /// Implements adaptive spinning to balance latency vs CPU usage.
    retry: usize,
}

impl<'a> Lock<'a> {
    /// Create a new lock of the specified kind.
    /// Blocks until the lock can be acquired.
    fn new(atomic: &'a AtomicUsize, kind: LockKind) -> Self {
        let mut lock = Self {
            atomic,
            version: 0,
            kind,
            retry: 0,
        };
        match lock.kind {
            LockKind::Optimistic => loop {
                // For optimistic locks, we can proceed as long as no FullyExclusive lock is held
                lock.version = atomic.load(Ordering::Relaxed);
                if Self::kind(lock.version) != LockKind::FullyExclusive {
                    return lock;
                }
                lock.spin_or_park()
            },
            _ => loop {
                // For WriterExclusive and FullyExclusive locks, wait until no
                // exclusive lock of either kind is currently held
                lock.version = atomic.load(Ordering::Relaxed);
                if Self::kind(lock.version) != LockKind::Optimistic {
                    lock.spin_or_park();
                    continue;
                }
                // Update the lower bits of the version: 1 for WriterExclusive, 2 for FullyExclusive
                let new_version = if lock.kind == LockKind::WriterExclusive {
                    lock.version + 1
                } else {
                    lock.version + 2
                };
                if atomic
                    .compare_exchange_weak(
                        lock.version,
                        new_version,
                        Ordering::Release,
                        Ordering::Relaxed,
                    )
                    .is_ok()
                {
                    return lock;
                }
            },
        }
    }

    /// Upgrade a WriterExclusive lock to a FullyExclusive lock.
    /// This assumes the current thread holds the WriterExclusive lock.
    fn upgrade(&mut self) {
        self.atomic.store(self.version + 2, Ordering::Release);
        self.kind = LockKind::FullyExclusive;
    }

    /// Check if the lock is outdated (version changed) or a FullyExclusive lock is held.
    /// Used for optimistic concurrency control.
    fn is_outdated(&self) -> bool {
        let version = self.atomic.load(Ordering::Acquire);
        Self::kind(version) == LockKind::FullyExclusive || version >> 2 != self.version >> 2
    }

    /// Get the key for parking a thread.
    /// Different keys are used for optimistic and exclusive locks.
    fn park_key(&self) -> usize {
        let key = self.atomic.as_ptr() as usize;
        match self.kind {
            LockKind::Optimistic => key,
            _ => key + 1,
        }
    }

    /// Spin or park the thread while waiting for a lock
    fn spin_or_park(&mut self) {
        if self.retry > MAX_SPIN {
            // After MAX_SPIN attempts, park the thread
            self.retry = 0;
            unsafe {
                parking_lot_core::park(
                    self.park_key(),
                    || self.atomic.load(Ordering::Acquire) == self.version,
                    || (),
                    |_, _| (),
                    parking_lot_core::DEFAULT_PARK_TOKEN,
                    None,
                );
            }
        } else {
            // Otherwise, spin
            self.retry += 1;
            hint::spin_loop();
        }
    }

    /// Extract the lock kind from the lower 2 bits of a version value
    fn kind(version: usize) -> LockKind {
        match version & 0b11 {
            0 => LockKind::Optimistic,
            1 => LockKind::WriterExclusive,
            2 => LockKind::FullyExclusive,
            _ => panic!("Invalid Lock"),
        }
    }
}

impl Drop for Lock<'_> {
    /// Release the lock when it goes out of scope
    fn drop(&mut self) {
        match self.kind {
            LockKind::Optimistic => return, // Nothing to release for Optimistic locks
            LockKind::WriterExclusive => {
                // For WriterExclusive locks, release the lock without incrementing the version
                self.atomic.store(self.version, Ordering::Release);
            }
            LockKind::FullyExclusive => {
                // For FullyExclusive locks, increment the version to invalidate Optimistic locks
                self.atomic.store(self.version + 4, Ordering::Release);
            }
        }

        // Unpark waiting threads
        let optimistic_key = self.atomic.as_ptr() as usize;
        let exclusive_key = optimistic_key + 1;
        unsafe {
            // Unpark all waiting optimistic locks
            parking_lot_core::unpark_all(optimistic_key, parking_lot_core::DEFAULT_UNPARK_TOKEN);
            // Unpark one waiting exclusive lock (either WriterExclusive or FullyExclusive)
            parking_lot_core::unpark_one(exclusive_key, |_| parking_lot_core::DEFAULT_UNPARK_TOKEN);
        }
    }
}

/// A highly concurrent, lock-free probabilistic data structure for set membership testing.
///
/// ## What Makes It "Cuckoo"
///
/// Named after the cuckoo bird's behavior of displacing other birds' eggs, this filter
/// uses **cuckoo hashing**, where each item can be stored in one of two possible locations.
/// When both locations are full, existing items are "evicted" (like cuckoo eggs) and
/// relocated to their alternate position, creating eviction chains.
///
/// ## Algorithm Overview
///
/// 1. **Fingerprints**: Items are reduced to small fingerprints (4-32 bits) instead of
///    storing full keys, providing excellent space efficiency.
///
/// 2. **Dual Hashing**: Each item has two possible bucket locations computed from its hash.
///    This provides better space efficiency and flexibility when inserting and removing items.
///
/// 3. **Eviction Chains**: When both buckets are full, a random item is evicted from one
///    bucket and moved to its alternate location, potentially triggering a chain of evictions.
///
/// 4. **Lock-Free Concurrency**: All operations use atomic compare-exchange loops instead
///    of traditional locks, with optimistic concurrency control for read operations.
///    The only exception is inserting with evictions, where a FullyExclusive lock is used
///    to ensure consistency.
///
/// ## Key Advantages Over Bloom Filters
///
/// - **Deletions supported**: Items can be removed without false negatives
/// - **Better space efficiency**: ~20-30% less memory for the same false positive rate
/// - **Bounded lookup time**: Always at most 2 bucket checks, never more
/// - **High concurrency**: Lock-free design enables excellent parallel performance
///
/// ## Concurrency Model
///
/// - **Reads**: Optimistic, can proceed concurrently with most operations
/// - **Simple writes**: Use atomic compare-exchange loops without blocking other operations
/// - **WriterExclusive locks**: Used for removing items and for unique insertions
/// - **Complex evictions**: Use FullyExclusive locks to ensure consistency
///
/// ## Time Complexity
///
/// - **Lookup**: O(1)
/// - **Deletion**: O(1)
/// - **Insertion**: Amortized O(1); eviction chains can add work, but their length is
///   bounded by `max_evictions`
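///
/// # Example
///
/// A quick usage sketch (the crate name `atomic_cuckoo_filter` is assumed from the
/// file path):
///
/// ```
/// use atomic_cuckoo_filter::CuckooFilter;
///
/// let filter = CuckooFilter::new();
/// filter.insert("apple").unwrap();
/// assert!(filter.contains("apple"));
/// assert!(filter.remove("apple"));
/// assert!(!filter.contains("apple"));
/// ```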
#[derive(Debug, Builder)]
#[builder(
    pattern = "owned",
    build_fn(private, name = "base_build", validate = "Self::validate")
)]
pub struct CuckooFilter<H = DefaultHasher>
where
    H: Hasher + Default,
{
    // Configuration parameters
    /// Maximum number of elements the filter can store
    #[builder(default = "1048576")]
    capacity: usize,

    /// Size of fingerprints in bits (must be 4, 8, 16, or 32)
    #[builder(default = "16")]
    fingerprint_size: usize,

    /// Number of fingerprints per bucket
    #[builder(default = "4")]
    bucket_size: usize,

    /// Maximum number of evictions to try before giving up
    #[builder(default = "500")]
    max_evictions: usize,

    // Internal values - automatically derived from the configuration
    /// Number of fingerprints that can be stored in a single atomic value
    #[builder(setter(skip))]
    fingerprints_per_atomic: usize,

    /// Number of buckets in the filter (power of 2)
    #[builder(setter(skip))]
    num_buckets: usize,

    /// Number of atomic values per bucket
    #[builder(setter(skip))]
    atomics_per_bucket: usize,

    /// Bit mask for extracting fingerprints
    #[builder(setter(skip))]
    fingerprint_mask: usize,

    /// Storage for buckets, implemented as a vector of atomic values
    #[builder(setter(skip))]
    buckets: Vec<AtomicUsize>,

    /// Atomic value used for locking
    #[builder(setter(skip))]
    lock: AtomicUsize,

    /// Counter for the number of elements in the filter
    #[builder(setter(skip))]
    counter: AtomicUsize,

    /// Phantom data for the hasher type
    #[builder(setter(skip))]
    _hasher: PhantomData<H>,
}

impl<H: Hasher + Default> CuckooFilter<H> {
    /// Insert an item into the filter
    ///
    /// This operation first attempts a direct insertion without acquiring a lock.
    /// If that fails due to bucket collisions, it falls back to the eviction-based
    /// insertion algorithm, which may require a write lock.
    ///
    /// Concurrent operations are safely handled through atomic operations.
    ///
    /// Returns `Ok(())` if the item was inserted, or `Error::NotEnoughSpace` if the filter is full
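    ///
    /// # Example
    ///
    /// A brief sketch of the expected behavior (crate name assumed):
    ///
    /// ```
    /// use atomic_cuckoo_filter::CuckooFilter;
    ///
    /// let filter = CuckooFilter::with_capacity(1024);
    /// assert_eq!(filter.insert("hello"), Ok(()));
    /// assert!(filter.contains("hello"));
    /// ```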
    pub fn insert<T: ?Sized + Hash>(&self, item: &T) -> Result<(), Error> {
        let (index, fingerprint) = self.index_and_fingerprint(item);
        self.try_insert(index, fingerprint).or_else(|error| {
            if let Some(lock) = self.lock(LockKind::WriterExclusive) {
                self.insert_with_evictions(index, fingerprint, lock)
            } else {
                Err(error)
            }
        })
    }

    /// Check if an item is in the filter and insert it if it is not present (atomically)
    ///
    /// This method combines lookup and insert into a single atomic operation,
    /// ensuring thread safety and consistency even with concurrent operations.
    ///
    /// Returns `Ok(true)` if the item was inserted, `Ok(false)` if it was already present,
    /// or `Error::NotEnoughSpace` if the filter is full
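    ///
    /// # Example
    ///
    /// A brief sketch of the expected behavior:
    ///
    /// ```
    /// use atomic_cuckoo_filter::CuckooFilter;
    ///
    /// let filter = CuckooFilter::new();
    /// assert_eq!(filter.insert_unique("key"), Ok(true));  // inserted
    /// assert_eq!(filter.insert_unique("key"), Ok(false)); // already present
    /// ```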
    pub fn insert_unique<T: ?Sized + Hash>(&self, item: &T) -> Result<bool, Error> {
        let (index, fingerprint) = self.index_and_fingerprint(item);
        if self.lookup_fingerprint(index, fingerprint).is_some() {
            return Ok(false);
        }
        let lock = Lock::new(&self.lock, LockKind::WriterExclusive);
        if self.lookup_fingerprint(index, fingerprint).is_some() {
            return Ok(false);
        }
        self.try_insert(index, fingerprint)
            .or_else(|error| {
                if self.max_evictions == 0 {
                    return Err(error);
                }
                self.insert_with_evictions(index, fingerprint, lock)
            })
            .map(|_| true)
    }

    /// Counts the number of occurrences of an item in the filter.
    ///
    /// # Notes
    /// - This is not a counting filter; it simply counts matching fingerprints in both candidate buckets.
    /// - Useful for detecting duplicates or hash collisions, not for precise multiset membership.
    /// - The count is limited by the filter's structure: at most `bucket_size * 2` per item.
    /// - This method may overcount due to hash collisions (false positives).
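    ///
    /// # Example
    ///
    /// A brief sketch (the count is exact here only because nothing else in the
    /// filter collides with the item):
    ///
    /// ```
    /// use atomic_cuckoo_filter::CuckooFilter;
    ///
    /// let filter = CuckooFilter::new();
    /// filter.insert("dup").unwrap();
    /// filter.insert("dup").unwrap();
    /// assert_eq!(filter.count("dup"), 2);
    /// ```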
    pub fn count<T: ?Sized + Hash>(&self, item: &T) -> usize {
        let (index, fingerprint) = self.index_and_fingerprint(item);
        let alt_index = self.alt_index(index, fingerprint);
        self.atomic_read(
            || {
                self.read_bucket(index, Ordering::Acquire)
                    .into_iter()
                    .filter(|&f| f == fingerprint)
                    .count()
                    + self
                        .read_bucket(alt_index, Ordering::Acquire)
                        .into_iter()
                        .filter(|&f| f == fingerprint)
                        .count()
            },
            None,
        )
    }

    /// Attempts to remove an item from the filter.
    ///
    /// Returns `true` if the item was successfully removed, or `false` if it was not found.
    ///
    /// Note:
    /// - An item should only be removed if it was previously added. Removing a non-existent
    ///   item may inadvertently remove a different item due to hash collisions inherent to
    ///   cuckoo filters.
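    ///
    /// # Example
    ///
    /// A brief sketch of the expected behavior:
    ///
    /// ```
    /// use atomic_cuckoo_filter::CuckooFilter;
    ///
    /// let filter = CuckooFilter::new();
    /// filter.insert("item").unwrap();
    /// assert!(filter.remove("item"));
    /// assert!(!filter.remove("item")); // already removed
    /// ```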
    pub fn remove<T: ?Sized + Hash>(&self, item: &T) -> bool {
        let (index, fingerprint) = self.index_and_fingerprint(item);
        while let Some((index, sub_index)) = self.lookup_fingerprint(index, fingerprint) {
            let _lock = self.lock(LockKind::WriterExclusive);
            if self.update_bucket(index, sub_index, fingerprint, 0, Ordering::Release) {
                return true;
            }
        }
        false
    }

    /// Check if an item is in the filter
    ///
    /// Returns `true` if the item is possibly in the filter (may have false positives),
    /// `false` if it is definitely not in the filter
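    ///
    /// # Example
    ///
    /// A brief sketch; a miss is definitive, while a hit may rarely be a false positive:
    ///
    /// ```
    /// use atomic_cuckoo_filter::CuckooFilter;
    ///
    /// let filter = CuckooFilter::new();
    /// filter.insert(&42).unwrap();
    /// assert!(filter.contains(&42));
    /// ```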
    pub fn contains<T: ?Sized + Hash>(&self, item: &T) -> bool {
        let (index, fingerprint) = self.index_and_fingerprint(item);
        self.atomic_read(
            || self.lookup_fingerprint(index, fingerprint).is_some(),
            Some(true),
        )
    }

    /// Get the number of elements in the filter
    pub fn len(&self) -> usize {
        self.counter.load(Ordering::Acquire)
    }

    /// Check if the filter is empty
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Get the capacity of the filter
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Clear the filter, removing all elements
    pub fn clear(&self) {
        let _lock = self.lock(LockKind::WriterExclusive);
        for atomic in &self.buckets {
            let old_value = atomic.swap(0, Ordering::Release);
            let removed = (0..self.fingerprints_per_atomic)
                .filter(|i| (old_value >> (i * self.fingerprint_size)) & self.fingerprint_mask != 0)
                .count();
            if removed > 0 {
                self.counter.fetch_sub(removed, Ordering::Release);
            }
        }
    }

    /// Compute the hash of an item.
    /// Uses the generic hasher H for flexibility and performance.
    fn hash<T: ?Sized + Hash>(&self, data: &T) -> u64 {
        let mut hasher = <H as Default>::default();
        data.hash(&mut hasher);
        hasher.finish()
    }

    /// Compute the bucket index and fingerprint for an item.
    ///
    /// 1. **Hash the item**: Use the configured hasher to get a 64-bit hash
    /// 2. **Extract fingerprint**: Use multiplication + shift for high-quality
    ///    distribution across the fingerprint space, then add 1 to avoid zero.
    /// 3. **Extract index**: Use bitwise AND with (num_buckets - 1), which is
    ///    equivalent to modulo since num_buckets is always a power of 2.
    ///
    /// ## Why This Design
    ///
    /// - **Non-zero fingerprints**: Adding 1 ensures fingerprints are never 0,
    ///   so 0 can represent empty slots without ambiguity
    /// - **Independent bits**: Index uses lower hash bits, fingerprint uses
    ///   different bits via multiplication, avoiding correlation
    /// - **Uniform distribution**: Both index and fingerprint are uniformly
    ///   distributed across their respective ranges
    ///
    /// Returns (index, fingerprint) where:
    /// - index is the primary bucket index (0 to num_buckets - 1)
    /// - fingerprint is a compact hash of the item (1 to fingerprint_mask)
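    ///
    /// A worked sketch with illustrative values (`fingerprint_size = 16`, so
    /// `fingerprint_mask = 0xFFFF`, and `num_buckets = 1024`):
    ///
    /// ```text
    /// hash        = 0x9E37_79B9_7F4A_7C15                // from the hasher
    /// fingerprint = ((hash as u128 * 0xFFFF) >> 64) + 1  // in 1..=0xFFFF
    /// index       = hash as usize & (1024 - 1)           // in 0..1024
    /// ```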
    fn index_and_fingerprint<T: ?Sized + Hash>(&self, item: &T) -> (usize, usize) {
        let hash = self.hash(item);
        // Compute the fingerprint using multiplication and shift for better distribution
        let fingerprint = ((hash as u128 * self.fingerprint_mask as u128) >> 64) + 1;
        // Compute the index modulo num_buckets (optimized to a bitwise AND since num_buckets is a power of 2)
        let index = hash as usize & (self.num_buckets - 1);
        (index, fingerprint as usize)
    }

    /// Computes the alternative bucket index for a given fingerprint using cuckoo hashing.
    ///
    /// In cuckoo hashing, each item can reside in one of two possible buckets. This function
    /// deterministically computes the alternate bucket index from the current index and fingerprint.
    ///
    /// Properties:
    /// 1. Symmetry: `alt_index(alt_index(i, f), f) == i` for any index `i` and fingerprint `f`.
    /// 2. Distinctness: the two indices differ unless the masked fingerprint hash happens to be zero.
    /// 3. Uniformity: The mapping distributes fingerprints evenly across all buckets.
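    ///
    /// The symmetry property is what makes evictions reversible; a property sketch
    /// (internal API, illustrative only):
    ///
    /// ```ignore
    /// let alt = filter.alt_index(index, fingerprint);
    /// // XOR with the same value is an involution, so applying it twice round-trips:
    /// assert_eq!(filter.alt_index(alt, fingerprint), index);
    /// ```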
    fn alt_index(&self, index: usize, fingerprint: usize) -> usize {
        index ^ (self.hash(&fingerprint) as usize & (self.num_buckets - 1))
    }

    /// Look up a fingerprint at its primary or alternative index.
    /// Returns `Some((index, sub_index))` if found, `None` otherwise.
    fn lookup_fingerprint(&self, index: usize, fingerprint: usize) -> Option<(usize, usize)> {
        // First check the primary bucket
        self.read_bucket(index, Ordering::Acquire)
            .iter()
            .position(|fp| fp == &fingerprint)
            .map(|sub_index| (index, sub_index))
            .or_else(|| {
                // Then check the alternative bucket
                let alt_index = self.alt_index(index, fingerprint);
                self.read_bucket(alt_index, Ordering::Acquire)
                    .iter()
                    .position(|fp| fp == &fingerprint)
                    .map(|sub_index| (alt_index, sub_index))
            })
    }

    /// Try to insert a fingerprint at its primary or alternative index.
    /// Returns `Ok(())` if successful, `Error::NotEnoughSpace` if both buckets are full.
    fn try_insert(&self, index: usize, fingerprint: usize) -> Result<(), Error> {
        self.insert_at_index(index, fingerprint).or_else(|_| {
            let alt_index = self.alt_index(index, fingerprint);
            self.insert_at_index(alt_index, fingerprint)
                .map_err(|_| Error::NotEnoughSpace)
        })
    }

    /// Try to insert a fingerprint at a specific index.
    /// Returns `Ok(())` if successful, `Err(bucket)` with the bucket's contents if it is full.
    fn insert_at_index(&self, index: usize, fingerprint: usize) -> Result<(), Vec<usize>> {
        loop {
            let bucket = self.read_bucket(index, Ordering::Relaxed);
            if let Some(sub_index) = bucket.iter().position(|&i| i == 0) {
                if self.update_bucket(index, sub_index, 0, fingerprint, Ordering::Release) {
                    return Ok(());
                } else {
                    continue;
                }
            } else {
                return Err(bucket);
            }
        }
    }

    /// Insert a fingerprint using cuckoo eviction chains when both buckets are full.
    ///
    /// This method is invoked only as a fallback when direct insertion fails, preserving
    /// the optimistic, lock-free fast path for the common case.
    ///
    /// # Cuckoo Eviction Algorithm
    ///
    /// When both possible locations for an item are full:
    /// 1. **Randomly select** an existing item from one of the full buckets
    /// 2. **Evict** that item and insert our new item in its place
    /// 3. **Relocate** the evicted item to its alternate location
    /// 4. **Repeat** if the alternate location is also full (eviction chain)
    /// 5. **Succeed** when we find an empty slot, or **fail** after max_evictions
    ///
    /// # Implementation Details
    ///
    /// - **Eviction tracking**: Collects a sequence of planned evictions, which are
    ///   atomically applied only if the chain succeeds, ensuring atomicity and consistency.
    /// - **Lock upgrading**: Starts with a `WriterExclusive` lock, upgrading to
    ///   `FullyExclusive` only when actually applying the eviction chain, maximizing
    ///   read concurrency during planning.
    /// - **Loop prevention**: Uses a map to track which sub-indices have been tried
    ///   in each bucket, ensuring early detection of loops in eviction chains.
    fn insert_with_evictions(
        &self,
        mut index: usize,
        mut fingerprint: usize,
        mut lock: Lock,
    ) -> Result<(), Error> {
        let mut rng = rand::rng();
        let mut evictions = Vec::with_capacity(self.max_evictions.min(32));
        let mut used_indices = HashMap::with_capacity(self.max_evictions.min(32));
        while evictions.len() <= self.max_evictions {
            if let Err(bucket) = self.insert_at_index(index, fingerprint) {
                let sub_index = match used_indices.entry(index).or_insert(0usize) {
                    sub_indices if *sub_indices == 0 => {
                        // First time seeing this index: randomly choose a sub-index
                        let sub_index = rng.random_range(0..self.bucket_size);
                        *sub_indices = 1 << sub_index;
                        sub_index
                    }
                    sub_indices => {
                        // Find an unused sub-index
                        if let Some(sub_index) =
                            (0..self.bucket_size).find(|shift| (*sub_indices >> shift) & 1 == 0)
                        {
                            *sub_indices |= 1 << sub_index;
                            sub_index
                        } else {
                            return Err(Error::NotEnoughSpace);
                        }
                    }
                };
                // Evict the fingerprint at the chosen sub-index
                let evicted = bucket[sub_index];
                evictions.push((index, sub_index, fingerprint));
                // Find the alternative index for the evicted fingerprint
                index = self.alt_index(index, evicted);
                fingerprint = evicted;
            } else {
                // Successfully inserted the last displaced fingerprint; now walk the
                // planned chain backwards, installing each fingerprint in its slot
                lock.upgrade();
                while let Some((index, sub_index, to_place)) = evictions.pop() {
                    // The slot currently holds `fingerprint` (the value displaced from
                    // it); replace it with the fingerprint planned for this slot
                    self.update_bucket(index, sub_index, fingerprint, to_place, Ordering::Relaxed);
                    fingerprint = to_place;
                }
                return Ok(());
            }
        }
        // Reached the maximum number of evictions, give up
        Err(Error::NotEnoughSpace)
    }

    /// Atomically read all fingerprints from a bucket using lock-free bit manipulation.
    ///
    /// ## Memory Layout
    ///
    /// Fingerprints are tightly packed in memory across multiple atomic usize values:
    /// - Each bucket contains `bucket_size` fingerprints
    /// - Each fingerprint is `fingerprint_size` bits
    /// - Multiple fingerprints are packed into each atomic usize
    /// - Buckets may span multiple atomic values
    ///
    /// ## Algorithm Steps
    ///
    /// 1. Calculate which atomic values contain this bucket's data
    /// 2. Atomically load each relevant atomic value with the caller-provided ordering
    /// 3. Extract fingerprints using bit manipulation and masking
    /// 4. Handle boundary cases where buckets span multiple atomics
    /// 5. Skip any leading fingerprints belonging to preceding buckets and return
    ///    exactly `bucket_size` fingerprints
    ///
    /// This is completely lock-free - multiple threads can read concurrently,
    /// and reads can proceed even during writes (though they might see
    /// intermediate states that get resolved by retry logic).
    ///
    /// Returns a vector containing all fingerprints in the bucket (0 = empty slot)
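    ///
    /// For example, with the defaults (`fingerprint_size = 16`, `bucket_size = 4`)
    /// on a 64-bit target, each bucket occupies exactly one atomic:
    ///
    /// ```text
    /// AtomicUsize (64 bits): [ fp0 | fp1 | fp2 | fp3 ]  // 16 bits each, MSB first
    /// bucket 0 -> atomic 0, bucket 1 -> atomic 1, ...
    /// ```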
    fn read_bucket(&self, index: usize, ordering: Ordering) -> Vec<usize> {
        let fingerprint_index = index * self.bucket_size;
        let bit_index = fingerprint_index * self.fingerprint_size;
        let start_index = bit_index / usize::BITS as usize;
        let skip_bits = bit_index % usize::BITS as usize;
        let skip_fingerprints = skip_bits >> self.fingerprint_size.trailing_zeros();
        let end_index = start_index + self.atomics_per_bucket;

        self.buckets[start_index..end_index]
            .iter()
            .flat_map(|atomic| {
                let atomic_value = atomic.load(ordering);
                (0..self.fingerprints_per_atomic).map(move |i| {
                    (atomic_value
                        >> (self.fingerprint_size * (self.fingerprints_per_atomic - i - 1)))
                        & self.fingerprint_mask
                })
            })
            .skip(skip_fingerprints)
            .take(self.bucket_size)
            .collect()
    }

    /// Atomically update a single fingerprint using lock-free compare-exchange.
    ///
    /// ## Lock-Free Update Algorithm
    ///
    /// 1. **Locate the target**: Calculate which atomic usize contains the fingerprint
    ///    and the exact bit position within that atomic value
    /// 2. **Read current state**: Load the current atomic value
    /// 3. **Verify expectation**: Check that the target position contains `old_value`
    /// 4. **Atomic update**: Use compare_exchange_weak to atomically replace `old_value`
    ///    with `new_value`, but only if the atomic hasn't changed since step 2
    /// 5. **Retry on conflict**: If another thread modified the atomic concurrently,
    ///    restart from step 2
    ///
    /// ## Concurrency Safety
    ///
    /// - Uses `compare_exchange_weak`, which can fail spuriously on some architectures
    ///   but is more efficient than the strong version
    /// - Employs the caller-provided success ordering (typically Release) to ensure
    ///   other threads see the change
    /// - Updates the global counter atomically to maintain consistency
    /// - Returns false if the expected `old_value` is no longer present (indicating
    ///   another thread already modified this slot)
    ///
    /// Returns `true` if the update succeeded, `false` if the slot no longer contains
    /// the expected `old_value` due to concurrent modification.
    fn update_bucket(
        &self,
        index: usize,
        sub_index: usize,
        old_value: usize,
        new_value: usize,
        ordering: Ordering,
    ) -> bool {
        let bit_index = (index * self.bucket_size + sub_index) * self.fingerprint_size;
        let atomic_index = bit_index / usize::BITS as usize;
        let skip_bits = bit_index % usize::BITS as usize;
        let shift = usize::BITS as usize - self.fingerprint_size - skip_bits;
        let fingerprint_mask = self.fingerprint_mask << shift;
        let atomic = &self.buckets[atomic_index];

        loop {
            let atomic_value = atomic.load(Ordering::Relaxed);
            if (atomic_value & fingerprint_mask) >> shift != old_value {
                // The expected fingerprint is not present in the atomic value
                return false;
            }
            let new_atomic_value = (atomic_value & !fingerprint_mask) | (new_value << shift);
            if atomic
                .compare_exchange_weak(atomic_value, new_atomic_value, ordering, Ordering::Relaxed)
                .is_ok()
            {
                // Update the counter based on the change
                match (old_value, new_value) {
                    (0, _) => self.counter.fetch_add(1, Ordering::Release),
                    (_, 0) => self.counter.fetch_sub(1, Ordering::Release),
                    (_, _) => 0,
                };
                return true;
            }
        }
    }

    /// Acquires a lock on the filter, if necessary.
    ///
    /// A lock is only required when evictions are enabled (i.e., `max_evictions > 0`).
    /// If `max_evictions` is set to 0, no lock is acquired.
    ///
    /// Returns `Some(Lock)` if a lock is needed, or `None` if no locking is required.
    pub fn lock(&self, kind: LockKind) -> Option<Lock<'_>> {
        if self.max_evictions == 0 {
            None
        } else {
            Some(Lock::new(&self.lock, kind))
        }
    }

    /// Execute a read operation with optimistic concurrency control and automatic retry.
    ///
    /// This is the cornerstone of the lock-free design: an optimistic concurrency
    /// protocol that allows reads to proceed concurrently with most write operations.
    ///
    /// ## Optimistic Concurrency Protocol
    ///
    /// 1. **Snapshot version**: Acquire an Optimistic lock (capturing the version number)
    /// 2. **Execute read**: Run the provided function without any blocking
    /// 3. **Validate consistency**: Check whether the version changed or a FullyExclusive
    ///    lock was acquired
    /// 4. **Retry or return**: If the data may be stale, retry; otherwise return the result
    ///
    /// ## How It Works
    ///
    /// - **WriterExclusive operations**: Don't invalidate optimistic reads because they
    ///   coordinate through atomic compare-exchange operations that are linearizable
    /// - **FullyExclusive operations**: Do invalidate optimistic reads because they
    ///   perform complex multi-step updates that require consistency
    /// - **Early return optimization**: For operations that can short-circuit (like
    ///   `contains()` returning true), version validation is skipped as an optimization
    ///
    /// This pattern is essential for achieving lock-free performance while maintaining
    /// correctness in the presence of concurrent modifications.
    fn atomic_read<T, F>(&self, fun: F, early_return: Option<T>) -> T
    where
        F: Fn() -> T,
        T: PartialEq,
    {
        if self.max_evictions == 0 {
            return fun();
        }
        loop {
            let lock = Lock::new(&self.lock, LockKind::Optimistic);
            let result = fun();
            if Some(&result) == early_return.as_ref() || !lock.is_outdated() {
                return result;
            }
        }
    }
}

impl CuckooFilter<DefaultHasher> {
    /// Create a new CuckooFilterBuilder with default settings
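    ///
    /// # Example
    ///
    /// A configuration sketch (the parameter values are illustrative):
    ///
    /// ```
    /// use atomic_cuckoo_filter::CuckooFilter;
    ///
    /// let filter = CuckooFilter::builder()
    ///     .capacity(1 << 16)
    ///     .fingerprint_size(8)
    ///     .bucket_size(4)
    ///     .max_evictions(500)
    ///     .build()
    ///     .unwrap();
    /// assert!(filter.capacity() >= 1 << 16);
    /// ```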
    pub fn builder() -> CuckooFilterBuilder<DefaultHasher> {
        CuckooFilterBuilder::default()
    }

    /// Create a new CuckooFilter with default settings
    pub fn new() -> CuckooFilter<DefaultHasher> {
        Self::builder().build().unwrap()
    }

    /// Create a new CuckooFilter with the specified capacity
    pub fn with_capacity(capacity: usize) -> CuckooFilter<DefaultHasher> {
        Self::builder().capacity(capacity).build().unwrap()
    }
}

impl Default for CuckooFilter<DefaultHasher> {
    /// Create a new CuckooFilter with default settings
    fn default() -> Self {
        Self::new()
    }
}

impl<H: Hasher + Default> CuckooFilterBuilder<H> {
    /// Validate the builder configuration
    fn validate(&self) -> Result<(), String> {
        if let Some(fingerprint_size) = self.fingerprint_size
            && ![4, 8, 16, 32].contains(&fingerprint_size)
        {
            return Err("Invalid fingerprint_size: must be 4, 8, 16, or 32".into());
        }
        if self.bucket_size == Some(0) {
            return Err("bucket_size must be greater than zero".into());
        }
        if self.capacity == Some(0) {
            return Err("capacity must be greater than zero".into());
        }
        Ok(())
    }

    /// Build a CuckooFilter with the specified configuration
    pub fn build(self) -> Result<CuckooFilter<H>, CuckooFilterBuilderError> {
        let mut cuckoo_filter = self.base_build()?;
        // Calculate the number of buckets (rounded up to a power of 2)
        cuckoo_filter.num_buckets = cuckoo_filter
            .capacity
            .div_ceil(cuckoo_filter.bucket_size)
            .next_power_of_two();
        // Adjust the capacity to match the actual number of buckets
        cuckoo_filter.capacity = cuckoo_filter.num_buckets * cuckoo_filter.bucket_size;
        // Calculate the fingerprint mask
        cuckoo_filter.fingerprint_mask = ((1u64 << cuckoo_filter.fingerprint_size) - 1) as usize;
        // Calculate the size of a bucket in bits
        let bucket_bit_size = cuckoo_filter.bucket_size * cuckoo_filter.fingerprint_size;
        // Calculate the number of atomic values per bucket
        cuckoo_filter.atomics_per_bucket = bucket_bit_size.div_ceil(usize::BITS as usize);
        // Calculate the number of fingerprints per atomic value
        cuckoo_filter.fingerprints_per_atomic =
            usize::BITS as usize / cuckoo_filter.fingerprint_size;
        // Calculate the total number of atomic values needed
        let bit_size = cuckoo_filter.capacity * cuckoo_filter.fingerprint_size;
        let atomic_size = bit_size.div_ceil(usize::BITS as usize);
        // Initialize the buckets (all slots empty)
        cuckoo_filter.buckets = (0..atomic_size).map(|_| AtomicUsize::new(0)).collect();
        Ok(cuckoo_filter)
    }
}