use crate::runtime::Runtime;
use futures::FutureExt;
use hashbrown::HashSet;
use core::{
future::Future,
pin::Pin,
task::{Context, Poll},
time::Duration,
};
/// Logging target attached to the `tracing` events emitted by the duplicate filter.
const LOG_TARGET: &str = "emissary::ssu2::duplicate-filter";
/// Duration the filter's decay timer is armed with, both on construction and after each decay.
const DUPLICATE_FILTER_DECAY_INTERVAL: Duration = Duration::from_secs(60);
/// Duplicate message ID filter that keeps two generations of seen IDs.
///
/// Newly seen IDs are recorded in `current`. On each decay the contents of `current` are moved
/// into `previous` and whatever `previous` held before is dropped.
pub struct DuplicateFilter<R: Runtime> {
/// Message IDs inserted since the most recent decay.
current: HashSet<u32>,
/// Timer whose expiry triggers the next decay when the filter is polled.
decay_timer: R::Timer,
/// Message IDs that were in `current` when the most recent decay happened.
previous: HashSet<u32>,
}
impl<R: Runtime> DuplicateFilter<R> {
    /// Create an empty [`DuplicateFilter`] whose decay timer is armed for
    /// [`DUPLICATE_FILTER_DECAY_INTERVAL`].
    pub fn new() -> Self {
        let decay_timer = R::timer(DUPLICATE_FILTER_DECAY_INTERVAL);

        Self {
            current: HashSet::new(),
            decay_timer,
            previous: HashSet::new(),
        }
    }

    /// Record `message_id` in the filter.
    ///
    /// Returns `true` if the ID was in neither generation and has now been added to the current
    /// one. Returns `false` if the ID was already known, in which case the filter is left
    /// unchanged.
    pub fn insert(&mut self, message_id: u32) -> bool {
        let in_previous_generation = self.previous.contains(&message_id);

        // `HashSet::insert` returns `false` when the value is already present, which covers the
        // "duplicate within the current generation" case without a separate lookup.
        !in_previous_generation && self.current.insert(message_id)
    }

    /// Age the filter by one generation.
    ///
    /// The current set becomes the previous set, the old previous set is dropped and the current
    /// set is left empty.
    fn decay(&mut self) {
        let aged = core::mem::take(&mut self.current);
        self.previous = aged;
    }
}
impl<R: Runtime> Future for DuplicateFilter<R> {
    type Output = ();

    /// Drive the decay timer.
    ///
    /// When the timer has expired the filter is decayed and a fresh timer is armed. Every path
    /// returns [`Poll::Pending`], so this future never resolves.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Nothing to do until the current decay interval has elapsed.
        if self.decay_timer.poll_unpin(cx).is_pending() {
            return Poll::Pending;
        }

        tracing::trace!(
            target: LOG_TARGET,
            "decaying ssu2 duplicate filter",
        );
        self.decay();

        // Arm a fresh timer and poll it once with the current context so that it gets a chance
        // to register this task's waker; the outcome of that poll is intentionally ignored.
        self.decay_timer = R::timer(DUPLICATE_FILTER_DECAY_INTERVAL);
        let _ = self.decay_timer.poll_unpin(cx);

        Poll::Pending
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::runtime::mock::MockRuntime;

    /// A message ID is rejected as a duplicate until two decays have happened since it was
    /// inserted.
    #[tokio::test]
    async fn insert_duplicate_and_decay() {
        let mut filter = DuplicateFilter::<MockRuntime>::new();

        // first insert succeeds, second is a duplicate
        assert!(filter.insert(1337));
        assert!(!filter.insert(1337));

        // after one decay the id lives in the previous generation and is still a duplicate
        filter.decay();
        assert!(!filter.insert(1337));

        // after a second decay the id has been forgotten and can be inserted again
        filter.decay();
        assert!(filter.insert(1337));
        assert!(!filter.insert(1337));
    }

    /// Polling the filter decays it once the decay timer expires.
    #[tokio::test(start_paused = true)]
    async fn decay_timer_works() {
        let mut filter = DuplicateFilter::<MockRuntime>::new();

        // use a short timer so it expires within the timeout below
        filter.decay_timer = MockRuntime::timer(Duration::from_secs(5));

        assert!(filter.insert(1337));
        assert!(!filter.insert(1337));
        assert_eq!(filter.current.len(), 1);

        // the filter future never resolves so the timeout must fire
        assert!(tokio::time::timeout(Duration::from_secs(8), &mut filter).await.is_err());

        // the id has moved from the current generation to the previous one
        assert!(filter.current.is_empty());
        assert_eq!(filter.previous.len(), 1);
        assert!(!filter.insert(1337));

        // reset the timer and poll again to trigger a second decay
        filter.decay_timer = MockRuntime::timer(Duration::from_secs(5));
        assert!(tokio::time::timeout(Duration::from_secs(8), &mut filter).await.is_err());

        // the id is gone from both generations and can be inserted again
        assert!(filter.current.is_empty());
        assert!(filter.previous.is_empty());
        assert!(filter.insert(1337));
    }
}