venues 0.3.2

Privacy friendly online meeting service
Documentation
use async_std::net::UdpSocket;
use async_std::{
    channel::{Receiver, RecvError, SendError, Sender},
    stream::StreamExt,
};
use futures::{select, stream::FuturesUnordered, Future, FutureExt, Stream};
use log::*;
use presence::dedup::Dedup;
use presence::seen::Seen;
use presence::Presence;
use smol_timeout::TimeoutExt;
use std::sync::atomic::AtomicUsize;
use std::{
    collections::{BTreeMap, BTreeSet},
    io,
    net::IpAddr,
    pin::Pin,
    sync::Arc,
    time::{Duration, Instant},
};
use webrtc_dtls::{config::Config, crypto::Certificate};

const MAX_IDLE: u64 = 300;

use crate::reps::{STAT_BYTES, STAT_MESSAGES,STAT_DUPES};

pub async fn talk<'a>(
    on: String,
    works: Sender<Pin<Box<dyn Future<Output = ()>>>>,
    rep_tx: Sender<(IpAddr, &'static str, usize)>,
    ban_rx: Receiver<BTreeSet<IpAddr>>,
    mut certs: impl Stream<Item = (Vec<String>, Certificate)> + Unpin,
) -> anyhow::Result<()> {
    let dupes = Arc::new(
        Seen::builder()
            .max(3)
            .precision(1000)
            .extra_hits(2)
            .build()
            .into_dedup(),
    );
    let (reset_tx, reset_rx) = async_std::channel::unbounded();
    let pending_rx = reset_rx.clone();
    let mut talks: Pin<Box<dyn Future<Output = ()>>> =
        Box::pin(async move { pending_rx.recv().await.unwrap_or_else(|_| {}) });
    loop {
        let on = on.clone();
        let works = works.clone();
        let rep_tx = rep_tx.clone();
        let ban_rx = ban_rx.clone();
        let reset_rx = reset_rx.clone();
        let dupes = dupes.clone();
        select! {
            _ = talks.as_mut().fuse() => break,
            res = certs.next().fuse() => talks = {
                info!("Closing previous talks...");
                let _ = reset_tx.send(()).await;
                talks.await;
                match res{
                    None => {
                        warn!("No more talks...");
                        break
                    },
                    Some((_domains, cert))=> {
                        info!("Alright, setting up new talks...");
                        Box::pin(async move {
                            listen(on, cert, works, rep_tx, ban_rx, reset_rx, dupes)
                                .await
                                .unwrap_or_else(|e| error!("talk processing failed - {e}"))
                        })
                    }
                }
            }

        }
    }

    Ok(())
}

#[derive(thiserror::Error, Debug)]
enum TalkError {
    #[error("WebRTC error {0}")]
    WebRtcDtls(#[from] webrtc_dtls::Error),
    #[error("WebRTC error {0}")]
    WebRtcUtil(#[from] webrtc_util::Error),
    #[error("Async receive error {0}")]
    Rcv(#[from] RecvError),
    #[error("Async send credit error {0}")]
    Sync(#[from] SendError<()>),
    #[error("Error processing server certificate")]
    CertParsing,
    #[error("Error receiving raw UDP packet - {0}")]
    UdpRecv(io::Error),
    #[error("Error listening on UDP socket - {0}")]
    UdpBinding(io::Error),
    #[error("Error sending reps")]
    RepsFailed,
}

type PresenceDedup = Dedup<Presence, AtomicUsize>;

pub fn new_certs(
    server_names: Vec<String>,
    cert_rx: Receiver<Option<(Vec<String>, Vec<u8>)>>,
) -> impl Stream<Item = (Vec<String>, Certificate)> {
    info!("Waiting for new certs for {server_names:?}...");
    cert_rx
        .map(move |received| match received {
            Some((d, c)) => anyhow::Ok((
                d,
                Certificate::from_pem(
                    std::str::from_utf8(&c).map_err(|_| TalkError::CertParsing)?,
                )?,
            )),
            None => anyhow::bail!("No cert received"),
        })
        .map(move |res| match res {
            Ok(_) => todo!(),
            Err(e) => {
                warn!("Failed to get a cert for {server_names:?} - {e}, making a self-signed one");
                (
                    server_names.clone(),
                    Certificate::generate_self_signed(server_names.clone())
                        .expect("self-signed cert"),
                )
            }
        })
        .inspect(move |(d, c)| debug!("My cert for {d:?}:\n{}", c.serialize_pem()))
}

async fn listen(
    on: String,
    cert: Certificate,
    works: Sender<Pin<Box<dyn Future<Output = ()>>>>,
    rep_tx: Sender<(IpAddr, &'static str, usize)>,
    ban_rx: Receiver<BTreeSet<IpAddr>>,
    reset_rx: Receiver<()>,
    dedup: Arc<PresenceDedup>,
) -> Result<(), TalkError> {
    let (on, server_name) = match on.rsplit_once(":") {
        Some((name, _port)) => (on.to_owned(), name.into()),
        None => (format!("{on}:443"), on),
    };

    let config = Config {
        server_name,
        certificates: vec![cert.clone()],
        ..Default::default()
    };
    let mut bans = BTreeSet::new();
    let mut participants = BTreeMap::new();
    let sock = Arc::new(UdpSocket::bind(&on).await.map_err(TalkError::UdpBinding)?);
    let local_address = sock.local_addr().map_err(TalkError::UdpBinding)?;
    info!("Listening on {local_address} for {on}");
    let mut buf = [0u8; 2048];
    let mut dedup_last_ease = Instant::now();
    while reset_rx
        .recv()
        .timeout(Duration::from_millis(1))
        .await
        .is_none()
    {
        if dedup_last_ease.elapsed().as_secs() > 60 {
            let instant = Instant::now();
            let cnt = dedup.as_ref().into_iter().count();
            let dur = (dedup_last_ease - instant).as_millis();
            let _ = dedup.ease();
            dedup_last_ease = Instant::now();
            debug!("Dedup ease {cnt} took {dur}ms",);
        }

        debug!("Waiting for next UDP packet...");
        let mut rcv_timeout = Duration::from_secs(10);
        while let Some((size, address)) = sock
            .recv_from(&mut buf)
            .timeout(rcv_timeout)
            .await
            .transpose()
            .map_err(TalkError::UdpRecv)?
        {
            rcv_timeout = Duration::from_millis(1);
            debug!("UDP Packet from {address}: {size}");
            rep_tx
                .send((address.ip(), STAT_BYTES, size))
                .await
                .map_err(|_| TalkError::RepsFailed)?;
            rep_tx
                .send((address.ip(), STAT_MESSAGES, 1))
                .await
                .map_err(|_| TalkError::RepsFailed)?;

            let is_banned = bans.contains(&address.ip());
            let is_participating = participants.contains_key(&address);
            if is_banned && !is_participating {
                info!("Ignoring new client from banned address {address}!");
                continue;
            }

            let mut new_conn = None;
            let participant = participants.entry(address).or_insert_with(|| {
                let (participant, conn) =
                    crate::session::setup(config.clone(), local_address, address, sock.clone());
                new_conn = Some(conn);
                participant
            });

            debug!(
                "Participant on new conn - {}: {participant:?}",
                new_conn.is_some()
            );

            participant.push_packet(&buf[0..size]).await;

            if let Some(fut) = new_conn {
                if works.send(fut).await.is_err() {
                    error!("works disconnect");
                    break;
                }
            }
        }

        if let Some(Ok(new_bans)) = ban_rx.recv().timeout(Duration::from_millis(1)).await {
            if new_bans != bans {
                debug!("new bans: {new_bans:?}");
            }
            bans = new_bans;
        }

        participants.retain(|address, participant| {
            if bans.contains(&address.ip()) {
                warn!("banning {address}");
                false
            } else if participant.expired(MAX_IDLE) {
                warn!("idle {address}");
                false
            } else {
                true
            }
        });
        debug!("We have {} participants", participants.len());
        let msg = participants
            .values_mut()
            .map(|p| p.pop_message())
            .collect::<FuturesUnordered<_>>()
            .filter_map(|msg| msg)
            .next()
            .timeout(Duration::from_millis(10))
            .await
            .flatten();
        debug!("Message {msg:?}");

        if let Some((address, presence, venue_id, chat)) = msg {
            match dedup.check(2, presence.clone()) {
                Ok(true) => {}
                Ok(false) => {
                    debug!("Ignoring duplicate {presence}");
                    let _ = rep_tx.send((address.ip(), STAT_DUPES, 1)).await;
                    continue;
                }
                Err(_) => {
                    warn!("dupcheck broken")
                }
            }
            info!("Chat {} from {address} for {venue_id}", chat.len());
            if !chat.is_empty() {
                let chat = Arc::new((presence, venue_id, chat));
                for p in participants.values_mut() {
                    p.consider(address, chat.clone()).await;
                }
            }
        }
    }
    Ok(())
}