Skip to main content

expiring_atomic_filter/
lib.rs

1//! A transparent wrapper for [atomic-cuckoo-filter](https://crates.io/crates/atomic-cuckoo-filter)
2//! providing a time-based expiration mechanism using a circular buffer.
3
4use std::collections::hash_map::DefaultHasher;
5use std::hash::{Hash, Hasher};
6use std::time::SystemTime;
7
8use atomic_cuckoo_filter::{CuckooFilter, CuckooFilterBuilder, CuckooFilterBuilderError};
9use derive_builder::Builder;
10use serde::{Deserialize, Serialize};
11
12pub use atomic_cuckoo_filter::{Error, Lock, LockKind};
13
14/// A serializable approximate membership query filter supporting lock-free concurrency and time-based expiration.
15#[derive(Debug, Builder, Serialize, Deserialize)]
16#[builder(build_fn(private, name = "base_build", validate = "Self::validate"))]
17pub struct ExpiringAtomicFilter<H = DefaultHasher>
18where
19    H: Hasher + Default,
20{
21    /// Maximum number of elements the filter can store.
22    /// Divided evenly over each of the `expiration_period` intervals.
23    #[builder(default = "1048576")]
24    capacity: usize,
25
26    /// Size of fingerprints in bits (must be 4, 8, 16, or 32).
27    /// Default is 32 to minimize the effect of false positive `contains` checks
28    /// when removing items.
29    #[builder(default = "32")]
30    fingerprint_size: usize,
31
32    /// Number of fingerprints per bucket.
33    #[builder(default = "4")]
34    bucket_size: usize,
35
36    /// Maximum number of evictions to try before giving up.
37    #[builder(default = "500")]
38    max_evictions: usize,
39
40    /// Number of seconds each item is expected to remain in the filter. Items may remain up to
41    /// `ttl + expiration_period`, but no longer than that. Must be a multiple of `expiration_period`.
42    #[builder(default = "86400")]
43    pub ttl: u64,
44
45    /// Maximum number of seconds between expiration events.
46    #[builder(default = "3600")]
47    pub expiration_period: u64,
48
49    /// Unix timestamp of when the filter was created.
50    /// Needed to determine which slot in the buffer is selected for insert, lock, and expire operations.
51    #[builder(setter(skip))]
52    pub created: u64,
53
54    /// A circular buffer of filters. At any given moment, one slot is being written to
55    /// and one slot is waiting to be cleared by an externally-triggered expiration process.
56    #[builder(setter(skip))]
57    // otherwise serde_derive will add `H: Serialize` and `H: Deserialize`
58    #[serde(bound(serialize = "H: Hasher + Default", deserialize = "H: Hasher + Default"))]
59    slots: Vec<CuckooFilter<H>>,
60}
61
62impl<H> ExpiringAtomicFilter<H>
63where
64    H: Hasher + Default,
65{
66    /// Insert an item into the filter.
67    ///
68    /// Returns Ok(()) if the item was inserted, or Error::NotEnoughSpace if the filter is full.
69    ///
70    /// <https://docs.rs/atomic-cuckoo-filter/*/atomic_cuckoo_filter/struct.CuckooFilter.html#method.insert>
71    pub fn insert<T: ?Sized + Hash>(&self, item: &T) -> Result<(), Error> {
72        let write_slot = self.write_slot(Self::now_timestamp());
73        self.slots[write_slot].insert(item)
74    }
75
76    /// Check if an item is in the filter and insert it if is not present (atomically).
77    ///
78    /// Returns Ok(true) if the item was inserted, Ok(false) if it was already present,
79    /// or Error::NotEnoughSpace if the filter is full.
80    ///
81    /// <https://docs.rs/atomic-cuckoo-filter/*/atomic_cuckoo_filter/struct.CuckooFilter.html#method.insert_unique>
82    pub fn insert_unique<T: ?Sized + Hash>(&self, item: &T) -> Result<bool, Error> {
83        self.insert_unique_as_of(item, Self::now_timestamp())
84    }
85
86    #[inline(always)]
87    fn insert_unique_as_of<T: ?Sized + Hash>(&self, item: &T, now: u64) -> Result<bool, Error> {
88        let write_slot = self.write_slot(now);
89        self.slots[write_slot].insert_unique(item)
90    }
91
92    /// Counts the number of occurrences of an item in the filter.
93    ///
94    /// <https://docs.rs/atomic-cuckoo-filter/*/atomic_cuckoo_filter/struct.CuckooFilter.html#method.count>
95    pub fn count<T: ?Sized + Hash>(&self, item: &T) -> usize {
96        self.slots.iter().map(|f| f.count(item)).sum()
97    }
98
99    /// Attempts to remove an item from the filter.
100    ///
101    /// Returns true if the item was successfully removed, or false if it was not found.
102    ///
103    /// <https://docs.rs/atomic-cuckoo-filter/*/atomic_cuckoo_filter/struct.CuckooFilter.html#method.remove>
104    pub fn remove<T: ?Sized + Hash>(&self, item: &T) -> bool {
105        for filter in &self.slots {
106            // Removing a non-existent item can corrupt the filter. Although `contains`
107            // can produce false positives, this risk mitigated by configuring
108            // the default fingerprint size as 32.
109            if filter.contains(item) && filter.remove(item) {
110                return true;
111            }
112        }
113        false
114    }
115
116    /// Check if an item is in the filter.
117    ///
118    /// Returns true if the item is possibly in the filter (may have false positives),
119    /// false if it is definitely not in the filter.
120    ///
121    /// <https://docs.rs/atomic-cuckoo-filter/*/atomic_cuckoo_filter/struct.CuckooFilter.html#method.contains>
122    pub fn contains<T: ?Sized + Hash>(&self, item: &T) -> bool {
123        for filter in &self.slots {
124            if filter.contains(item) {
125                return true;
126            }
127        }
128        false
129    }
130
131    /// Get the number of elements in the filter.
132    pub fn len(&self) -> usize {
133        self.slots.iter().map(|f| f.len()).sum()
134    }
135
136    /// Check if the filter is empty.
137    pub fn is_empty(&self) -> bool {
138        self.len() == 0
139    }
140
141    /// Get the capacity of the filter.
142    pub fn capacity(&self) -> usize {
143        let slot_capacity = self.slots[0].capacity();
144        // count all slots, except the fully expired one
145        slot_capacity * (self.slots.len() - 1)
146    }
147
148    /// Clear the filter, removing all elements.
149    pub fn clear(&self) {
150        for filter in &self.slots {
151            filter.clear();
152        }
153    }
154
155    /// Acquires a lock on the filter, if necessary.
156    ///
157    /// Returns Some(Lock) if a lock is needed, or None if no locking is required.
158    ///
159    /// <https://docs.rs/atomic-cuckoo-filter/*/atomic_cuckoo_filter/struct.CuckooFilter.html#method.lock>
160    pub fn lock(&self, kind: LockKind) -> Option<Lock<'_>> {
161        let write_slot = self.write_slot(Self::now_timestamp());
162        self.slots[write_slot].lock(kind)
163    }
164
165    /// Returns the number of items that were removed.
166    pub fn expire(&self) -> usize {
167        self.expire_as_of(Self::now_timestamp())
168    }
169
170    /// Returns the number of items that were removed.
171    #[inline]
172    pub fn expire_as_of(&self, now: u64) -> usize {
173        let expire_slot = (1 + self.write_slot(now)) % self.slots.len();
174        let filter = &self.slots[expire_slot];
175        let item_count = filter.len();
176
177        if item_count > 0 {
178            filter.clear();
179        }
180
181        item_count
182    }
183
184    #[inline(always)]
185    fn write_slot(&self, now: u64) -> usize {
186        let slot_count = self.slots.len() as u64;
187        let ttl_segment_duration = self.ttl / (slot_count - 2);
188        let buffer_duration = ttl_segment_duration * slot_count;
189        let now_buffer_time = (now - self.created) % buffer_duration;
190
191        let mut write_slot_start = now_buffer_time.next_multiple_of(ttl_segment_duration);
192        if !now_buffer_time.is_multiple_of(ttl_segment_duration) {
193            write_slot_start -= ttl_segment_duration;
194        }
195
196        (write_slot_start / ttl_segment_duration) as usize
197    }
198
199    #[inline(always)]
200    fn now_timestamp() -> u64 {
201        SystemTime::now()
202            .duration_since(SystemTime::UNIX_EPOCH)
203            .expect("epoch should be earlier than now")
204            .as_secs()
205    }
206}
207
208impl ExpiringAtomicFilter<DefaultHasher> {
209    /// Create a new ExpiringAtomicFilterBuilder with default settings
210    pub fn builder() -> ExpiringAtomicFilterBuilder<DefaultHasher> {
211        ExpiringAtomicFilterBuilder::default()
212    }
213
214    /// Create a new ExpiringAtomicFilter with default settings
215    pub fn new() -> ExpiringAtomicFilter<DefaultHasher> {
216        Self::builder().build().unwrap()
217    }
218
219    /// Create a new ExpiringAtomicFilter with the specified capacity
220    pub fn with_capacity(capacity: usize) -> ExpiringAtomicFilter<DefaultHasher> {
221        Self::builder().capacity(capacity).build().unwrap()
222    }
223}
224
225impl Default for ExpiringAtomicFilter<DefaultHasher> {
226    /// Create a new CuckooFilter with default settings
227    fn default() -> Self {
228        Self::new()
229    }
230}
231
232impl<H> ExpiringAtomicFilterBuilder<H>
233where
234    H: Hasher + Default + Clone,
235{
236    fn validate(&self) -> Result<(), String> {
237        if let (Some(ttl), Some(expiration_period)) = (self.ttl, self.expiration_period)
238            && !ttl.is_multiple_of(expiration_period)
239        {
240            return Err("ttl must be a multiple of expiration_period".into());
241        }
242        Ok(())
243    }
244
245    /// Build an ExpiringAtomicFilter with the specified configuration.
246    pub fn build(&self) -> Result<ExpiringAtomicFilter<H>, ExpiringAtomicFilterBuilderError> {
247        let mut filter = self.base_build()?;
248
249        filter.created = ExpiringAtomicFilter::<H>::now_timestamp();
250
251        // Reserve two additional slots for partially expired and fully expired items.
252        let slot_count = 2 + (filter.ttl / filter.expiration_period) as usize;
253        let unexpired_slot_capacity = filter.capacity / (slot_count - 2);
254
255        let mut slots = Vec::with_capacity(slot_count);
256        for _ in 0..slot_count {
257            let filter = CuckooFilterBuilder::default()
258                .capacity(unexpired_slot_capacity)
259                .fingerprint_size(filter.fingerprint_size)
260                .bucket_size(filter.bucket_size)
261                .max_evictions(filter.max_evictions)
262                .build()
263                .map_err(ExpiringAtomicFilterBuilderError::from)?;
264            slots.push(filter);
265        }
266        filter.slots = slots;
267
268        Ok(filter)
269    }
270}
271
272impl From<CuckooFilterBuilderError> for ExpiringAtomicFilterBuilderError {
273    fn from(value: CuckooFilterBuilderError) -> Self {
274        match value {
275            CuckooFilterBuilderError::ValidationError(mut description) => {
276                if description == "capacity must be greater than zero" {
277                    description = "capacity must be at least ttl / expiration_period".into();
278                }
279                Self::ValidationError(description)
280            }
281            CuckooFilterBuilderError::UninitializedField(field_name) => {
282                Self::UninitializedField(field_name)
283            }
284            _ => todo!(),
285        }
286    }
287}
288
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_write_slot() {
        let filter = ExpiringAtomicFilter::builder()
            .capacity(2)
            .ttl(hs(25))
            .expiration_period(hs(12) + ms(30))
            .build()
            .unwrap();
        // Anchor on the filter's own creation time: reading the clock separately could land in a
        // different second and shift every exact-boundary case below by one.
        let now = filter.created;

        let cases = [
            (now + hs(12) + ms(29) + 59, 0),
            (now + hs(12) + ms(30), 1),
            (now + hs(24) + ms(59) + 59, 1),
            (now + hs(25), 2),
            (now + hs(37) + ms(29) + 59, 2),
            (now + hs(37) + ms(30), 3),
            (now + hs(49) + ms(59) + 59, 3),
            (now + hs(50), 0),
        ];

        for (now_input, slot_num) in cases {
            assert_eq!(filter.write_slot(now_input), slot_num);
        }
    }

    #[test]
    fn test_expire_as_of() {
        let filter = ExpiringAtomicFilterBuilder::<ahash::AHasher>::default()
            .capacity(50)
            .ttl(hs(25))
            .expiration_period(ms(30))
            .build()
            .unwrap();
        // See `test_write_slot`: boundary cases must be relative to `created` exactly.
        let now = filter.created;

        // do not expire an item until the TTL has elapsed
        assert_eq!(
            filter.insert_unique_as_of("item1", now + ms(29) + 59),
            Ok(true)
        );
        assert_eq!(
            filter.expire_as_of(now + hs(25) + ms(29) + 59),
            0,
            "item1 at max age"
        );
        assert!(filter.contains("item1"));
        assert_eq!(
            filter.expire_as_of(now + hs(25) + ms(30)),
            1,
            "item1 expires after TTL"
        );
        assert!(!filter.contains("item1"));

        // do not expire an item older than TTL + expiration period
        assert_eq!(
            filter.insert_unique_as_of("item2", now + hs(24) + ms(59) + 59),
            Ok(true)
        );
        assert_eq!(
            filter.expire_as_of(now + hs(50) + ms(30)),
            0,
            "too late to expire item2"
        );
        assert!(filter.contains("item2"));
        assert_eq!(
            filter.expire_as_of(now + hs(50) + ms(29) + 59),
            1,
            "item2 expired at last possible time"
        );
        assert!(!filter.contains("item2"));
    }

    /// Hours to seconds.
    fn hs(i: u64) -> u64 {
        i * 3600
    }

    /// Minutes to seconds.
    fn ms(i: u64) -> u64 {
        i * 60
    }
}