use async_std::channel::{Receiver, Sender};
use std::{
collections::{BTreeMap, BTreeSet},
net::IpAddr,
};
/// Length of the look-back window in seconds: `reps` discards every slot whose
/// timestamp is older than `current_slot - BAN_WINDOW`.
const BAN_WINDOW: u64 = 60;
/// Width of one accounting slot in seconds; `reps` rounds the current time
/// down to a multiple of this value to obtain the slot key.
const BAN_SLOT: u64 = 1;
/// An IP is banned once its windowed [`STAT_MESSAGES`] total reaches this
/// value (the comparison in `reps` is inclusive).
const BAN_ON_MESSAGES_OVER: usize = 600;
/// An IP is banned once its windowed [`STAT_BYTES`] total reaches this value
/// (the comparison in `reps` is inclusive).
const BAN_ON_BYTES_OVER: usize = 60000;
/// An IP is banned once its windowed [`STAT_DUPES`] total reaches this value
/// (the comparison in `reps` is inclusive).
const BAN_ON_DUPES_OVER: usize = 100;
/// Stat name whose windowed total is checked against `BAN_ON_MESSAGES_OVER`.
pub const STAT_MESSAGES: &str = "messages";
/// Stat name whose windowed total is checked against `BAN_ON_BYTES_OVER`.
pub const STAT_BYTES: &str = "bytes";
/// Stat name whose windowed total is checked against `BAN_ON_DUPES_OVER`.
pub const STAT_DUPES: &str = "dupmsg";
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// # Panics
///
/// Panics with the message `"linear time"` if the system clock reports a time
/// earlier than the Unix epoch.
fn now() -> u64 {
    // `UNIX_EPOCH.elapsed()` measures from the epoch to `SystemTime::now()`.
    let since_epoch = std::time::UNIX_EPOCH.elapsed().expect("linear time");
    since_epoch.as_secs()
}
pub async fn reps(
rep_rx: Receiver<(IpAddr, &'static str, usize)>,
ban_tx: Sender<BTreeSet<IpAddr>>,
) {
let mut ip_rep = BTreeMap::new();
let mut old_bans = BTreeSet::new();
while let Ok((ip, stat, count)) = rep_rx.recv().await {
let slot = BAN_SLOT * (now() / BAN_SLOT);
let tr = ip_rep.entry(slot).or_insert_with(BTreeMap::new);
let ipr = tr.entry(ip).or_insert_with(BTreeMap::new);
let stat = ipr.entry(stat).or_insert(0usize);
*stat = stat.saturating_add(count);
ip_rep.retain(|k, _| *k >= slot.saturating_sub(BAN_WINDOW));
let mut bans = BTreeSet::new();
let mut ipstats = BTreeMap::new();
for (_, rep) in &ip_rep {
for (ip, rep) in rep {
for (stat, count) in rep {
let ip = ipstats.entry(*ip).or_insert_with(BTreeMap::new);
let counter = ip.entry(*stat).or_insert(0usize);
*counter = counter.saturating_add(*count);
}
}
}
for (ip, rep) in &ipstats {
for (stat, count) in rep {
match (&**stat, count) {
(self::STAT_BYTES, self::BAN_ON_BYTES_OVER..)
| (self::STAT_MESSAGES, self::BAN_ON_MESSAGES_OVER..)
| (self::STAT_DUPES, self::BAN_ON_DUPES_OVER..) => {
bans.insert(*ip);
}
_ => {}
}
}
}
if old_bans != bans {
eprintln!("banned {} slots {}\n {ipstats:?}", bans.len(), ip_rep.len());
old_bans = bans.clone();
ban_tx.send(bans).await.ok();
}
}
}