use std::collections::VecDeque;
use std::hash::{BuildHasher, Hasher};
use std::time::Duration;
use crate::Timestamp;
use crate::view::PacketView;
/// Duplicate-frame filter backed by a bounded ring of recently seen frames.
///
/// `keep` fingerprints each frame (hash, length, timestamp), compares it with
/// the fingerprints still held in the ring, and reports whether the frame
/// should be kept or dropped as a duplicate.
#[derive(Debug)]
pub struct Dedup {
/// Largest timestamp gap at which a matching frame still counts as a duplicate.
window: Duration,
/// Upper bound on the number of entries in `ring`; `new` raises it to at least 1.
capacity: usize,
/// Fingerprints of recently seen frames; new entries go on the back and the
/// front entry is evicted once `capacity` is reached.
ring: VecDeque<Entry>,
/// Number of frames `keep` has classified as duplicates so far.
dropped: u64,
}
/// Fingerprint of one observed frame, as stored in the dedup ring.
#[derive(Debug, Clone, Copy)]
struct Entry {
/// Hash of the frame bytes, as produced by `hash_frame`.
hash: u64,
/// Frame length in bytes, narrowed to `u32`.
len: u32,
/// Timestamp taken from the packet view the frame arrived in.
ts: Timestamp,
}
impl Dedup {
    /// Ring capacity used by [`Dedup::loopback`].
    pub const DEFAULT_RING_SIZE: usize = 256;

    /// Duplicate window used by [`Dedup::loopback`] (one millisecond).
    pub const DEFAULT_LOOPBACK_WINDOW: Duration = Duration::from_millis(1);

    /// Creates a filter that remembers at most `capacity` recent frames and
    /// treats a frame as a duplicate when a matching frame was recorded no
    /// more than `window` away from it.
    ///
    /// A `capacity` of zero is raised to one so the ring can always hold the
    /// most recent frame.
    pub fn new(window: Duration, capacity: usize) -> Self {
        let capacity = capacity.max(1);
        Self {
            window,
            capacity,
            ring: VecDeque::with_capacity(capacity),
            dropped: 0,
        }
    }

    /// Creates a filter configured with [`Dedup::DEFAULT_LOOPBACK_WINDOW`] and
    /// [`Dedup::DEFAULT_RING_SIZE`].
    pub fn loopback() -> Self {
        Self::new(Self::DEFAULT_LOOPBACK_WINDOW, Self::DEFAULT_RING_SIZE)
    }

    /// Decides whether the frame in `view` should be kept.
    ///
    /// Returns `true` for a frame that should be kept and `false` for a
    /// duplicate. A frame is a duplicate when some ring entry has the same
    /// hash and length and its timestamp lies within `window` of the frame's
    /// timestamp, in either direction.
    ///
    /// The frame is recorded in the ring whether or not it was a duplicate,
    /// and every duplicate increments the counter returned by
    /// [`Dedup::dropped`].
    pub fn keep(&mut self, view: PacketView<'_>) -> bool {
        let hash = hash_frame(view.frame);
        // Saturate instead of wrapping: a plain `as u32` would silently fold
        // an oversized length back into the low range.
        let len = view.frame.len().min(u32::MAX as usize) as u32;
        let ts = view.timestamp;
        let window = self.window;

        let is_dup = self.ring.iter().any(|entry| {
            // The gap is checked in both directions. A single
            // `ts.saturating_sub(entry.ts)` collapses every backwards gap to
            // the minimum, so a frame stamped long *before* an entry would be
            // misread as a duplicate.
            // NOTE(review): assumes `Timestamp::saturating_sub` clamps at zero
            // when the argument is the later timestamp — confirm.
            entry.hash == hash
                && entry.len == len
                && ts.saturating_sub(entry.ts) <= window
                && entry.ts.saturating_sub(ts) <= window
        });

        self.push_entry(Entry { hash, len, ts });
        if is_dup {
            self.dropped += 1;
        }
        !is_dup
    }

    /// Number of frames [`Dedup::keep`] has rejected as duplicates so far.
    pub fn dropped(&self) -> u64 {
        self.dropped
    }

    /// Number of entries currently held in the ring.
    pub fn buffered(&self) -> usize {
        self.ring.len()
    }

    /// Appends `e` to the ring, evicting the oldest entry first when the ring
    /// is already at capacity.
    fn push_entry(&mut self, e: Entry) {
        if self.ring.len() >= self.capacity {
            self.ring.pop_front();
        }
        self.ring.push_back(e);
    }
}
/// Hashes the raw bytes of `frame` to a 64-bit fingerprint.
///
/// The hasher is built from explicit, constant seeds on every call, so the
/// result depends only on the frame contents.
fn hash_frame(frame: &[u8]) -> u64 {
    let mut fingerprint = ahash::RandomState::with_seeds(0, 0, 0, 0).build_hasher();
    fingerprint.write(frame);
    fingerprint.finish()
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for building a timestamp from seconds and nanoseconds.
    fn ts(sec: u32, nsec: u32) -> Timestamp {
        Timestamp::new(sec, nsec)
    }

    #[test]
    fn keeps_first_drops_duplicate_within_window() {
        let mut d = Dedup::loopback();
        let frame = [1u8, 2, 3, 4];
        assert!(d.keep(PacketView::new(&frame, ts(0, 0))));
        // 0.5 ms later: inside the 1 ms loopback window.
        assert!(!d.keep(PacketView::new(&frame, ts(0, 500_000))));
        assert_eq!(d.dropped(), 1);
    }

    #[test]
    fn keeps_recurrence_after_window() {
        let mut d = Dedup::new(Duration::from_millis(1), 256);
        let frame = [1u8, 2, 3];
        assert!(d.keep(PacketView::new(&frame, ts(0, 0))));
        // 2 ms later: outside the 1 ms window, so the repeat is kept.
        assert!(d.keep(PacketView::new(&frame, ts(0, 2_000_000))));
        assert_eq!(d.dropped(), 0);
    }

    #[test]
    fn different_frames_pass_through() {
        let mut d = Dedup::loopback();
        let f1 = [1u8, 2, 3];
        let f2 = [4u8, 5, 6];
        assert!(d.keep(PacketView::new(&f1, ts(0, 0))));
        assert!(d.keep(PacketView::new(&f2, ts(0, 100_000))));
        assert_eq!(d.dropped(), 0);
    }

    #[test]
    fn ring_bounded() {
        let mut d = Dedup::new(Duration::from_millis(1), 2);
        let f1 = [1u8];
        let f2 = [2u8];
        let f3 = [3u8];
        d.keep(PacketView::new(&f1, ts(0, 0)));
        d.keep(PacketView::new(&f2, ts(0, 100_000)));
        d.keep(PacketView::new(&f3, ts(0, 200_000)));
        assert_eq!(d.buffered(), 2);
        // `f1` was evicted by `f3`, so it is no longer recognised.
        assert!(d.keep(PacketView::new(&f1, ts(0, 300_000))));
    }

    #[test]
    fn capacity_clamped_to_one() {
        let mut d = Dedup::new(Duration::from_millis(1), 0);
        let f = [1u8];
        assert!(d.keep(PacketView::new(&f, ts(0, 0))));
        assert!(!d.keep(PacketView::new(&f, ts(0, 100_000))));
    }

    #[test]
    fn same_hash_different_len_kept() {
        let mut d = Dedup::loopback();
        let frame = [1u8, 2, 3];
        // Seed the ring with an entry whose hash matches `frame` but whose
        // recorded length does not. Two frames with different contents would
        // already differ in hash and never reach the length comparison.
        d.push_entry(Entry {
            hash: hash_frame(&frame),
            len: 2,
            ts: ts(0, 0),
        });
        assert!(d.keep(PacketView::new(&frame, ts(0, 100_000))));
        assert_eq!(d.dropped(), 0);
    }
}