use async_std::{channel::bounded, stream::StreamExt};
use futures::{join, select, stream::FuturesOrdered, FutureExt};
use log::{info, warn};
use presence::Presence;
use sha2::{Digest, Sha256};
use smol_timeout::TimeoutExt;
use std::{sync::Arc, time::Duration};
use venues::{serialize_chat_message_2, Message, VenueId};
use webrtc_dtls::{config::Config, conn::DTLSConn};
use webrtc_util::Conn;
/// Interactive chat client speaking the `venues` message protocol over DTLS.
///
/// Flow:
/// 1. Read the peer address from stdin (`host` or `host:port`, default port 443).
/// 2. Resolve it and try a DTLS handshake towards every resolved address,
///    using the first attempt (in resolution order) that succeeds.
/// 3. Run a receive loop (prints chat, forwards the first `Icu` presence) and
///    a send loop (turns stdin lines into protocol messages) until either ends.
///
/// The `CA_CERT` environment variable controls certificate verification:
/// `insecure` disables it, an empty/unset value uses the defaults, anything
/// else is parsed as PEM text of a single CA certificate.
///
/// # Errors
/// Returns an error when stdin cannot be read, the address cannot be
/// resolved, `CA_CERT` cannot be parsed/added, or no connection succeeds.
#[async_std::main]
async fn main() -> anyhow::Result<()> {
    env_logger::init();

    // The prompt goes to stderr so that stdout only carries chat text.
    eprint!("Peer: ");
    let mut to = String::new();
    async_std::io::stdin().read_line(&mut to).await?;
    let addr = to.trim_end();

    // Split off the host part for the TLS server name; add the default port
    // when none was given.
    // NOTE(review): a bare IPv6 literal without a port (e.g. `::1`) is split at
    // its last `:` and therefore not handled correctly here.
    let (addr, server_name) = match addr.rsplit_once(":") {
        Some((name, _port)) => (addr.to_owned(), name.into()),
        None => (format!("{addr}:443"), addr.into()),
    };

    let addrs = async_std::net::ToSocketAddrs::to_socket_addrs(&addr)
        .await?
        .collect::<Vec<_>>();
    // Wildcard local addresses with port 0, one set per address family.
    let local_v4_addrs = async_std::net::ToSocketAddrs::to_socket_addrs("0.0.0.0:0")
        .await?
        .collect::<Vec<_>>();
    let local_v6_addrs = async_std::net::ToSocketAddrs::to_socket_addrs("[::]:0")
        .await?
        .collect::<Vec<_>>();
    info!("addr {addr} resolved to {addrs:?}");
    info!("we've got v4 {local_v4_addrs:?} and {local_v6_addrs:?}");

    // Pair every remote address with each local address of the same family.
    let addrs_pairs = addrs
        .iter()
        .flat_map(|remote| {
            let locals = if remote.is_ipv6() {
                &local_v6_addrs[..]
            } else {
                &local_v4_addrs[..]
            };
            locals.iter().map(move |local| (remote, local))
        })
        .collect::<Vec<_>>();

    let mut config = Config {
        server_name,
        ..Default::default()
    };
    let cert = std::env::var("CA_CERT").unwrap_or_default();
    if cert == "insecure" {
        warn!("Skipping CA cert verification");
        config.insecure_skip_verify = true;
    } else if cert.is_empty() {
        info!("No specific CA cert used, will default to system store");
    } else {
        info!("Using CA Cert {cert:?}");
        // `CA_CERT` is user input: report problems as errors instead of panicking.
        let cert = rustls_pemfile::certs(&mut cert.as_bytes())
            .next()
            .ok_or_else(|| anyhow::anyhow!("CA_CERT does not contain a PEM certificate"))??;
        let (added, _ignored) = config.roots_cas.add_parsable_certificates(&[cert]);
        anyhow::ensure!(
            added == 1,
            "CA_CERT certificate could not be added to the root store"
        );
    }

    // Start one connection attempt per address pair. `FuturesOrdered` yields
    // results in the order the attempts were queued; failed attempts are logged
    // and skipped, and the first success is kept.
    let conn = addrs_pairs
        .into_iter()
        .map(move |(remote, local)| {
            let config = config.clone();
            async move {
                let socket = tokio::net::UdpSocket::bind(local).await?;
                socket.connect(remote).await?;
                info!("Bound {} -> {}", socket.local_addr()?, remote);
                let dtls_conn =
                    Arc::new(DTLSConn::new(Arc::new(socket), config, true, None).await?);
                anyhow::Ok(dtls_conn)
            }
        })
        .collect::<FuturesOrdered<_>>()
        .filter_map(|conn| match conn {
            Ok(conn) => Some(conn),
            Err(e) => {
                warn!("Connection failed: {e}");
                None
            }
        })
        .next()
        .await
        .ok_or_else(|| anyhow::anyhow!("Could not connect to {addr}"))?;

    // Hands the presence from the first `Icu` message over to the send loop.
    let (icu_tx, icu_rx) = bounded(1);
    let conn_rcv = conn.clone();

    // Receive loop: decode every datagram, print chat messages and stop on `Bye`.
    // Boxed and fused so it can be polled by `select!` and again afterwards.
    let mut print_fut = Box::pin(async move {
        info!("Connected!");
        // NOTE(review): 2042 may have been meant as 2048 - confirm.
        let mut buf = [0; 2042];
        loop {
            let len = conn_rcv.recv(&mut buf).await?;
            let msg = Message::from_slice(&buf[0..len]);
            match msg {
                Ok(msg) => {
                    info!("Received {len} bytes: {msg:?}");
                    match msg {
                        Message::Bye(_) => {
                            break;
                        }
                        Message::Icu(presence, _) => {
                            // The send loop reads this channel exactly once. A
                            // blocking `send` would stall this loop forever as
                            // soon as the single slot is occupied, so surplus
                            // presences are dropped instead.
                            let _ = icu_tx.try_send(presence);
                        }
                        Message::Cht(_presence, _venue_id, chat) => {
                            // Print as text when valid UTF-8, raw bytes otherwise.
                            match std::str::from_utf8(&chat) {
                                Ok(chat) => println!("{chat}"),
                                Err(_) => println!("{chat:?}"),
                            }
                        }
                        _ => {}
                    }
                }
                Err(_) => warn!("Received {len} bytes: {:?}", &buf[0..len]),
            }
        }
        anyhow::Ok(())
    })
    .fuse();

    // Send loop: every stdin line is a command or a chat message.
    //
    //   `/`            send `gone` without a venue and reset the current venue
    //   `/<other>`     unknown command, ignored with a warning
    //   `!`            send `gone` for the current venue and reset it
    //   `!<name>`      send `gone` for the venue SHA-256(<name>)
    //   `@<name>`      switch to venue SHA-256(<name>) and send an empty chat
    //   `<text><ESC>`  append <text> to the multi-line draft, send nothing
    //   `<text>`       send the draft followed by <text> as a chat message
    //   (no newline)   end of input: send `quit` and stop
    //
    // A leading `\` on a text line is dropped, which allows text that starts
    // with one of the command characters.
    let mut send_fut = Box::pin(async move {
        let mut venue = VenueId::default();
        let mut buffer = String::new();
        // Wait up to five seconds for the presence announced by the peer;
        // fall back to a locally created one.
        let mut presence = icu_rx
            .recv()
            .timeout(Duration::from_secs(5))
            .await
            .transpose()?
            .unwrap_or_else(Presence::now);
        loop {
            // The draft is consumed by every line; only the arms that assign
            // `buffer` keep it for the next iteration.
            let buffer_last = std::mem::take(&mut buffer);
            let mut input = String::new();
            async_std::io::stdin().read_line(&mut input).await?;
            match input.as_bytes() {
                [b'/', b'\n'] => {
                    conn.send(b"gone").await?;
                    venue = VenueId::default()
                }
                [b'/', ..] => {
                    // A typo must not crash the client: report it and keep the
                    // draft that was being composed.
                    warn!("Unknown command {:?}", input.trim_end());
                    buffer = buffer_last;
                }
                [b'!', b'\n'] => {
                    conn.send(
                        &b"gone"
                            .into_iter()
                            .chain(venue.as_ref())
                            .cloned()
                            .collect::<Vec<_>>(),
                    )
                    .await?;
                    venue = VenueId::default()
                }
                [b'!', rest @ .., b'\n'] => {
                    let v: [u8; 32] = Sha256::digest(rest).into();
                    let venue_to_leave: VenueId = v.into();
                    conn.send(
                        &b"gone"
                            .into_iter()
                            .chain(venue_to_leave.as_ref())
                            .cloned()
                            .collect::<Vec<_>>(),
                    )
                    .await?;
                    // Only forget the current venue when it is the one just left.
                    if venue == venue_to_leave {
                        venue = VenueId::default()
                    }
                }
                [b'@', rest @ .., b'\n'] => {
                    let v: [u8; 32] = Sha256::digest(rest).into();
                    venue = v.into();
                    presence = presence.derive_now();
                    let msg = "chat"
                        .bytes()
                        .chain(serialize_chat_message_2(&presence, &venue, b""))
                        .collect::<Vec<_>>();
                    conn.send(&msg).await?;
                }
                [b'\\', rest @ .., b'\x1b', b'\n'] | [rest @ .., b'\x1b', b'\n'] => {
                    // `prefix` is 1 when a leading backslash was stripped and 0
                    // otherwise; it is used to re-slice `input` as text.
                    let prefix = input.len() - rest.len() - 2;
                    let tail = &input[prefix..prefix + rest.len()];
                    buffer = buffer_last + tail + "\n";
                }
                [b'\\', rest @ .., b'\n'] | [rest @ .., b'\n'] => {
                    presence = presence.derive_now();
                    let msg = "chat"
                        .bytes()
                        .chain(serialize_chat_message_2(
                            &presence,
                            &venue,
                            buffer_last.as_bytes(),
                        ))
                        .chain(rest.iter().cloned())
                        .collect::<Vec<_>>();
                    conn.send(&msg).await?;
                }
                _ => {
                    // No trailing newline: stdin reached its end.
                    conn.send(
                        &b"quit"
                            .into_iter()
                            .chain(venue.as_ref())
                            .cloned()
                            .collect::<Vec<_>>(),
                    )
                    .await?;
                    break;
                }
            }
        }
        anyhow::Ok(())
    })
    .fuse();

    // Run both loops until the first one finishes.
    select! {
        res = print_fut => warn!("print finished: {res:?}"),
        res = send_fut => warn!("send finished: {res:?}"),
    }
    // Give the remaining loop a short grace period (100 ms) to finish as well.
    join!(
        send_fut.timeout(Duration::from_millis(100)),
        print_fut.timeout(Duration::from_millis(100))
    );
    Ok(())
}