use std::time::Duration;
use crate::packet::{Packet, PacketDirection, Timestamp};
pub struct Dedup {
ring: Vec<Option<Entry>>,
head: usize,
window: Duration,
direction_aware: bool,
dropped: u64,
seen: u64,
}
#[derive(Clone, Copy)]
struct Entry {
hash: u64,
len: u32,
ts_ns: u128,
direction: PacketDirection,
}
impl Dedup {
pub fn loopback() -> Self {
Self::new(Duration::from_millis(1), 256, true)
}
pub fn content(window: Duration, ring_size: usize) -> Self {
Self::new(window, ring_size, false)
}
pub fn new(window: Duration, ring_size: usize, direction_aware: bool) -> Self {
let ring_size = ring_size.max(1);
Self {
ring: vec![None; ring_size],
head: 0,
window,
direction_aware,
dropped: 0,
seen: 0,
}
}
pub fn keep(&mut self, pkt: &Packet<'_>) -> bool {
self.keep_raw(pkt.data(), pkt.direction(), pkt.timestamp())
}
pub fn keep_raw(&mut self, data: &[u8], direction: PacketDirection, ts: Timestamp) -> bool {
self.seen += 1;
let hash = xxhash_rust::xxh3::xxh3_64(data);
let len = data.len() as u32;
let ts_ns = (ts.sec as u128) * 1_000_000_000 + (ts.nsec as u128);
let window_ns = self.window.as_nanos();
for slot in &self.ring {
let Some(e) = slot else { continue };
if e.hash != hash || e.len != len {
continue;
}
let elapsed = ts_ns.saturating_sub(e.ts_ns);
if elapsed > window_ns {
continue;
}
if !self.direction_aware {
self.dropped += 1;
return false;
}
if directions_complementary(e.direction, direction) {
self.dropped += 1;
return false;
}
}
self.ring[self.head] = Some(Entry {
hash,
len,
ts_ns,
direction,
});
self.head = (self.head + 1) % self.ring.len();
true
}
pub fn dropped(&self) -> u64 {
self.dropped
}
pub fn seen(&self) -> u64 {
self.seen
}
pub fn reset(&mut self) {
for slot in &mut self.ring {
*slot = None;
}
self.head = 0;
self.dropped = 0;
self.seen = 0;
}
}
fn directions_complementary(a: PacketDirection, b: PacketDirection) -> bool {
matches!(
(a, b),
(PacketDirection::Outgoing, PacketDirection::Host)
| (PacketDirection::Host, PacketDirection::Outgoing)
)
}
#[cfg(test)]
mod tests {
use super::*;
fn ts(sec: u32, nsec: u32) -> Timestamp {
Timestamp::new(sec, nsec)
}
#[test]
fn loopback_drops_outgoing_then_host_within_window() {
let mut d = Dedup::loopback();
assert!(d.keep_raw(b"abc", PacketDirection::Outgoing, ts(0, 0)));
assert!(!d.keep_raw(b"abc", PacketDirection::Host, ts(0, 100_000)));
assert_eq!(d.dropped(), 1);
assert_eq!(d.seen(), 2);
}
#[test]
fn loopback_keeps_outside_window() {
let mut d = Dedup::loopback();
assert!(d.keep_raw(b"abc", PacketDirection::Outgoing, ts(0, 0)));
assert!(d.keep_raw(b"abc", PacketDirection::Host, ts(0, 2_000_000)));
assert_eq!(d.dropped(), 0);
}
#[test]
fn loopback_keeps_same_direction_repeats() {
let mut d = Dedup::loopback();
assert!(d.keep_raw(b"abc", PacketDirection::Outgoing, ts(0, 0)));
assert!(d.keep_raw(b"abc", PacketDirection::Outgoing, ts(0, 100_000)));
assert_eq!(d.dropped(), 0);
}
#[test]
fn content_drops_same_hash_regardless_of_direction() {
let mut d = Dedup::content(Duration::from_millis(5), 64);
assert!(d.keep_raw(b"abc", PacketDirection::Outgoing, ts(0, 0)));
assert!(!d.keep_raw(b"abc", PacketDirection::Outgoing, ts(0, 100_000)));
assert_eq!(d.dropped(), 1);
}
#[test]
fn ring_overflow_evicts_oldest() {
let mut d = Dedup::content(Duration::from_secs(1), 4);
for i in 0..4u8 {
assert!(d.keep_raw(&[i], PacketDirection::Host, ts(0, 0)));
}
assert!(d.keep_raw(&[4], PacketDirection::Host, ts(0, 0)));
assert!(d.keep_raw(&[0], PacketDirection::Host, ts(0, 0)));
assert!(!d.keep_raw(&[4], PacketDirection::Host, ts(0, 0)));
}
#[test]
fn different_lengths_dont_match_even_on_hash_collision() {
let mut d = Dedup::content(Duration::from_secs(1), 64);
assert!(d.keep_raw(b"abc", PacketDirection::Host, ts(0, 0)));
assert!(d.keep_raw(b"abcd", PacketDirection::Host, ts(0, 0)));
assert_eq!(d.dropped(), 0);
}
#[test]
fn reset_clears_state() {
let mut d = Dedup::loopback();
d.keep_raw(b"abc", PacketDirection::Outgoing, ts(0, 0));
d.keep_raw(b"abc", PacketDirection::Host, ts(0, 100_000));
assert_eq!(d.dropped(), 1);
d.reset();
assert_eq!(d.dropped(), 0);
assert_eq!(d.seen(), 0);
assert!(d.keep_raw(b"abc", PacketDirection::Outgoing, ts(0, 0)));
assert!(!d.keep_raw(b"abc", PacketDirection::Host, ts(0, 100_000)));
}
#[test]
fn empty_payload_handled() {
let mut d = Dedup::loopback();
assert!(d.keep_raw(b"", PacketDirection::Outgoing, ts(0, 0)));
assert!(!d.keep_raw(b"", PacketDirection::Host, ts(0, 1_000)));
}
#[test]
fn directions_complementary_table() {
use PacketDirection::*;
assert!(directions_complementary(Outgoing, Host));
assert!(directions_complementary(Host, Outgoing));
assert!(!directions_complementary(Host, Host));
assert!(!directions_complementary(Outgoing, Outgoing));
assert!(!directions_complementary(Broadcast, Outgoing));
assert!(!directions_complementary(Outgoing, Multicast));
}
#[test]
fn timestamp_clock_skew_doesnt_break() {
let mut d = Dedup::loopback();
assert!(d.keep_raw(b"abc", PacketDirection::Outgoing, ts(1, 0)));
assert!(!d.keep_raw(b"abc", PacketDirection::Host, ts(0, 999_999_999)));
}
}