venues 0.3.2

Privacy friendly online meeting service
Documentation
use async_std::{channel::bounded, stream::StreamExt};
use futures::{join, select, stream::FuturesOrdered, FutureExt};
use log::{info, warn};
use presence::Presence;
use sha2::{Digest, Sha256};
use smol_timeout::TimeoutExt;
use std::{sync::Arc, time::Duration};
use venues::{serialize_chat_message_2, Message, VenueId};
use webrtc_dtls::{config::Config, conn::DTLSConn};
use webrtc_util::Conn;

#[async_std::main]

async fn main() -> anyhow::Result<()> {
    env_logger::init();
    eprint!("Peer: ");
    let mut to = String::new();
    async_std::io::stdin().read_line(&mut to).await?;
    let addr = to.trim_end();

    let (addr, server_name) = match addr.rsplit_once(":") {
        Some((name, _port)) => (addr.to_owned(), name.into()),
        None => (format!("{addr}:443"), addr.into()),
    };

    let addrs = async_std::net::ToSocketAddrs::to_socket_addrs(&addr)
        .await?
        .collect::<Vec<_>>();
    let local_v4_addrs = async_std::net::ToSocketAddrs::to_socket_addrs("0.0.0.0:0")
        .await?
        .collect::<Vec<_>>();
    let local_v6_addrs = async_std::net::ToSocketAddrs::to_socket_addrs("[::]:0")
        .await?
        .collect::<Vec<_>>();
    info!("addr {addr} resolved to {addrs:?}");
    info!("we've got v4 {local_v4_addrs:?} and {local_v6_addrs:?}");
    let addrs_pairs = addrs
        .iter()
        .map(|addr| {
            if addr.is_ipv6() {
                (addr, &local_v6_addrs[..])
            } else {
                (addr, &local_v4_addrs[..])
            }
        })
        .map(|(addr, set)| set.iter().map(move |to| (addr, to)))
        .flatten()
        .collect::<Vec<_>>();

    let mut config = Config {
        server_name,
        ..Default::default()
    };

    let cert = std::env::var("CA_CERT").unwrap_or_default();
    if cert == "insecure" {
        warn!("Skipping CA cert verification");
        config.insecure_skip_verify = true;
    } else if cert.is_empty() {
        info!("No specific CA cert used, will default to system store");
    } else {
        info!("Using CA Cert {cert:?}");
        let cert = rustls_pemfile::certs(&mut cert.as_bytes())
            .next()
            .expect("cert")
            .expect("cert ok");
        let (added, _ignored) = config.roots_cas.add_parsable_certificates(&[cert]);
        assert_eq!(added, 1);
    }

    let conn = addrs_pairs
        .into_iter()
        .map(move |(remote, local)| {
            let config = config.clone();
            async move {
                let socket = tokio::net::UdpSocket::bind(local).await?;
                socket.connect(remote).await?;
                info!("Bound {} -> {}", socket.local_addr()?, remote);
                let dtls_conn =
                    Arc::new(DTLSConn::new(Arc::new(socket), config, true, None).await?);
                anyhow::Ok(dtls_conn)
            }
        })
        .collect::<FuturesOrdered<_>>()
        .filter_map(|conn| match conn {
            Ok(conn) => Some(conn),
            Err(e) => {
                warn!("Connection to failed {e}");
                None
            }
        })
        .next()
        .await
        .ok_or_else(|| anyhow::anyhow!("Could not connect to {addr}"))?;

    let (icu_tx, icu_rx) = bounded(1);
    let conn_rcv = conn.clone();
    let mut print_fut = Box::pin(async move {
        info!("Connected!");

        let mut buf = [0; 2042];
        loop {
            let len = conn_rcv.recv(&mut buf).await?;
            let msg = Message::from_slice(&buf[0..len]);
            match msg {
                Ok(msg) => {
                    info!("Received {len} bytes: {msg:?}");
                    match msg {
                        Message::Bye(_) => {
                            break;
                        }
                        Message::Icu(presence, _) => {
                            icu_tx.send(presence).await?;
                        }
                        Message::Cht(_presence, _venue_id, chat) => {
                            match std::str::from_utf8(&chat) {
                                Ok(chat) => println!("{chat}"),
                                Err(_) => println!("{chat:?}"),
                            }
                        }
                        _ => {}
                    }
                }
                Err(_) => warn!("Received {len} bytes: {:?}", &buf[0..len]),
            }
        }
        anyhow::Ok(())
    })
    .fuse();
    let mut send_fut = Box::pin(async move {
        let mut venue = VenueId::default();
        let mut buffer = String::new();
        let mut presence = icu_rx
            .recv()
            .timeout(Duration::from_secs(5))
            .await
            .transpose()?
            .unwrap_or_else(Presence::now);

        loop {
            // reset the buffer if not continued
            let buffer_last = std::mem::take(&mut buffer);
            let mut input = String::new();
            async_std::io::stdin().read_line(&mut input).await?;
            match input.as_bytes() {
                [b'/', b'\n'] => {
                    // leave all venues
                    conn.send(b"gone").await?;
                    venue = VenueId::default()
                }
                [b'/', ..] => {
                    todo!("other commands can be implemented...")
                }
                [b'!', b'\n'] => {
                    // leave current venue
                    conn.send(
                        &b"gone"
                            .into_iter()
                            .chain(venue.as_ref())
                            .cloned()
                            .collect::<Vec<_>>(),
                    )
                    .await?;
                    venue = VenueId::default()
                }
                [b'!', rest @ .., b'\n'] => {
                    // leave given venue
                    let v: [u8; 32] = Sha256::digest(rest).into();
                    let venue_to_leave: VenueId = v.into();
                    conn.send(
                        &b"gone"
                            .into_iter()
                            .chain(venue_to_leave.as_ref())
                            .cloned()
                            .collect::<Vec<_>>(),
                    )
                    .await?;
                    if venue == venue_to_leave {
                        venue = VenueId::default()
                    }
                }
                [b'@', rest @ .., b'\n'] => {
                    // join a venue
                    let v: [u8; 32] = Sha256::digest(rest).into();
                    venue = v.into();
                    presence = presence.derive_now();
                    let msg = "chat"
                        .bytes()
                        .chain(serialize_chat_message_2(&presence, &venue, b""))
                        .collect::<Vec<_>>();
                    conn.send(&msg).await?;
                }
                [b'\\', rest @ .., b'\x1b', b'\n'] | [rest @ .., b'\x1b', b'\n'] => {
                    // compose a chat, to be continued...
                    let prefix = input.len() - rest.len() - 2;
                    let tail = &input[prefix..prefix + rest.len()];
                    buffer = buffer_last + tail + "\n";
                }
                [b'\\', rest @ .., b'\n'] | [rest @ .., b'\n'] => {
                    //  chat, escape first char or not...
                    presence = presence.derive_now();
                    let msg = "chat"
                        .bytes()
                        .chain(serialize_chat_message_2(
                            &presence,
                            &venue,
                            buffer_last.as_bytes(),
                        ))
                        .chain(rest.iter().cloned())
                        .collect::<Vec<_>>();
                    conn.send(&msg).await?;
                }
                _ => {
                    // end of input, quit
                    conn.send(
                        &b"quit"
                            .into_iter()
                            .chain(venue.as_ref())
                            .cloned()
                            .collect::<Vec<_>>(),
                    )
                    .await?;
                    break;
                }
            }
        }
        anyhow::Ok(())
    })
    .fuse();

    select! {
        res = print_fut => warn!("print finished: {res:?}"),
        res = send_fut => warn!("send finished: {res:?}"),
    }

    join!(
        send_fut.timeout(Duration::from_millis(100)),
        print_fut.timeout(Duration::from_millis(100))
    );

    Ok(())
}