use ptrs::trace;
use siphasher::{prelude::*, sip::SipHasher24};
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
/// Capacity, in number of entries (not bytes), handed to the inner filter by
/// [`ReplayFilter::new`]; once the filter holds this many entries the oldest
/// one is evicted regardless of its age.
const MAX_FILTER_SIZE: usize = 100 * 1024;
/// One remembered input: the keyed digest of its bytes and the time it was
/// first presented to the filter.
#[derive(Clone, PartialEq)]
struct Entry {
/// Keyed SipHash-2-4 digest of the input bytes; also the key under which the
/// entry is stored in the lookup map.
digest: u64,
/// The `now` value supplied by the caller when the entry was inserted; used
/// to decide when the entry has outlived the TTL.
first_seen: Instant,
}
/// Replay detector that remembers recently seen byte strings.
///
/// The actual state lives in an `InnerReplayFilter` behind an
/// `Arc<Mutex<..>>`; every operation takes the lock for its duration.
pub struct ReplayFilter(Arc<Mutex<InnerReplayFilter>>);
impl ReplayFilter {
    /// Builds an empty filter whose entries are retained for `ttl` and whose
    /// size is bounded by `MAX_FILTER_SIZE` entries.
    ///
    /// # Panics
    ///
    /// Panics if the inner filter cannot obtain random bytes for its hash key.
    pub fn new(ttl: Duration) -> Self {
        let state = InnerReplayFilter::new(ttl, MAX_FILTER_SIZE);
        let shared = Arc::new(Mutex::new(state));
        Self(shared)
    }

    /// Checks whether `buf` is already present in the filter and records it
    /// if it is not.
    ///
    /// `now` is the caller's notion of the current time; it is used both to
    /// expire old entries and to timestamp a newly recorded one.
    ///
    /// Returns `true` when `buf` was already present (a replay) and `false`
    /// when it was newly recorded.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn test_and_set(&self, now: Instant, buf: impl AsRef<[u8]>) -> bool {
        let mut guard = self.0.lock().unwrap();
        guard.test_and_set(now, buf)
    }
}
/// Unsynchronised replay-filter state: a digest lookup map plus an
/// insertion-ordered queue used for expiry.
struct InnerReplayFilter {
/// Digest -> entry map used for the membership check.
filter: HashMap<u64, Entry>,
/// Entries in insertion order: new entries are pushed at the front, so the
/// oldest entry is at the back.
fifo: VecDeque<Entry>,
/// Random 128-bit SipHash key generated when the filter is created.
key: [u8; 16],
/// Age at which an entry becomes eligible for eviction.
ttl_limit: Duration,
/// Entry count at which the oldest entry is evicted regardless of age.
max_cap: usize,
}
impl InnerReplayFilter {
    /// Creates an empty filter keyed with 16 freshly generated random bytes.
    ///
    /// `ttl_limit` is the age at which an entry becomes eligible for eviction
    /// and `max_cap` is the entry count at which the oldest entry is evicted
    /// regardless of its age.
    ///
    /// # Panics
    ///
    /// Panics if `getrandom` fails to fill the key.
    fn new(ttl_limit: Duration, max_cap: usize) -> Self {
        let mut key = [0_u8; 16];
        getrandom::getrandom(&mut key)
            .expect("failed to obtain random bytes for the replay filter key");
        Self {
            filter: HashMap::new(),
            fifo: VecDeque::new(),
            key,
            ttl_limit,
            max_cap,
        }
    }

    /// Reports whether `buf` is already in the filter, recording it if not.
    ///
    /// Expired entries are purged first (see `garbage_collect`), so an input
    /// whose entry has aged out is treated as new. Returns `true` for a
    /// replay and `false` when `buf` was newly inserted with timestamp `now`.
    fn test_and_set(&mut self, now: Instant, buf: impl AsRef<[u8]>) -> bool {
        self.garbage_collect(now);

        // Keyed SipHash-2-4 over the raw bytes; the random key makes digests
        // differ between filter instances.
        let digest: u64 = {
            let mut hasher = SipHasher24::new_with_key(&self.key);
            hasher.write(buf.as_ref());
            hasher.finish().to_be()
        };

        trace!("checking inner");
        if self.filter.contains_key(&digest) {
            return true;
        }

        trace!("not found: {digest}... inserting");
        let entry = Entry {
            digest,
            first_seen: now,
        };
        // Newest entries go to the front so the back is always the oldest.
        self.fifo.push_front(entry.clone());
        self.filter.insert(digest, entry);
        trace!("inserted: {}", self.filter.len());
        false
    }

    /// Evicts entries starting from the oldest.
    ///
    /// While the filter holds fewer than `max_cap` entries and `ttl_limit` is
    /// non-zero, only entries whose age is at least `ttl_limit` are evicted,
    /// and the walk stops at the first entry that is still fresh. When the
    /// filter is full the oldest entry is evicted unconditionally, after
    /// which the TTL rule applies again.
    ///
    /// If `now` is earlier than an entry's `first_seen` (time went
    /// backwards), the whole filter is discarded via `reset`.
    ///
    /// NOTE(review): with a zero `ttl_limit` the TTL branch is never taken,
    /// so every call drains the filter completely — confirm this is intended.
    fn garbage_collect(&mut self, now: Instant) {
        // Copy the oldest entry's fields out of the queue so no borrow of
        // `fifo` is held while the containers are mutated below.
        while let Some(&Entry { digest, first_seen }) = self.fifo.back() {
            trace!(
                "{}/{}[/{}] - {:?}",
                self.fifo.len(),
                self.filter.len(),
                self.max_cap,
                self.ttl_limit
            );

            if self.fifo.len() < self.max_cap && !self.ttl_limit.is_zero() {
                trace!("{:?} > {:?}", now, first_seen);
                // `checked_duration_since` yields `None` when `now` precedes
                // `first_seen`, so the backward-jump case is detected without
                // relying on how `Instant - Instant` handles a negative
                // result.
                match now.checked_duration_since(first_seen) {
                    None => {
                        trace!("Invalid time");
                        self.reset();
                        return;
                    }
                    // The oldest entry is still fresh, hence so is every
                    // newer one: nothing more to evict.
                    Some(age) if age < self.ttl_limit => return,
                    Some(_) => {}
                }
            }

            trace!("removing entry");
            _ = self.filter.remove(&digest);
            _ = self.fifo.pop_back();
        }
    }

    /// Discards every entry, replacing both containers with fresh empty ones
    /// (which also releases their allocations). The hash key is kept.
    fn reset(&mut self) {
        trace!("RESETING");
        self.filter = HashMap::new();
        self.fifo = VecDeque::new();
    }
}
#[cfg(test)]
mod test {
    use super::*;
    use crate::test_utils::init_subscriber;
    use crate::Result;

    /// Asserts that the FIFO and the digest map both hold `expected` entries.
    #[track_caller]
    fn assert_entry_count(state: &InnerReplayFilter, expected: usize) {
        assert_eq!(
            state.fifo.len(),
            expected,
            "filter fifo has a unexpected number of entries: {}",
            state.fifo.len()
        );
        assert_eq!(
            state.filter.len(),
            expected,
            "filter map has a unexpected number of entries: {}",
            state.filter.len()
        );
    }

    /// Exercises insertion, replay detection, TTL expiry, backward clock
    /// jumps and capacity-based eviction.
    #[test]
    fn replay_filter_ops() -> Result<()> {
        init_subscriber();

        let ttl = Duration::from_secs(10);
        let shared = ReplayFilter::new(ttl);
        let buf = b"For a moment, nothing happened. Then, after a second or so, nothing continued to happen.";
        let mut now = Instant::now();

        // Mutex-wrapped filter: first sighting is new, second is a replay.
        assert!(
            !shared.test_and_set(now, buf),
            "test_and_set (mutex) empty filter returned true"
        );
        assert!(
            shared.test_and_set(now, buf),
            "test_and_set (mutex) populated filter (replayed) returned false"
        );

        // Bare inner filter with a capacity of two entries.
        let mut inner = InnerReplayFilter::new(ttl, 2);
        assert!(
            !inner.test_and_set(now, buf),
            "test_and_set empty filter returned true"
        );
        assert!(
            inner.test_and_set(now, buf),
            "test_and_set populated filter (replayed) returned false"
        );

        // Advance the clock by exactly one TTL: the first entry expires while
        // the second message is recorded.
        let buf2 = b"We demand rigidly defined areas of doubt and uncertainty!";
        now += ttl;
        assert!(
            !inner.test_and_set(now, buf2),
            "test_and_set populated filter, 2nd entry returned true"
        );
        assert!(
            inner.test_and_set(now, buf2),
            "test_and_set populated filter, 2nd entry (replayed) returned false"
        );
        assert!(
            !inner.test_and_set(now, buf),
            "test_and_set populated filter, compact check returned true"
        );

        // Jump the clock backwards: the filter is reset, so `buf` is new
        // again and ends up as the only entry.
        now = Instant::now();
        assert!(
            !inner.test_and_set(now, buf),
            "test_and_set populated filter, backward time jump returned true"
        );
        assert_entry_count(&inner, 1);
        assert!(
            inner.test_and_set(now, buf),
            "test_and_set populated filter, post-backward clock jump (replayed) returned false"
        );

        // Fill to capacity, then check that each further unique insert evicts
        // one entry so the size stays at two.
        inner.test_and_set(now, "message2");
        for i in 0..10 {
            assert_entry_count(&inner, 2);
            assert!(
                !inner.test_and_set(now, format!("message-1{i}")),
                "unique message failed insert (returned true)"
            );
        }

        Ok(())
    }
}