use crate::cache::Cache;
use sn_messaging::{node::RoutingMsg, DstLocation, MessageId};
use std::time::Duration;
use xor_name::XorName;
/// Expiry duration handed to the cache of incoming message ids (20 minutes).
const INCOMING_EXPIRY_DURATION: Duration = Duration::from_secs(20 * 60);
/// Expiry duration handed to the cache of outgoing `(message id, name)` pairs (10 minutes).
const OUTGOING_EXPIRY_DURATION: Duration = Duration::from_secs(10 * 60);
/// Capacity handed to each of the two caches (incoming and outgoing) on construction.
const MAX_ENTRIES: usize = 15_000;
/// Outcome of running a message through the filter.
///
/// `Debug`, `Clone` and `Copy` are derived so the result can be logged, used in
/// `assert_eq!`, and passed around by value; the enum has no payload, so copying
/// it is free.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum FilteringResult {
    /// The message was not present in the filter before this call.
    NewMessage,
    /// The message was already present in the filter.
    KnownMessage,
}
impl FilteringResult {
    /// Returns `true` for [`FilteringResult::NewMessage`] and `false` for
    /// [`FilteringResult::KnownMessage`].
    pub fn is_new(&self) -> bool {
        matches!(self, Self::NewMessage)
    }
}
/// De-duplication filter for routing messages.
///
/// Keeps two independent caches: one keyed by the id of incoming messages and
/// one keyed by the `(message id, name)` pair of outgoing messages.
pub(crate) struct MessageFilter {
/// Ids recorded through `add_to_filter`; the value carries no information.
incoming: Cache<MessageId, ()>,
/// `(message id, name)` pairs recorded through `filter_outgoing`; the value
/// carries no information.
outgoing: Cache<(MessageId, XorName), ()>,
}
impl MessageFilter {
    /// Builds a filter with two empty caches.
    ///
    /// Both caches share the `MAX_ENTRIES` capacity; the incoming cache uses
    /// `INCOMING_EXPIRY_DURATION` and the outgoing cache uses
    /// `OUTGOING_EXPIRY_DURATION`.
    pub fn new() -> Self {
        let incoming =
            Cache::with_expiry_duration_and_capacity(INCOMING_EXPIRY_DURATION, MAX_ENTRIES);
        let outgoing =
            Cache::with_expiry_duration_and_capacity(OUTGOING_EXPIRY_DURATION, MAX_ENTRIES);

        Self { incoming, outgoing }
    }

    /// Records the outgoing `(msg.id, pub_id)` pair and reports whether it had
    /// been recorded before.
    ///
    /// Messages whose destination is `DstLocation::DirectAndUnrouted` bypass the
    /// filter entirely: they are never recorded and always reported as
    /// `NewMessage`.
    ///
    /// For every other destination the pair is inserted into the outgoing
    /// cache; if the insertion reports a previous value the result is
    /// `KnownMessage`, otherwise `NewMessage`.
    pub async fn filter_outgoing(&self, msg: &RoutingMsg, pub_id: &XorName) -> FilteringResult {
        if matches!(msg.dst, DstLocation::DirectAndUnrouted) {
            return FilteringResult::NewMessage;
        }

        // NOTE(review): the trailing `None` is presumably "no per-entry expiry
        // override" — confirm against `Cache::set`.
        let key = (msg.id, *pub_id);
        let already_sent = self.outgoing.set(key, (), None).await.is_some();
        if !already_sent {
            return FilteringResult::NewMessage;
        }

        trace!("Outgoing message filtered: {:?}", msg.id);
        FilteringResult::KnownMessage
    }

    /// Records `msg_id` in the incoming cache.
    ///
    /// Returns `true` when the insertion reported no previous value for this id
    /// and `false` when one was reported, in which case a trace line is logged.
    pub async fn add_to_filter(&self, msg_id: &MessageId) -> bool {
        let first_sighting = self.incoming.set(*msg_id, (), None).await.is_none();
        if !first_sighting {
            trace!("Incoming message filtered: {:?}", msg_id);
        }
        first_sighting
    }

    /// Clears the incoming cache and then the outgoing cache.
    pub async fn reset(&mut self) {
        self.incoming.clear().await;
        self.outgoing.clear().await;
    }
}
// `Default` simply delegates to `MessageFilter::new`, so both construction
// paths produce the same filter.
impl Default for MessageFilter {
/// Returns the same value as [`MessageFilter::new`].
fn default() -> Self {
Self::new()
}
}