venues 0.3.2

Privacy friendly online meeting service
Documentation
use anyhow::Context;
use async_std::{
    channel::{bounded, unbounded, Receiver, Sender},
    net::UdpSocket,
};
use futures::{select, Future, FutureExt};
use log::*;
use presence::Presence;
use smol_timeout::TimeoutExt;
use std::{
    collections::BTreeSet,
    net::SocketAddr,
    pin::Pin,
    sync::Arc,
    time::{Duration, Instant},
};
use venues::{parse_chat_message_2, Message, VenueId};
use webrtc_dtls::{config::Config, conn::DTLSConn};
use webrtc_util::Conn;

#[derive(Debug)]
pub struct Participant {
    seen: Instant,
    address: SocketAddr,
    venues: BTreeSet<VenueId>,
    desire_rx: Receiver<Desire>,
    credit_tx: Sender<()>,
    chat_tx: Sender<Arc<(Presence, VenueId, Vec<u8>)>>,
    udp_tx: Sender<Vec<u8>>,
}
impl Participant {
    async fn grant_credit(&self) {
        self.credit_tx
            .send(())
            .timeout(Duration::from_millis(5))
            .await
            .unwrap_or_else(|| {
                Ok(warn!(
                    "Send credit for {address} timed out",
                    address = self.address
                ))
            })
            .unwrap_or_else(|e| {
                warn!(
                    "Could not send credit for {address} - {e}",
                    address = self.address
                )
            });
    }
    pub async fn push_packet(&mut self, buf: &[u8]) {
        self.seen = Instant::now();
        self.grant_credit().await;
        self.udp_tx
            .send(buf.to_vec())
            .timeout(Duration::from_millis(5))
            .await
            .unwrap_or_else(|| {
                Ok(warn!(
                    "Digesting packet for {address} timed out",
                    address = self.address
                ))
            })
            .unwrap_or_else(|e| {
                warn!(
                    "Could not digest packet for {address} - {e}",
                    address = self.address
                )
            });
    }
    pub async fn pop_message(&mut self) -> Option<(SocketAddr, Presence, VenueId, Vec<u8>)> {
        match self.desire_rx.recv().await.ok() {
            None => {
                warn!("Desire broke for {}", self.address);
                self.deactivate();
                None
            }
            Some(Desire::Quit) => {
                self.venues.clear();
                self.deactivate();
                None
            }
            Some(Desire::Leave(venue_id)) => {
                self.venues.remove(&venue_id);
                None
            }
            Some(Desire::Chat(presence, venue_id, chat)) => {
                self.venues.insert(venue_id);
                Some((self.address, presence, venue_id, chat))
            }
        }
    }
    pub fn deactivate(&mut self) {
        self.seen = Instant::now() - Duration::MAX;
    }
    pub fn address(&self) -> SocketAddr {
        self.address
    }
    pub fn expired(&self, max_idle: u64) -> bool {
        self.seen.elapsed().as_secs() >= max_idle
    }
    pub async fn consider(&self, address: SocketAddr, chat: Arc<(Presence, VenueId, Vec<u8>)>) {
        if self.address != address && self.venues.contains(&chat.1) {
            let _ = self.chat_tx.send(chat.clone()).await;
            self.grant_credit().await;
        }
    }
}

#[derive(Debug, Clone)]
pub enum Desire {
    Chat(Presence, VenueId, Vec<u8>),
    Leave(VenueId),
    Quit,
}

pub fn setup(
    config: Config,
    local_address: SocketAddr,
    remote_address: SocketAddr,
    udp_sock: Arc<UdpSocket>,
) -> (Participant, Pin<Box<dyn Future<Output = ()>>>) {
    let (udp_tx, udp_rx) = unbounded();
    let (chat_tx, chat_rx) = unbounded();
    let (credit_tx, credit_rx) = bounded(100);
    let (desire_tx, desire_rx) = unbounded();
    let conn = RecvUdpConn {
        udp_rx,
        udp_sock,
        remote_address,
        local_address,
    };
    let conn = DTLSConn::new(Arc::new(conn), config, false, None);
    (
        Participant {
            seen: Instant::now(),
            address: remote_address,
            desire_rx,
            credit_tx,
            chat_tx,
            udp_tx,
            venues: BTreeSet::new(),
        },
        Box::pin(async move {
            if let Ok(conn) = conn
                .await
                .map_err(|e| warn!("Could not setup dtls for {remote_address} - {e}"))
            {
                handle_conn(
                    Arc::new(conn),
                    remote_address,
                    credit_rx,
                    desire_tx,
                    chat_rx,
                )
                .await
                .unwrap_or_else(move |e| warn!("{remote_address} conn handling error {e}"))
            }
        }),
    )
}

async fn handle_conn(
    conn: Arc<impl Conn + ?Sized>,
    address: SocketAddr,
    credit_rx: Receiver<()>,
    desire_tx: Sender<Desire>,
    chat_rx: Receiver<Arc<(Presence, VenueId, Vec<u8>)>>,
) -> anyhow::Result<()> {
    let mut packetbuf = [0u8; 2048];
    let mut venues = BTreeSet::new();
    let presence = Presence::now();
    conn.send(&Message::Icu(presence.clone(), address).to_bytes())
        .await?;

    let conn_ch = conn.clone();
    let mut changes_fut = Box::pin(async move {
        while let Ok(chat) = chat_rx.recv().await {
            let (presence, venue_id, chat) = chat.as_ref();
            conn_ch
                .send(&Message::Cht(presence.clone(), *venue_id, chat.to_vec()).to_bytes())
                .await?;
        }
        anyhow::Ok(())
    })
    .fuse();

    let conn_bye = conn.clone();
    let desire_bye = desire_tx.clone();
    let mut desires_fut = Box::pin(async move {
        loop {
            match credit_rx.recv().timeout(Duration::from_secs(1)).await {
                Some(Ok(())) => { /* OK */ }
                None => {
                    // timeout, do nothing
                    conn.send(&Message::Rlx("busy here".into()).to_bytes())
                        .await?;
                    continue;
                }
                Some(Err(_)) => {
                    warn!("Credit broke for {address}");
                    // drop it now
                    break;
                }
            };
            let size = conn.recv(&mut packetbuf).await?;

            let message = &packetbuf[0..size];
            let (typ, argument) = if size >= 4 {
                message.split_at(4)
            } else {
                (message, b"".as_slice())
            };
            match typ {
                b"quit" => {
                    desire_tx.send(Desire::Quit).await?;
                    break;
                }
                b"gone" => match argument.len() {
                    0 => {
                        for v in std::mem::take(&mut venues) {
                            desire_tx.send(Desire::Leave(v)).await?;
                            conn.send(&Message::Gon(v).to_bytes()).await?;
                        }
                    }
                    32 => {
                        let venue_id: [u8; 32] = argument.try_into().expect("32 venue_id bytes");
                        let venue_id = venue_id.into();
                        desire_tx.send(Desire::Leave(venue_id)).await?;
                        conn.send(&Message::Gon(venue_id).to_bytes()).await?;
                        venues.remove(&venue_id);
                    }
                    _ => {
                        conn.send(
                            &Message::Err(
                                "invalid gone parameter, expecting 32 venue_id or none".into(),
                            )
                            .to_bytes(),
                        )
                        .await?;
                    }
                },
                b"chat" => match parse_chat_message_2(argument) {
                    Some((presence, venue_id, chat)) => {
                        let presence_response = presence.derive_now();
                        desire_tx
                            .send(Desire::Chat(presence, venue_id.clone(), chat.to_vec()))
                            .await?;
                        if venues.insert(venue_id) {
                            conn.send(
                                &Message::Cht(presence_response, venue_id, vec![]).to_bytes(),
                            )
                            .await?;
                        }
                    }
                    None => {
                        debug!("Invalid chat from {address}: {message:?}");
                        conn.send(&Message::Err("invalid chat command".into()).to_bytes())
                            .await?;
                    }
                },
                _ => {
                    debug!("Invalid cmd from {address}: {message:?}");
                    conn.send(
                        &Message::Err("unknown message type, I know 'chat', 'gone', 'quit'".into())
                            .to_bytes(),
                    )
                    .await?;
                }
            }
        }
        anyhow::Ok(())
    })
    .fuse();

    select! {
        res = desires_fut => res.context("desires")?,
        res = changes_fut => res.context("changes")?
    }
    conn_bye.send(&Message::Bye(presence).to_bytes()).await?;
    if !desire_bye.is_closed() {
        desire_bye.send(Desire::Quit).await?;
    }
    debug!("Finished {address}");

    Ok(())
}

struct RecvUdpConn {
    udp_rx: Receiver<Vec<u8>>,
    udp_sock: Arc<UdpSocket>,
    remote_address: SocketAddr,
    local_address: SocketAddr,
}
#[async_trait::async_trait]
impl Conn for RecvUdpConn {
    async fn connect(&self, _addr: SocketAddr) -> Result<(), webrtc_util::Error> {
        todo!()
    }
    async fn recv(&self, buf: &mut [u8]) -> Result<usize, webrtc_util::Error> {
        let b = self
            .udp_rx
            .recv()
            .await
            .map_err(|_| webrtc_util::Error::ErrBufferClosed)?;
        if b.len() > buf.len() {
            return Err(webrtc_util::Error::ErrBufferShort);
        }
        buf[0..b.len()].copy_from_slice(b.as_slice());
        Ok(b.len())
    }
    async fn recv_from(&self, _buf: &mut [u8]) -> Result<(usize, SocketAddr), webrtc_util::Error> {
        todo!()
    }
    async fn send(&self, buf: &[u8]) -> Result<usize, webrtc_util::Error> {
        debug!("UDP packet for {}: {}", self.remote_address, buf.len());
        self.udp_sock
            .send_to(buf, self.remote_address)
            .await
            .map_err(|e| webrtc_util::Error::Io(e.into()))
    }
    async fn send_to(&self, _buf: &[u8], _target: SocketAddr) -> Result<usize, webrtc_util::Error> {
        todo!()
    }
    fn local_addr(&self) -> Result<SocketAddr, webrtc_util::Error> {
        Ok(self.local_address)
    }
    fn remote_addr(&self) -> Option<SocketAddr> {
        Some(self.remote_address)
    }
    async fn close(&self) -> Result<(), webrtc_util::Error> {
        Ok(())
    }
}