expiring_atomic_filter/
lib.rs1use std::collections::hash_map::DefaultHasher;
5use std::hash::{Hash, Hasher};
6use std::time::SystemTime;
7
8use atomic_cuckoo_filter::{CuckooFilter, CuckooFilterBuilder, CuckooFilterBuilderError};
9use derive_builder::Builder;
10use serde::{Deserialize, Serialize};
11
12pub use atomic_cuckoo_filter::{Error, Lock, LockKind};
13
/// A filter, built from atomic cuckoo filters, whose items expire after a time-to-live.
///
/// Items are stored in a ring of [`CuckooFilter`] slots. Inserts go to the slot selected
/// by the current time, and [`expire`](Self::expire) clears the slot that follows that one
/// in the ring. The builder's `build` allocates `2 + ttl / expiration_period` slots.
///
/// Construct one with [`ExpiringAtomicFilter::builder`], [`ExpiringAtomicFilter::new`] or
/// [`ExpiringAtomicFilterBuilder`] (the latter to choose a hasher other than `DefaultHasher`).
#[derive(Debug, Builder, Serialize, Deserialize)]
#[builder(build_fn(private, name = "base_build", validate = "Self::validate"))]
pub struct ExpiringAtomicFilter<H = DefaultHasher>
where
    H: Hasher + Default,
{
    /// Total capacity to spread over `ttl / expiration_period` slots.
    ///
    /// Every slot is built with `capacity / (ttl / expiration_period)`. Defaults to 1_048_576.
    #[builder(default = "1048576")]
    capacity: usize,

    /// Passed unchanged to each slot's `CuckooFilterBuilder::fingerprint_size`. Defaults to 32.
    #[builder(default = "32")]
    fingerprint_size: usize,

    /// Passed unchanged to each slot's `CuckooFilterBuilder::bucket_size`. Defaults to 4.
    #[builder(default = "4")]
    bucket_size: usize,

    /// Passed unchanged to each slot's `CuckooFilterBuilder::max_evictions`. Defaults to 500.
    #[builder(default = "500")]
    max_evictions: usize,

    /// Time-to-live in seconds (the unit of the Unix timestamps used to pick slots).
    ///
    /// Must be a multiple of `expiration_period`. Defaults to 86400 (one day).
    #[builder(default = "86400")]
    pub ttl: u64,

    /// Length in seconds of each slot's write window, i.e. the step in which items expire.
    ///
    /// Defaults to 3600 (one hour).
    #[builder(default = "3600")]
    pub expiration_period: u64,

    /// Unix timestamp (seconds) stamped by `build`; slot selection measures time from it.
    ///
    /// Not settable through the builder.
    #[builder(setter(skip))]
    pub created: u64,

    /// The ring of per-period cuckoo filters, allocated by `build`.
    ///
    /// Not settable through the builder.
    #[builder(setter(skip))]
    #[serde(bound(serialize = "H: Hasher + Default", deserialize = "H: Hasher + Default"))]
    slots: Vec<CuckooFilter<H>>,
}
61
62impl<H> ExpiringAtomicFilter<H>
63where
64 H: Hasher + Default,
65{
66 pub fn insert<T: ?Sized + Hash>(&self, item: &T) -> Result<(), Error> {
72 let write_slot = self.write_slot(Self::now_timestamp());
73 self.slots[write_slot].insert(item)
74 }
75
76 pub fn insert_unique<T: ?Sized + Hash>(&self, item: &T) -> Result<bool, Error> {
83 self.insert_unique_as_of(item, Self::now_timestamp())
84 }
85
86 #[inline(always)]
87 fn insert_unique_as_of<T: ?Sized + Hash>(&self, item: &T, now: u64) -> Result<bool, Error> {
88 let write_slot = self.write_slot(now);
89 self.slots[write_slot].insert_unique(item)
90 }
91
92 pub fn count<T: ?Sized + Hash>(&self, item: &T) -> usize {
96 self.slots.iter().map(|f| f.count(item)).sum()
97 }
98
99 pub fn remove<T: ?Sized + Hash>(&self, item: &T) -> bool {
105 for filter in &self.slots {
106 if filter.contains(item) && filter.remove(item) {
110 return true;
111 }
112 }
113 false
114 }
115
116 pub fn contains<T: ?Sized + Hash>(&self, item: &T) -> bool {
123 for filter in &self.slots {
124 if filter.contains(item) {
125 return true;
126 }
127 }
128 false
129 }
130
131 pub fn len(&self) -> usize {
133 self.slots.iter().map(|f| f.len()).sum()
134 }
135
136 pub fn is_empty(&self) -> bool {
138 self.len() == 0
139 }
140
141 pub fn capacity(&self) -> usize {
143 let slot_capacity = self.slots[0].capacity();
144 slot_capacity * (self.slots.len() - 1)
146 }
147
148 pub fn clear(&self) {
150 for filter in &self.slots {
151 filter.clear();
152 }
153 }
154
155 pub fn lock(&self, kind: LockKind) -> Option<Lock<'_>> {
161 let write_slot = self.write_slot(Self::now_timestamp());
162 self.slots[write_slot].lock(kind)
163 }
164
165 pub fn expire(&self) -> usize {
167 self.expire_as_of(Self::now_timestamp())
168 }
169
170 #[inline]
172 pub fn expire_as_of(&self, now: u64) -> usize {
173 let expire_slot = (1 + self.write_slot(now)) % self.slots.len();
174 let filter = &self.slots[expire_slot];
175 let item_count = filter.len();
176
177 if item_count > 0 {
178 filter.clear();
179 }
180
181 item_count
182 }
183
184 #[inline(always)]
185 fn write_slot(&self, now: u64) -> usize {
186 let slot_count = self.slots.len() as u64;
187 let ttl_segment_duration = self.ttl / (slot_count - 2);
188 let buffer_duration = ttl_segment_duration * slot_count;
189 let now_buffer_time = (now - self.created) % buffer_duration;
190
191 let mut write_slot_start = now_buffer_time.next_multiple_of(ttl_segment_duration);
192 if !now_buffer_time.is_multiple_of(ttl_segment_duration) {
193 write_slot_start -= ttl_segment_duration;
194 }
195
196 (write_slot_start / ttl_segment_duration) as usize
197 }
198
199 #[inline(always)]
200 fn now_timestamp() -> u64 {
201 SystemTime::now()
202 .duration_since(SystemTime::UNIX_EPOCH)
203 .expect("epoch should be earlier than now")
204 .as_secs()
205 }
206}
207
208impl ExpiringAtomicFilter<DefaultHasher> {
209 pub fn builder() -> ExpiringAtomicFilterBuilder<DefaultHasher> {
211 ExpiringAtomicFilterBuilder::default()
212 }
213
214 pub fn new() -> ExpiringAtomicFilter<DefaultHasher> {
216 Self::builder().build().unwrap()
217 }
218
219 pub fn with_capacity(capacity: usize) -> ExpiringAtomicFilter<DefaultHasher> {
221 Self::builder().capacity(capacity).build().unwrap()
222 }
223}
224
225impl Default for ExpiringAtomicFilter<DefaultHasher> {
226 fn default() -> Self {
228 Self::new()
229 }
230}
231
232impl<H> ExpiringAtomicFilterBuilder<H>
233where
234 H: Hasher + Default + Clone,
235{
236 fn validate(&self) -> Result<(), String> {
237 if let (Some(ttl), Some(expiration_period)) = (self.ttl, self.expiration_period)
238 && !ttl.is_multiple_of(expiration_period)
239 {
240 return Err("ttl must be a multiple of expiration_period".into());
241 }
242 Ok(())
243 }
244
245 pub fn build(&self) -> Result<ExpiringAtomicFilter<H>, ExpiringAtomicFilterBuilderError> {
247 let mut filter = self.base_build()?;
248
249 filter.created = ExpiringAtomicFilter::<H>::now_timestamp();
250
251 let slot_count = 2 + (filter.ttl / filter.expiration_period) as usize;
253 let unexpired_slot_capacity = filter.capacity / (slot_count - 2);
254
255 let mut slots = Vec::with_capacity(slot_count);
256 for _ in 0..slot_count {
257 let filter = CuckooFilterBuilder::default()
258 .capacity(unexpired_slot_capacity)
259 .fingerprint_size(filter.fingerprint_size)
260 .bucket_size(filter.bucket_size)
261 .max_evictions(filter.max_evictions)
262 .build()
263 .map_err(ExpiringAtomicFilterBuilderError::from)?;
264 slots.push(filter);
265 }
266 filter.slots = slots;
267
268 Ok(filter)
269 }
270}
271
272impl From<CuckooFilterBuilderError> for ExpiringAtomicFilterBuilderError {
273 fn from(value: CuckooFilterBuilderError) -> Self {
274 match value {
275 CuckooFilterBuilderError::ValidationError(mut description) => {
276 if description == "capacity must be greater than zero" {
277 description = "capacity must be at least ttl / expiration_period".into();
278 }
279 Self::ValidationError(description)
280 }
281 CuckooFilterBuilderError::UninitializedField(field_name) => {
282 Self::UninitializedField(field_name)
283 }
284 _ => todo!(),
285 }
286 }
287}
288
#[cfg(test)]
mod tests {
    use super::*;

    // Both tests take their time base from `filter.created` rather than from a clock reading
    // taken before `build()`. If a second boundary passed between those two reads, every
    // exact-second boundary case below would shift by one and the tests would fail
    // intermittently.

    #[test]
    fn test_write_slot() {
        let filter = ExpiringAtomicFilter::builder()
            .capacity(2)
            .ttl(hs(25))
            .expiration_period(hs(12) + ms(30))
            .build()
            .unwrap();
        let now = filter.created;

        // ttl / expiration_period = 2, so the ring has 4 slots of 12h30m (50h in total).
        let cases = [
            (now + hs(12) + ms(29) + 59, 0),
            (now + hs(12) + ms(30), 1),
            (now + hs(24) + ms(59) + 59, 1),
            (now + hs(25), 2),
            (now + hs(37) + ms(29) + 59, 2),
            (now + hs(37) + ms(30), 3),
            (now + hs(49) + ms(59) + 59, 3),
            (now + hs(50), 0),
        ];

        for (now_input, slot_num) in cases {
            assert_eq!(filter.write_slot(now_input), slot_num);
        }
    }

    #[test]
    fn test_expire_as_of() {
        let filter = ExpiringAtomicFilterBuilder::<ahash::AHasher>::default()
            .capacity(50)
            .ttl(hs(25))
            .expiration_period(ms(30))
            .build()
            .unwrap();
        let now = filter.created;

        // 25h / 30m = 50, so the ring has 52 slots of 30 minutes (26h in total).
        // item1 goes into slot 0 during the last second of that slot's window.
        assert_eq!(
            filter.insert_unique_as_of("item1", now + ms(29) + 59),
            Ok(true)
        );
        assert_eq!(
            filter.expire_as_of(now + hs(25) + ms(29) + 59),
            0,
            "item1 at max age"
        );
        assert!(filter.contains("item1"));
        assert_eq!(
            filter.expire_as_of(now + hs(25) + ms(30)),
            1,
            "item1 expires after TTL"
        );
        assert!(!filter.contains("item1"));

        // item2 goes into slot 49 during the last second of that slot's window.
        assert_eq!(
            filter.insert_unique_as_of("item2", now + hs(24) + ms(59) + 59),
            Ok(true)
        );
        assert_eq!(
            filter.expire_as_of(now + hs(50) + ms(30)),
            0,
            "too late to expire item2"
        );
        assert!(filter.contains("item2"));
        assert_eq!(
            filter.expire_as_of(now + hs(50) + ms(29) + 59),
            1,
            "item2 expired at last possible time"
        );
        assert!(!filter.contains("item2"));
    }

    /// Converts hours to seconds.
    fn hs(i: u64) -> u64 {
        i * 3600
    }

    /// Converts minutes to seconds.
    fn ms(i: u64) -> u64 {
        i * 60
    }
}