use ahash::AHasher;
use parking_lot::Mutex;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use std::time::Duration;
use tracing::trace;
/// Reduces a raw payload to the 64-bit fingerprint stored by the dedup buckets.
///
/// The payload is fed through the slice `Hash` implementation into a freshly
/// constructed `AHasher`, and the hasher's final state is returned.
///
/// NOTE(review): duplicate detection relies on `AHasher::default()` producing
/// identically keyed hashers on every call within a process — confirm against
/// the `ahash` version in use.
#[inline(always)]
fn ahash_hash(payload: &[u8]) -> u64 {
    let mut state = AHasher::default();
    Hash::hash(payload, &mut state);
    Hasher::finish(&state)
}
/// Single-threaded duplicate filter built from a ring of hash sets.
///
/// Only the 64-bit hash of each payload is stored, so two different payloads
/// that hash to the same value are reported as duplicates of each other.
/// The filter has no clock of its own: entries age out only when
/// `rotate_bucket` is called.
pub struct Dedup {
/// Period passed to `new`; a zero period disables the filter (every method
/// returns early without touching `buckets`).
dedup_period: Duration,
/// Ring of hash sets; lookups scan every set, inserts go to `current_bucket`.
buckets: Vec<HashSet<u64, ahash::RandomState>>,
/// Index into `buckets` of the set that currently receives new hashes.
current_bucket: usize,
/// Value reported by `rotation_interval`: zero when the filter is disabled,
/// otherwise the fixed interval chosen in `new`.
#[allow(dead_code)] bucket_interval: Duration,
/// Length of `buckets`, used as the modulus when advancing `current_bucket`.
num_buckets: usize,
}
impl Dedup {
    /// Creates a filter that remembers payload hashes for roughly `dedup_period`.
    ///
    /// A zero period yields a disabled filter with no buckets. Otherwise the
    /// period is divided (truncating) into 100 ms intervals, with a minimum of
    /// two buckets, and one empty hash set is allocated per bucket.
    pub fn new(dedup_period: Duration) -> Self {
        if dedup_period.is_zero() {
            return Self {
                dedup_period,
                buckets: Vec::new(),
                current_bucket: 0,
                bucket_interval: Duration::ZERO,
                num_buckets: 0,
            };
        }

        let bucket_interval = Duration::from_millis(100);
        let whole_intervals = (dedup_period.as_millis() / bucket_interval.as_millis()) as usize;
        // At least two buckets are needed so that rotating never wipes the
        // only bucket holding the most recent hashes.
        let num_buckets = whole_intervals.max(2);
        let buckets: Vec<_> = (0..num_buckets)
            .map(|_| HashSet::with_hasher(ahash::RandomState::new()))
            .collect();

        trace!(
            "Dedup: new instance with dedup_period {:?}, num_buckets {}, bucket_interval {:?}",
            dedup_period,
            num_buckets,
            bucket_interval
        );

        Self {
            dedup_period,
            buckets,
            current_bucket: 0,
            bucket_interval,
            num_buckets,
        }
    }

    /// Returns the interval stored at construction: zero for a disabled
    /// filter, 100 ms otherwise.
    #[allow(dead_code)]
    pub fn rotation_interval(&self) -> Duration {
        self.bucket_interval
    }

    /// Reports whether the hash of `payload` is present in any bucket.
    ///
    /// Always `false` for a disabled (zero-period) filter.
    #[allow(dead_code)]
    pub fn is_duplicate(&self, payload: &[u8]) -> bool {
        if self.dedup_period.is_zero() {
            return false;
        }
        let digest = ahash_hash(payload);
        for bucket in &self.buckets {
            if bucket.contains(&digest) {
                return true;
            }
        }
        false
    }

    /// Records the hash of `payload` in the current bucket.
    ///
    /// Does nothing for a disabled (zero-period) filter.
    #[allow(dead_code)]
    pub fn insert(&mut self, payload: &[u8]) {
        if !self.dedup_period.is_zero() {
            let digest = ahash_hash(payload);
            self.buckets[self.current_bucket].insert(digest);
        }
    }

    /// Combined lookup and insert that hashes the payload once.
    ///
    /// Returns `true` when the hash is already present in any bucket, leaving
    /// the filter unchanged. Otherwise stores the hash in the current bucket
    /// and returns `false`. A disabled filter always returns `false`.
    #[inline]
    #[allow(dead_code)]
    pub fn check_and_insert(&mut self, payload: &[u8]) -> bool {
        if self.dedup_period.is_zero() {
            return false;
        }
        let digest = ahash_hash(payload);
        let already_seen = self.buckets.iter().any(|bucket| bucket.contains(&digest));
        if !already_seen {
            self.buckets[self.current_bucket].insert(digest);
        }
        already_seen
    }

    /// Advances to the next bucket in the ring and empties it.
    ///
    /// A hash therefore disappears on the `num_buckets`-th rotation after it
    /// was inserted, when the ring wraps back to the bucket that holds it.
    /// Does nothing for a disabled filter.
    pub fn rotate_bucket(&mut self) {
        if self.dedup_period.is_zero() {
            return;
        }
        let next = (self.current_bucket + 1) % self.num_buckets;
        self.buckets[next].clear();
        self.current_bucket = next;
        trace!("Dedup: Rotated to bucket {}", self.current_bucket);
    }
}
/// Number of independently locked shards used by the concurrent filter.
///
/// A shard is chosen by masking the payload hash with `NUM_SHARDS - 1`, which
/// only reaches every shard when the count is a power of two.
const NUM_SHARDS: usize = 16;

// Fail the build, rather than silently skipping shards, if the count is ever
// changed to a value the bit mask cannot address evenly.
const _: () = assert!(
    NUM_SHARDS.is_power_of_two(),
    "NUM_SHARDS must be a power of two: shards are selected with a bit mask"
);
/// Duplicate filter that can be shared between threads.
///
/// Hashes are spread over `NUM_SHARDS` separate `Dedup` instances, each behind
/// its own mutex. The shard array lives in an `Arc`, so a clone refers to the
/// same shards as the value it was cloned from.
#[derive(Clone)]
pub struct ConcurrentDedup {
/// Shared array of individually locked filters.
shards: Arc<[Mutex<Dedup>; NUM_SHARDS]>,
/// Period passed to `new`; zero disables the filter.
dedup_period: Duration,
/// Value returned by `rotation_interval`.
bucket_interval: Duration,
}
impl ConcurrentDedup {
    /// Builds `NUM_SHARDS` independent `Dedup` filters, all configured with
    /// the same `dedup_period`.
    ///
    /// A zero period produces a disabled filter: `check_and_insert` always
    /// returns `false` and `rotate_buckets` does nothing.
    pub fn new(dedup_period: Duration) -> Self {
        let shards: [Mutex<Dedup>; NUM_SHARDS] =
            std::array::from_fn(|_| Mutex::new(Dedup::new(dedup_period)));

        // Every shard is built from the same period, so any one of them can
        // report the rotation interval. Reading it from a shard keeps a single
        // source of truth instead of repeating the interval constant here.
        let bucket_interval = shards
            .first()
            .map_or(Duration::ZERO, |shard| shard.lock().rotation_interval());

        trace!(
            "ConcurrentDedup: new instance with {} shards, dedup_period {:?}",
            NUM_SHARDS,
            dedup_period
        );

        Self {
            shards: Arc::new(shards),
            dedup_period,
            bucket_interval,
        }
    }

    /// Returns the interval stored at construction; zero when the filter is
    /// disabled.
    #[inline]
    pub fn rotation_interval(&self) -> Duration {
        self.bucket_interval
    }

    /// Checks `payload` against the filter and records it if it is new.
    ///
    /// The payload is hashed once; the low bits of the hash pick the shard and
    /// the same hash is then looked up (and, if absent, stored) in that shard
    /// while its lock is held. Returns `true` when the hash was already
    /// present, `false` otherwise or when the filter is disabled.
    #[inline]
    pub fn check_and_insert(&self, payload: &[u8]) -> bool {
        if self.dedup_period.is_zero() {
            return false;
        }
        let hash = ahash_hash(payload);
        // Bit-mask selection: relies on NUM_SHARDS being a power of two.
        let shard_idx = (hash as usize) & (NUM_SHARDS - 1);
        let mut shard = self.shards[shard_idx].lock();
        shard.check_and_insert_with_hash(hash)
    }

    /// Rotates the bucket ring of every shard.
    ///
    /// Shards are locked and rotated one at a time, so the rotation is not
    /// atomic across the whole filter. Does nothing when the filter is
    /// disabled.
    pub fn rotate_buckets(&self) {
        if self.dedup_period.is_zero() {
            return;
        }
        for shard in self.shards.iter() {
            shard.lock().rotate_bucket();
        }
        trace!("ConcurrentDedup: Rotated all {} shards", NUM_SHARDS);
    }
}
impl Dedup {
    /// Lookup-and-insert for callers that have already hashed the payload.
    ///
    /// Returns `true` when `hash` is present in any bucket, leaving the filter
    /// unchanged. Otherwise stores `hash` in the current bucket and returns
    /// `false`. A disabled (zero-period) filter always returns `false`.
    #[inline]
    fn check_and_insert_with_hash(&mut self, hash: u64) -> bool {
        if self.dedup_period.is_zero() {
            return false;
        }
        let already_seen = self.buckets.iter().any(|bucket| bucket.contains(&hash));
        if !already_seen {
            self.buckets[self.current_bucket].insert(hash);
        }
        already_seen
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A hash stays visible through `num_buckets - 1` rotations and is dropped
    /// on the rotation that wraps the ring back to its bucket.
    #[test]
    fn test_dedup_time_wheel() {
        let dedup_period = Duration::from_millis(500);
        let bucket_interval = Duration::from_millis(100);
        let mut dedup = Dedup::new(dedup_period);
        assert_eq!(dedup.rotation_interval(), bucket_interval);
        assert_eq!(dedup.num_buckets, 5);

        let data1 = b"hello";
        let data2 = b"world";
        assert!(!dedup.is_duplicate(data1));
        dedup.insert(data1);
        assert!(dedup.is_duplicate(data1));
        assert!(!dedup.is_duplicate(data2));

        let total_rotations = dedup.num_buckets;
        for rotations_done in 1..=total_rotations {
            dedup.rotate_bucket();
            if rotations_done < total_rotations {
                assert!(
                    dedup.is_duplicate(data1),
                    "Data1 should be duplicate after {} rotations",
                    rotations_done
                );
            } else {
                assert!(
                    !dedup.is_duplicate(data1),
                    "Data1 should NOT be duplicate after {} rotations",
                    rotations_done
                );
            }
        }
    }

    /// With the minimum of two buckets, an entry survives one rotation and is
    /// gone after the second. Rotation is driven explicitly, so no real time
    /// needs to pass.
    #[test]
    fn test_dedup_expires_after_full_wrap() {
        let mut dedup_exp = Dedup::new(Duration::from_millis(200));
        assert_eq!(dedup_exp.num_buckets, 2);

        let data = b"expire_test";
        assert!(!dedup_exp.is_duplicate(data));
        dedup_exp.insert(data);
        assert!(dedup_exp.is_duplicate(data));

        dedup_exp.rotate_bucket();
        assert!(dedup_exp.is_duplicate(data));

        dedup_exp.rotate_bucket();
        assert!(!dedup_exp.is_duplicate(data));
    }

    /// A zero period disables the filter: inserts are ignored and nothing is
    /// ever reported as a duplicate.
    #[test]
    fn test_dedup_zero_period_is_disabled() {
        let data1 = b"hello";
        let mut dedup_zero = Dedup::new(Duration::ZERO);
        assert!(!dedup_zero.is_duplicate(data1));
        dedup_zero.insert(data1);
        assert!(!dedup_zero.is_duplicate(data1));
    }
}