pithanos 0.2.0

Fast, lock-free probabilistic data structures for modern Rust.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::hash::Hash;
use crate::utils::single_hash;
use crate::traits::ProbabilisticSet;

/// Width, in bits, of one stored fingerprint.
const FINGERPRINT_BITS: u32 = 16;
/// Mask selecting the low `FINGERPRINT_BITS` bits of a bucket word (`0xFFFF`).
const FINGERPRINT_MASK: usize = usize::MAX >> (usize::BITS - FINGERPRINT_BITS);
/// Fingerprints packed into one `usize` bucket word: 4 on 64-bit targets, 2 on 32-bit ones.
const SLOTS_PER_BUCKET: u32 = usize::BITS / FINGERPRINT_BITS;
/// Maximum number of evictions a single insert performs before giving up.
const MAX_KICKS: usize = 128;

/// A concurrent cuckoo filter that stores 16-bit fingerprints in packed atomic buckets.
///
/// Every operation takes `&self`; bucket words are updated with compare-and-swap loops,
/// so the filter can be shared between threads (e.g. behind an `Arc`).
#[derive(Debug)]
pub struct CuckooFilter {
    /// One word per bucket, each packing `SLOTS_PER_BUCKET` fingerprints; a slot value of 0
    /// marks an empty slot.
    buckets: Vec<AtomicUsize>,
    /// Number of buckets; equal to `buckets.len()`.
    size: usize,
    /// Counter used to rotate which slot is chosen as the eviction victim.
    kick_count: AtomicUsize,
}

impl CuckooFilter {
    /// Creates an empty filter with at least `size` buckets.
    ///
    /// Each bucket is one `AtomicUsize` packing `SLOTS_PER_BUCKET` fingerprints of
    /// `FINGERPRINT_BITS` bits each; a slot value of 0 means "empty".
    ///
    /// The bucket count is rounded up to the next power of two, and a request for 0 buckets
    /// yields 1. `secondary_bucket_index` XORs the current index with a hash of the
    /// fingerprint. That mapping is its own inverse only when the bucket count is a power of
    /// two, and eviction, lookup and deletion all depend on that. Rounding up also removes
    /// the division-by-zero panic a zero-bucket filter would hit on first use.
    pub fn new(size: usize) -> Self {
        let size = size.next_power_of_two();
        let buckets = (0..size).map(|_| AtomicUsize::new(0)).collect();
        Self {
            buckets,
            size,
            kick_count: AtomicUsize::new(0),
        }
    }

    /// Returns the fingerprint stored for `item`: 16 bits and never 0, since 0 marks an empty slot.
    fn fingerprint<T: Hash>(&self, item: &T) -> u16 {
        Self::fingerprint_from_hash(single_hash(item))
    }

    /// Derives a fingerprint from an item's hash, the same hash the primary bucket index uses.
    ///
    /// The bucket index comes from the low bits of `hash`, so the fingerprint is taken from a
    /// re-hash of it rather than from those same bits. Otherwise, with a power-of-two bucket
    /// count, each fingerprint's low `log2(size)` bits would equal its bucket index. Only the
    /// remaining bits would then distinguish items sharing a bucket, which would sharply
    /// raise the false-positive rate.
    fn fingerprint_from_hash(hash: usize) -> u16 {
        let fp = (single_hash(&hash) & FINGERPRINT_MASK) as u16;
        // 0 is reserved for empty slots.
        fp.max(1)
    }

    /// Index of the first candidate bucket for `item`.
    fn primary_bucket_index<T: Hash>(&self, item: &T) -> usize {
        single_hash(item) % self.size
    }

    /// Index of the other candidate bucket for a fingerprint currently associated with
    /// `primary_index`.
    ///
    /// `size` is a power of two (see `new`), so applying this twice returns `primary_index`.
    /// That symmetry lets an evicted fingerprint find its alternate bucket without the
    /// original item.
    fn secondary_bucket_index(&self, fingerprint: u16, primary_index: usize) -> usize {
        let hash = single_hash(&fingerprint);
        (primary_index ^ (hash % self.size)) % self.size
    }

    /// Reads the fingerprint held in `slot` of a packed bucket word (0 = empty).
    fn slot_fp(bucket: usize, slot: u32) -> u16 {
        ((bucket >> (slot * FINGERPRINT_BITS)) & FINGERPRINT_MASK) as u16
    }

    /// Returns `bucket` with `slot` overwritten by `fp` (use `fp == 0` to clear it).
    fn with_slot(bucket: usize, slot: u32, fp: u16) -> usize {
        let shift = slot * FINGERPRINT_BITS;
        (bucket & !(FINGERPRINT_MASK << shift)) | (usize::from(fp) << shift)
    }

    /// Index of the first slot of `bucket` holding `fingerprint`, if any.
    /// Passing 0 finds an empty slot.
    fn find_slot(bucket: usize, fingerprint: u16) -> Option<u32> {
        (0..SLOTS_PER_BUCKET).find(|&slot| Self::slot_fp(bucket, slot) == fingerprint)
    }

    /// Reports whether the bucket currently holds `fingerprint`.
    ///
    /// The check runs against a single (relaxed) snapshot of the bucket word.
    fn bucket_contains(&self, bucket_index: usize, fingerprint: u16) -> bool {
        let bucket = self.buckets[bucket_index].load(Ordering::Relaxed);
        Self::find_slot(bucket, fingerprint).is_some()
    }

    /// Stores `fingerprint` in the first empty slot of the bucket.
    ///
    /// Returns `false` if the bucket has no empty slot. A lost compare-and-swap race is
    /// retried against the freshly observed bucket value.
    fn bucket_insert(&self, bucket_index: usize, fingerprint: u16) -> bool {
        let cell = &self.buckets[bucket_index];
        let mut bucket = cell.load(Ordering::Acquire);
        loop {
            let slot = match Self::find_slot(bucket, 0) {
                Some(slot) => slot,
                None => return false, // Bucket is genuinely full
            };
            let updated = Self::with_slot(bucket, slot, fingerprint);
            match cell.compare_exchange(bucket, updated, Ordering::AcqRel, Ordering::Acquire) {
                Ok(_) => return true,
                Err(current) => bucket = current,
            }
        }
    }

    /// Clears the first slot of the bucket holding `fingerprint`.
    ///
    /// Returns `false` if the fingerprint is not in the bucket.
    fn bucket_delete(&self, bucket_index: usize, fingerprint: u16) -> bool {
        let cell = &self.buckets[bucket_index];
        let mut bucket = cell.load(Ordering::Acquire);
        loop {
            let slot = match Self::find_slot(bucket, fingerprint) {
                Some(slot) => slot,
                None => return false, // Fingerprint not in bucket
            };
            let updated = Self::with_slot(bucket, slot, 0);
            match cell.compare_exchange(bucket, updated, Ordering::AcqRel, Ordering::Acquire) {
                Ok(_) => return true,
                Err(current) => bucket = current,
            }
        }
    }

    /// Writes `new_fp` into `slot` of the bucket and returns the fingerprint it displaced
    /// (0 if the slot was empty).
    fn bucket_swap(&self, bucket_index: usize, slot: u32, new_fp: u16) -> u16 {
        let cell = &self.buckets[bucket_index];
        let mut bucket = cell.load(Ordering::Acquire);
        loop {
            let updated = Self::with_slot(bucket, slot, new_fp);
            match cell.compare_exchange(bucket, updated, Ordering::AcqRel, Ordering::Acquire) {
                Ok(_) => return Self::slot_fp(bucket, slot),
                Err(current) => bucket = current,
            }
        }
    }

    /// Undoes one eviction step by putting `evicted` back where `placed` was written.
    ///
    /// Prefers `slot`, the exact slot the eviction used. If a concurrent writer changed that
    /// slot, any other slot holding `placed` is used instead. If `placed` is gone entirely,
    /// `evicted` goes into a free slot. Only a slot holding the value `placed` or an empty
    /// slot is ever overwritten. Returns `false` if `evicted` could not be stored.
    fn bucket_restore(&self, bucket_index: usize, slot: u32, placed: u16, evicted: u16) -> bool {
        let cell = &self.buckets[bucket_index];
        let mut bucket = cell.load(Ordering::Acquire);
        loop {
            let target = if Self::slot_fp(bucket, slot) == placed {
                Some(slot)
            } else {
                Self::find_slot(bucket, placed)
            };
            let target = match target {
                Some(target) => target,
                // `placed` was moved or deleted concurrently; fall back to a free slot.
                None => return self.bucket_insert(bucket_index, evicted),
            };
            let updated = Self::with_slot(bucket, target, evicted);
            match cell.compare_exchange(bucket, updated, Ordering::AcqRel, Ordering::Acquire) {
                Ok(_) => return true,
                Err(current) => bucket = current,
            }
        }
    }

    /// Inserts `item`'s fingerprint, relocating existing fingerprints (cuckoo eviction) when
    /// both candidate buckets are full.
    ///
    /// Returns `true` on success. Returns `false` if no free slot was reached within
    /// `MAX_KICKS` evictions. In that case every eviction is undone in reverse order, slot for
    /// slot, so the filter again holds exactly what it held before the call. This exactness
    /// assumes no other thread modified the same slots in the meantime.
    ///
    /// While an eviction is in progress, the displaced fingerprint is briefly in neither
    /// bucket. Concurrent lookups may transiently miss it.
    pub fn try_insert<T: Hash>(&self, item: &T) -> bool {
        // Hash once; this matches `fingerprint` / `primary_bucket_index` used by lookups.
        let hash = single_hash(item);
        let fingerprint = Self::fingerprint_from_hash(hash);
        let primary = hash % self.size;

        if self.bucket_insert(primary, fingerprint) {
            return true;
        }

        let secondary = self.secondary_bucket_index(fingerprint, primary);
        if self.bucket_insert(secondary, fingerprint) {
            return true;
        }

        // Both buckets full: start cuckoo eviction. Each chain entry records
        // (bucket, slot, fingerprint we wrote there, fingerprint it displaced) so a failed
        // insert can be undone exactly.
        let mut chain: Vec<(usize, u32, u16, u16)> = Vec::with_capacity(MAX_KICKS);
        let mut homeless = fingerprint;
        let mut index = secondary;

        for _ in 0..MAX_KICKS {
            // Rotate the victim slot across kicks for fairness.
            let slot =
                (self.kick_count.fetch_add(1, Ordering::Relaxed) % SLOTS_PER_BUCKET as usize) as u32;
            let evicted = self.bucket_swap(index, slot, homeless);
            if evicted == 0 {
                // A concurrent delete emptied the slot, so nothing was displaced.
                return true;
            }
            chain.push((index, slot, homeless, evicted));
            homeless = evicted;

            // The evicted fingerprint lived in `index`; try its other bucket.
            index = self.secondary_bucket_index(homeless, index);
            if self.bucket_insert(index, homeless) {
                return true;
            }
        }

        // Undo the evictions newest-first so every slot is restored to its prior contents;
        // the new item's fingerprint is the one finally dropped.
        for (idx, slot, placed, evicted) in chain.into_iter().rev() {
            // Best effort: a `false` here means a concurrent writer filled the bucket.
            self.bucket_restore(idx, slot, placed, evicted);
        }

        false // Filter is too full
    }
}

impl ProbabilisticSet for CuckooFilter {
    /// Inserts `item` by delegating to `CuckooFilter::try_insert`.
    ///
    /// Returns `false` when the filter is too full to accept it.
    fn insert<T: Hash>(&self, item: &T) -> bool {
        self.try_insert(item)
    }

    /// Reports whether `item` may be in the filter: `true` if its fingerprint is present in
    /// either of its two candidate buckets.
    ///
    /// Fingerprints are 16-bit, so distinct items can share one and this may return a false
    /// positive.
    fn contains<T: Hash>(&self, item: &T) -> bool {
        let fp = self.fingerprint(item);
        let home = self.primary_bucket_index(item);
        self.bucket_contains(home, fp)
            || self.bucket_contains(self.secondary_bucket_index(fp, home), fp)
    }

    /// Removes one copy of `item`'s fingerprint, checking the primary bucket first and then
    /// the alternate bucket.
    ///
    /// Does nothing if neither bucket holds the fingerprint. Because fingerprints can
    /// collide, deleting an item that was never inserted may remove a colliding fingerprint
    /// that belongs to another item.
    fn delete<T: Hash>(&self, item: &T) {
        let fp = self.fingerprint(item);
        let home = self.primary_bucket_index(item);
        if !self.bucket_delete(home, fp) {
            let alternate = self.secondary_bucket_index(fp, home);
            self.bucket_delete(alternate, fp);
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    const FILTER_SIZE: usize = 1024;

    #[test]
    fn construct() {
        let cf = CuckooFilter::new(FILTER_SIZE);
        assert_eq!(cf.buckets.len(), FILTER_SIZE);
        assert!(cf.buckets.iter().all(|x| x.load(Ordering::Relaxed) == 0));
        assert_eq!(cf.size, FILTER_SIZE);
    }

    #[test]
    fn fingerprint_non_zero() {
        let cf = CuckooFilter::new(FILTER_SIZE);
        // Fingerprint should never be 0 (reserved for empty slot)
        for i in 0..1000 {
            let fp = cf.fingerprint(&i);
            assert_ne!(fp, 0, "Fingerprint should never be 0");
        }
    }

    #[test]
    fn insert_and_contains() {
        let cf = CuckooFilter::new(FILTER_SIZE);

        let item = "foo";
        assert!(cf.try_insert(&item));
        assert!(cf.contains(&item));

        let not_inserted = "bar";
        assert!(!cf.contains(&not_inserted));
    }

    #[test]
    fn insert_multiple() {
        let cf = CuckooFilter::new(FILTER_SIZE);

        for i in 0..100 {
            assert!(cf.try_insert(&i), "Failed to insert item {}", i);
        }

        for i in 0..100 {
            assert!(cf.contains(&i), "Item {} should be present", i);
        }
    }

    #[test]
    fn delete() {
        let cf = CuckooFilter::new(FILTER_SIZE);

        let item = "foo";
        cf.insert(&item);
        assert!(cf.contains(&item));

        cf.delete(&item);
        assert!(!cf.contains(&item));
    }

    #[test]
    fn delete_non_existent() {
        let cf = CuckooFilter::new(FILTER_SIZE);
        let item = "foo";
        // Should not panic when deleting non-existent item
        cf.delete(&item);
        assert!(!cf.contains(&item));
    }

    #[test]
    fn secondary_bucket_is_symmetric() {
        // Key property of cuckoo filters: secondary(fp, primary) XOR'd again gives primary
        let cf = CuckooFilter::new(FILTER_SIZE);
        let item = "test";
        let fp = cf.fingerprint(&item);
        let primary = cf.primary_bucket_index(&item);
        let secondary = cf.secondary_bucket_index(fp, primary);
        let back_to_primary = cf.secondary_bucket_index(fp, secondary);
        assert_eq!(primary, back_to_primary, "XOR property should be symmetric");
    }

    #[test]
    fn cuckoo_eviction_works() {
        // Use a small filter to force evictions
        let cf = CuckooFilter::new(16);

        // 16 buckets hold 16 * SLOTS_PER_BUCKET fingerprints (64 on 64-bit targets, 32 on
        // 32-bit). Attempting ~78% of that forces evictions without overfilling the filter;
        // on 64-bit this is 50 attempts with at least 40 expected to succeed.
        let capacity = 16 * SLOTS_PER_BUCKET as usize;
        let attempts = capacity * 50 / 64;
        let min_inserted = capacity * 40 / 64;

        let mut stored = Vec::new();
        for i in 0..attempts as i32 {
            if cf.try_insert(&i) {
                stored.push(i);
            }
        }

        // Should have inserted most items via eviction
        assert!(
            stored.len() >= min_inserted,
            "Should insert at least {} items, got {}",
            min_inserted,
            stored.len()
        );

        // Cuckoo filters have no false negatives: every accepted item must still be found
        // after evictions have relocated fingerprints between buckets.
        for item in &stored {
            assert!(cf.contains(item), "Inserted item {} should be present", item);
        }
    }

    #[test]
    fn filter_full_returns_false() {
        // Very small filter that will fill up
        let cf = CuckooFilter::new(4);

        let mut failures = 0;
        for i in 0..100 {
            if !cf.try_insert(&i) {
                failures += 1;
            }
        }

        // Should eventually fail when filter is full
        assert!(failures > 0, "Small filter should reject some inserts");
    }

    #[test]
    fn concurrent_inserts() {
        let cf = Arc::new(CuckooFilter::new(FILTER_SIZE));
        let num_threads = 4;
        let items_per_thread = 100;

        let handles: Vec<_> = (0..num_threads)
            .map(|t| {
                let cf = Arc::clone(&cf);
                thread::spawn(move || {
                    for i in 0..items_per_thread {
                        let item = t * items_per_thread + i;
                        cf.insert(&item);
                    }
                })
            })
            .collect();

        for handle in handles {
            handle.join().unwrap();
        }

        // Most items should be present (some may fail due to filter capacity)
        let mut found = 0;
        for i in 0..(num_threads * items_per_thread) {
            if cf.contains(&i) {
                found += 1;
            }
        }

        assert!(
            found >= (num_threads * items_per_thread) * 90 / 100,
            "At least 90% of items should be found, got {}/{}",
            found,
            num_threads * items_per_thread
        );
    }

    #[test]
    fn concurrent_insert_and_lookup() {
        let cf = Arc::new(CuckooFilter::new(FILTER_SIZE));

        // Pre-insert some items
        for i in 0..50 {
            cf.insert(&i);
        }

        let cf_insert = Arc::clone(&cf);
        let cf_lookup = Arc::clone(&cf);

        let insert_handle = thread::spawn(move || {
            for i in 50..150 {
                cf_insert.insert(&i);
            }
        });

        let lookup_handle = thread::spawn(move || {
            let mut found = 0;
            for _ in 0..1000 {
                for i in 0..50 {
                    if cf_lookup.contains(&i) {
                        found += 1;
                    }
                }
            }
            found
        });

        insert_handle.join().unwrap();
        let found = lookup_handle.join().unwrap();

        // Note: During concurrent eviction, fingerprints may be temporarily "in flight"
        // between buckets, causing transient false negatives. We allow a small margin.
        let expected = 50 * 1000;
        assert!(
            found >= expected * 99 / 100,
            "Pre-inserted items should almost always be found, got {}/{}",
            found,
            expected
        );
    }
}