use anyhow::Context;
use async_std::{
channel::{bounded, unbounded, Receiver, Sender},
net::UdpSocket,
};
use futures::{select, Future, FutureExt};
use log::*;
use presence::Presence;
use smol_timeout::TimeoutExt;
use std::{
collections::BTreeSet,
net::SocketAddr,
pin::Pin,
sync::Arc,
time::{Duration, Instant},
};
use venues::{parse_chat_message_2, Message, VenueId};
use webrtc_dtls::{config::Config, conn::DTLSConn};
use webrtc_util::Conn;
/// Server-side handle for one remote peer.
///
/// Created by `setup` together with the future that drives the peer's DTLS
/// connection; the channels below are the only link between this handle and
/// that future.
#[derive(Debug)]
pub struct Participant {
/// Time of the last packet pushed for this peer; compared against the idle
/// limit in `expired` and moved into the past by `deactivate`.
seen: Instant,
/// Remote socket address of the peer.
address: SocketAddr,
/// Venues this peer is currently in: added on `Desire::Chat`, removed on
/// `Desire::Leave`, cleared on `Desire::Quit`.
venues: BTreeSet<VenueId>,
/// Requests coming from the connection handler (chat / leave / quit).
desire_rx: Receiver<Desire>,
/// Credits for the connection handler; it waits for one before each read.
credit_tx: Sender<()>,
/// Chats from other participants to be forwarded to this peer.
chat_tx: Sender<Arc<(Presence, VenueId, Vec<u8>)>>,
/// Raw UDP payloads received for this peer, consumed by `RecvUdpConn::recv`.
udp_tx: Sender<Vec<u8>>,
}
impl Participant {
async fn grant_credit(&self) {
self.credit_tx
.send(())
.timeout(Duration::from_millis(5))
.await
.unwrap_or_else(|| {
Ok(warn!(
"Send credit for {address} timed out",
address = self.address
))
})
.unwrap_or_else(|e| {
warn!(
"Could not send credit for {address} - {e}",
address = self.address
)
});
}
pub async fn push_packet(&mut self, buf: &[u8]) {
self.seen = Instant::now();
self.grant_credit().await;
self.udp_tx
.send(buf.to_vec())
.timeout(Duration::from_millis(5))
.await
.unwrap_or_else(|| {
Ok(warn!(
"Digesting packet for {address} timed out",
address = self.address
))
})
.unwrap_or_else(|e| {
warn!(
"Could not digest packet for {address} - {e}",
address = self.address
)
});
}
pub async fn pop_message(&mut self) -> Option<(SocketAddr, Presence, VenueId, Vec<u8>)> {
match self.desire_rx.recv().await.ok() {
None => {
warn!("Desire broke for {}", self.address);
self.deactivate();
None
}
Some(Desire::Quit) => {
self.venues.clear();
self.deactivate();
None
}
Some(Desire::Leave(venue_id)) => {
self.venues.remove(&venue_id);
None
}
Some(Desire::Chat(presence, venue_id, chat)) => {
self.venues.insert(venue_id);
Some((self.address, presence, venue_id, chat))
}
}
}
pub fn deactivate(&mut self) {
self.seen = Instant::now() - Duration::MAX;
}
pub fn address(&self) -> SocketAddr {
self.address
}
pub fn expired(&self, max_idle: u64) -> bool {
self.seen.elapsed().as_secs() >= max_idle
}
pub async fn consider(&self, address: SocketAddr, chat: Arc<(Presence, VenueId, Vec<u8>)>) {
if self.address != address && self.venues.contains(&chat.1) {
let _ = self.chat_tx.send(chat.clone()).await;
self.grant_credit().await;
}
}
}
/// Request sent from a peer's connection handler to its `Participant`.
#[derive(Debug, Clone)]
pub enum Desire {
/// The peer sent a chat (presence, target venue, payload).
Chat(Presence, VenueId, Vec<u8>),
/// The peer leaves the given venue.
Leave(VenueId),
/// The peer quits or its connection handler finished.
Quit,
}
pub fn setup(
config: Config,
local_address: SocketAddr,
remote_address: SocketAddr,
udp_sock: Arc<UdpSocket>,
) -> (Participant, Pin<Box<dyn Future<Output = ()>>>) {
let (udp_tx, udp_rx) = unbounded();
let (chat_tx, chat_rx) = unbounded();
let (credit_tx, credit_rx) = bounded(100);
let (desire_tx, desire_rx) = unbounded();
let conn = RecvUdpConn {
udp_rx,
udp_sock,
remote_address,
local_address,
};
let conn = DTLSConn::new(Arc::new(conn), config, false, None);
(
Participant {
seen: Instant::now(),
address: remote_address,
desire_rx,
credit_tx,
chat_tx,
udp_tx,
venues: BTreeSet::new(),
},
Box::pin(async move {
if let Ok(conn) = conn
.await
.map_err(|e| warn!("Could not setup dtls for {remote_address} - {e}"))
{
handle_conn(
Arc::new(conn),
remote_address,
credit_rx,
desire_tx,
chat_rx,
)
.await
.unwrap_or_else(move |e| warn!("{remote_address} conn handling error {e}"))
}
}),
)
}
/// Runs the chat protocol for one peer over an established connection.
///
/// First sends an `Icu` message carrying a fresh `Presence` and the peer's
/// address. Then two futures are raced with `select!`:
///
/// * `changes_fut` forwards every chat received on `chat_rx` to the peer as a
///   `Cht` message until the channel closes or a send fails;
/// * `desires_fut` waits for a credit on `credit_rx`, reads one message from
///   the connection and handles the `quit`, `gone` and `chat` commands,
///   reporting them on `desire_tx`.
///
/// When the first of the two finishes successfully, a `Bye` message is sent
/// and, if the desire channel is still open, a final `Desire::Quit`.
///
/// # Errors
///
/// Returns the first connection or channel error; an error from either raced
/// future is tagged with the context "desires" or "changes" and returned
/// before `Bye` / `Quit` are sent.
async fn handle_conn(
conn: Arc<impl Conn + ?Sized>,
address: SocketAddr,
credit_rx: Receiver<()>,
desire_tx: Sender<Desire>,
chat_rx: Receiver<Arc<(Presence, VenueId, Vec<u8>)>>,
) -> anyhow::Result<()> {
// Receive buffer for one message; moved into `desires_fut` below.
let mut packetbuf = [0u8; 2048];
// Venues this handler has seen the peer chat in and not yet leave.
let mut venues = BTreeSet::new();
let presence = Presence::now();
// Greeting: tell the peer its presence and the address it is seen under.
conn.send(&Message::Icu(presence.clone(), address).to_bytes())
.await?;
let conn_ch = conn.clone();
// Outbound direction: chats from other participants -> this peer.
let mut changes_fut = Box::pin(async move {
while let Ok(chat) = chat_rx.recv().await {
let (presence, venue_id, chat) = chat.as_ref();
conn_ch
.send(&Message::Cht(presence.clone(), *venue_id, chat.to_vec()).to_bytes())
.await?;
}
anyhow::Ok(())
})
.fuse();
// Handles kept outside `desires_fut` for the farewell after the race.
let conn_bye = conn.clone();
let desire_bye = desire_tx.clone();
// Inbound direction: commands from this peer.
let mut desires_fut = Box::pin(async move {
loop {
// Wait up to one second for a credit before reading from the connection.
match credit_rx.recv().timeout(Duration::from_secs(1)).await {
Some(Ok(())) => { }
None => {
// No credit arrived in time: send an `Rlx` notice and wait again.
conn.send(&Message::Rlx("busy here".into()).to_bytes())
.await?;
continue;
}
Some(Err(_)) => {
// The credit sender is gone; stop reading.
warn!("Credit broke for {address}");
break;
}
};
let size = conn.recv(&mut packetbuf).await?;
let message = &packetbuf[0..size];
// The first four bytes are the command, the rest its argument; a shorter
// message is treated as a command with an empty argument.
let (typ, argument) = if size >= 4 {
message.split_at(4)
} else {
(message, b"".as_slice())
};
match typ {
b"quit" => {
desire_tx.send(Desire::Quit).await?;
break;
}
b"gone" => match argument.len() {
// No argument: leave every venue tracked here, confirming each with `Gon`.
0 => {
for v in std::mem::take(&mut venues) {
desire_tx.send(Desire::Leave(v)).await?;
conn.send(&Message::Gon(v).to_bytes()).await?;
}
}
// Exactly 32 bytes: leave that single venue.
32 => {
let venue_id: [u8; 32] = argument.try_into().expect("32 venue_id bytes");
let venue_id = venue_id.into();
desire_tx.send(Desire::Leave(venue_id)).await?;
conn.send(&Message::Gon(venue_id).to_bytes()).await?;
venues.remove(&venue_id);
}
_ => {
conn.send(
&Message::Err(
"invalid gone parameter, expecting 32 venue_id or none".into(),
)
.to_bytes(),
)
.await?;
}
},
b"chat" => match parse_chat_message_2(argument) {
Some((presence, venue_id, chat)) => {
let presence_response = presence.derive_now();
desire_tx
.send(Desire::Chat(presence, venue_id.clone(), chat.to_vec()))
.await?;
// First chat in this venue: answer with an empty `Cht` carrying the
// derived presence (presumably a join acknowledgement - confirm).
if venues.insert(venue_id) {
conn.send(
&Message::Cht(presence_response, venue_id, vec![]).to_bytes(),
)
.await?;
}
}
None => {
debug!("Invalid chat from {address}: {message:?}");
conn.send(&Message::Err("invalid chat command".into()).to_bytes())
.await?;
}
},
_ => {
debug!("Invalid cmd from {address}: {message:?}");
conn.send(
&Message::Err("unknown message type, I know 'chat', 'gone', 'quit'".into())
.to_bytes(),
)
.await?;
}
}
}
anyhow::Ok(())
})
.fuse();
// Whichever direction finishes first ends the session.
select! {
res = desires_fut => res.context("desires")?,
res = changes_fut => res.context("changes")?
}
conn_bye.send(&Message::Bye(presence).to_bytes()).await?;
// Tell the participant the session is over, unless the channel is already closed.
if !desire_bye.is_closed() {
desire_bye.send(Desire::Quit).await?;
}
debug!("Finished {address}");
Ok(())
}
/// `Conn` adapter for a single peer on a shared UDP socket.
///
/// Incoming payloads are taken from a channel; outgoing payloads are written
/// to the socket, addressed to `remote_address`.
struct RecvUdpConn {
/// Payloads received for this peer (fed by `Participant::push_packet`).
udp_rx: Receiver<Vec<u8>>,
/// Socket used for sending.
udp_sock: Arc<UdpSocket>,
/// Address of the peer this connection talks to.
remote_address: SocketAddr,
/// Address reported by `local_addr`.
local_address: SocketAddr,
}
#[async_trait::async_trait]
impl Conn for RecvUdpConn {
async fn connect(&self, _addr: SocketAddr) -> Result<(), webrtc_util::Error> {
todo!()
}
async fn recv(&self, buf: &mut [u8]) -> Result<usize, webrtc_util::Error> {
let b = self
.udp_rx
.recv()
.await
.map_err(|_| webrtc_util::Error::ErrBufferClosed)?;
if b.len() > buf.len() {
return Err(webrtc_util::Error::ErrBufferShort);
}
buf[0..b.len()].copy_from_slice(b.as_slice());
Ok(b.len())
}
async fn recv_from(&self, _buf: &mut [u8]) -> Result<(usize, SocketAddr), webrtc_util::Error> {
todo!()
}
async fn send(&self, buf: &[u8]) -> Result<usize, webrtc_util::Error> {
debug!("UDP packet for {}: {}", self.remote_address, buf.len());
self.udp_sock
.send_to(buf, self.remote_address)
.await
.map_err(|e| webrtc_util::Error::Io(e.into()))
}
async fn send_to(&self, _buf: &[u8], _target: SocketAddr) -> Result<usize, webrtc_util::Error> {
todo!()
}
fn local_addr(&self) -> Result<SocketAddr, webrtc_util::Error> {
Ok(self.local_address)
}
fn remote_addr(&self) -> Option<SocketAddr> {
Some(self.remote_address)
}
async fn close(&self) -> Result<(), webrtc_util::Error> {
Ok(())
}
}