venues 0.3.2

Privacy-friendly online meeting service
Documentation
use async_std::channel::{Receiver, Sender};
use std::{
    collections::{BTreeMap, BTreeSet},
    net::IpAddr,
};

/// Length, in seconds, of the sliding window whose slots `reps` keeps and sums;
/// slots older than `current slot - BAN_WINDOW` are discarded.
const BAN_WINDOW: u64 = 60;
/// Width, in seconds, of one accounting slot; `reps` rounds the current time
/// down to a multiple of this value to pick the slot a report is added to.
const BAN_SLOT: u64 = 1;
/// An IP is banned once its windowed `STAT_MESSAGES` total is at least this
/// value (the comparison in `reps` is inclusive, despite the `_OVER` name).
const BAN_ON_MESSAGES_OVER: usize = 600;
/// An IP is banned once its windowed `STAT_BYTES` total is at least this value.
const BAN_ON_BYTES_OVER: usize = 60000;
/// An IP is banned once its windowed `STAT_DUPES` total is at least this value.
const BAN_ON_DUPES_OVER: usize = 100;

/// Stat name that `reps` compares against the `BAN_ON_MESSAGES_OVER` threshold.
pub const STAT_MESSAGES: &str = "messages";
/// Stat name that `reps` compares against the `BAN_ON_BYTES_OVER` threshold.
pub const STAT_BYTES: &str = "bytes";
/// Stat name that `reps` compares against the `BAN_ON_DUPES_OVER` threshold.
pub const STAT_DUPES: &str = "dupmsg";

/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// # Panics
///
/// Panics with the message `"linear time"` if the system clock reports a time
/// earlier than the Unix epoch.
fn now() -> u64 {
    // `UNIX_EPOCH.elapsed()` measures from the epoch up to `SystemTime::now()`.
    let since_epoch = std::time::UNIX_EPOCH.elapsed().expect("linear time");
    since_epoch.as_secs()
}

pub async fn reps(
    rep_rx: Receiver<(IpAddr, &'static str, usize)>,
    ban_tx: Sender<BTreeSet<IpAddr>>,
) {
    let mut ip_rep = BTreeMap::new();
    let mut old_bans = BTreeSet::new();

    while let Ok((ip, stat, count)) = rep_rx.recv().await {
        let slot = BAN_SLOT * (now() / BAN_SLOT);
        let tr = ip_rep.entry(slot).or_insert_with(BTreeMap::new);
        let ipr = tr.entry(ip).or_insert_with(BTreeMap::new);
        let stat = ipr.entry(stat).or_insert(0usize);
        *stat = stat.saturating_add(count);

        ip_rep.retain(|k, _| *k >= slot.saturating_sub(BAN_WINDOW));

        let mut bans = BTreeSet::new();
        let mut ipstats = BTreeMap::new();
        for (_, rep) in &ip_rep {
            for (ip, rep) in rep {
                for (stat, count) in rep {
                    let ip = ipstats.entry(*ip).or_insert_with(BTreeMap::new);
                    let counter = ip.entry(*stat).or_insert(0usize);
                    *counter = counter.saturating_add(*count);
                }
            }
        }
        for (ip, rep) in &ipstats {
            for (stat, count) in rep {
                match (&**stat, count) {
                    (self::STAT_BYTES, self::BAN_ON_BYTES_OVER..)
                    | (self::STAT_MESSAGES, self::BAN_ON_MESSAGES_OVER..)
                    | (self::STAT_DUPES, self::BAN_ON_DUPES_OVER..) => {
                        bans.insert(*ip);
                    }
                    _ => {}
                }
            }
        }
        if old_bans != bans {
            eprintln!("banned {} slots {}\n {ipstats:?}", bans.len(), ip_rep.len());
            old_bans = bans.clone();
            ban_tx.send(bans).await.ok();
        }
    }
}