use async_std::net::UdpSocket;
use async_std::{
channel::{Receiver, RecvError, SendError, Sender},
stream::StreamExt,
};
use futures::{select, stream::FuturesUnordered, Future, FutureExt, Stream};
use log::*;
use presence::dedup::Dedup;
use presence::seen::Seen;
use presence::Presence;
use smol_timeout::TimeoutExt;
use std::sync::atomic::AtomicUsize;
use std::{
collections::{BTreeMap, BTreeSet},
io,
net::IpAddr,
pin::Pin,
sync::Arc,
time::{Duration, Instant},
};
use webrtc_dtls::{config::Config, crypto::Certificate};
const MAX_IDLE: u64 = 300;
use crate::reps::{STAT_BYTES, STAT_MESSAGES,STAT_DUPES};
pub async fn talk<'a>(
on: String,
works: Sender<Pin<Box<dyn Future<Output = ()>>>>,
rep_tx: Sender<(IpAddr, &'static str, usize)>,
ban_rx: Receiver<BTreeSet<IpAddr>>,
mut certs: impl Stream<Item = (Vec<String>, Certificate)> + Unpin,
) -> anyhow::Result<()> {
let dupes = Arc::new(
Seen::builder()
.max(3)
.precision(1000)
.extra_hits(2)
.build()
.into_dedup(),
);
let (reset_tx, reset_rx) = async_std::channel::unbounded();
let pending_rx = reset_rx.clone();
let mut talks: Pin<Box<dyn Future<Output = ()>>> =
Box::pin(async move { pending_rx.recv().await.unwrap_or_else(|_| {}) });
loop {
let on = on.clone();
let works = works.clone();
let rep_tx = rep_tx.clone();
let ban_rx = ban_rx.clone();
let reset_rx = reset_rx.clone();
let dupes = dupes.clone();
select! {
_ = talks.as_mut().fuse() => break,
res = certs.next().fuse() => talks = {
info!("Closing previous talks...");
let _ = reset_tx.send(()).await;
talks.await;
match res{
None => {
warn!("No more talks...");
break
},
Some((_domains, cert))=> {
info!("Alright, setting up new talks...");
Box::pin(async move {
listen(on, cert, works, rep_tx, ban_rx, reset_rx, dupes)
.await
.unwrap_or_else(|e| error!("talk processing failed - {e}"))
})
}
}
}
}
}
Ok(())
}
#[derive(thiserror::Error, Debug)]
enum TalkError {
#[error("WebRTC error {0}")]
WebRtcDtls(#[from] webrtc_dtls::Error),
#[error("WebRTC error {0}")]
WebRtcUtil(#[from] webrtc_util::Error),
#[error("Async receive error {0}")]
Rcv(#[from] RecvError),
#[error("Async send credit error {0}")]
Sync(#[from] SendError<()>),
#[error("Error processing server certificate")]
CertParsing,
#[error("Error receiving raw UDP packet - {0}")]
UdpRecv(io::Error),
#[error("Error listening on UDP socket - {0}")]
UdpBinding(io::Error),
#[error("Error sending reps")]
RepsFailed,
}
type PresenceDedup = Dedup<Presence, AtomicUsize>;
pub fn new_certs(
server_names: Vec<String>,
cert_rx: Receiver<Option<(Vec<String>, Vec<u8>)>>,
) -> impl Stream<Item = (Vec<String>, Certificate)> {
info!("Waiting for new certs for {server_names:?}...");
cert_rx
.map(move |received| match received {
Some((d, c)) => anyhow::Ok((
d,
Certificate::from_pem(
std::str::from_utf8(&c).map_err(|_| TalkError::CertParsing)?,
)?,
)),
None => anyhow::bail!("No cert received"),
})
.map(move |res| match res {
Ok(_) => todo!(),
Err(e) => {
warn!("Failed to get a cert for {server_names:?} - {e}, making a self-signed one");
(
server_names.clone(),
Certificate::generate_self_signed(server_names.clone())
.expect("self-signed cert"),
)
}
})
.inspect(move |(d, c)| debug!("My cert for {d:?}:\n{}", c.serialize_pem()))
}
async fn listen(
on: String,
cert: Certificate,
works: Sender<Pin<Box<dyn Future<Output = ()>>>>,
rep_tx: Sender<(IpAddr, &'static str, usize)>,
ban_rx: Receiver<BTreeSet<IpAddr>>,
reset_rx: Receiver<()>,
dedup: Arc<PresenceDedup>,
) -> Result<(), TalkError> {
let (on, server_name) = match on.rsplit_once(":") {
Some((name, _port)) => (on.to_owned(), name.into()),
None => (format!("{on}:443"), on),
};
let config = Config {
server_name,
certificates: vec![cert.clone()],
..Default::default()
};
let mut bans = BTreeSet::new();
let mut participants = BTreeMap::new();
let sock = Arc::new(UdpSocket::bind(&on).await.map_err(TalkError::UdpBinding)?);
let local_address = sock.local_addr().map_err(TalkError::UdpBinding)?;
info!("Listening on {local_address} for {on}");
let mut buf = [0u8; 2048];
let mut dedup_last_ease = Instant::now();
while reset_rx
.recv()
.timeout(Duration::from_millis(1))
.await
.is_none()
{
if dedup_last_ease.elapsed().as_secs() > 60 {
let instant = Instant::now();
let cnt = dedup.as_ref().into_iter().count();
let dur = (dedup_last_ease - instant).as_millis();
let _ = dedup.ease();
dedup_last_ease = Instant::now();
debug!("Dedup ease {cnt} took {dur}ms",);
}
debug!("Waiting for next UDP packet...");
let mut rcv_timeout = Duration::from_secs(10);
while let Some((size, address)) = sock
.recv_from(&mut buf)
.timeout(rcv_timeout)
.await
.transpose()
.map_err(TalkError::UdpRecv)?
{
rcv_timeout = Duration::from_millis(1);
debug!("UDP Packet from {address}: {size}");
rep_tx
.send((address.ip(), STAT_BYTES, size))
.await
.map_err(|_| TalkError::RepsFailed)?;
rep_tx
.send((address.ip(), STAT_MESSAGES, 1))
.await
.map_err(|_| TalkError::RepsFailed)?;
let is_banned = bans.contains(&address.ip());
let is_participating = participants.contains_key(&address);
if is_banned && !is_participating {
info!("Ignoring new client from banned address {address}!");
continue;
}
let mut new_conn = None;
let participant = participants.entry(address).or_insert_with(|| {
let (participant, conn) =
crate::session::setup(config.clone(), local_address, address, sock.clone());
new_conn = Some(conn);
participant
});
debug!(
"Participant on new conn - {}: {participant:?}",
new_conn.is_some()
);
participant.push_packet(&buf[0..size]).await;
if let Some(fut) = new_conn {
if works.send(fut).await.is_err() {
error!("works disconnect");
break;
}
}
}
if let Some(Ok(new_bans)) = ban_rx.recv().timeout(Duration::from_millis(1)).await {
if new_bans != bans {
debug!("new bans: {new_bans:?}");
}
bans = new_bans;
}
participants.retain(|address, participant| {
if bans.contains(&address.ip()) {
warn!("banning {address}");
false
} else if participant.expired(MAX_IDLE) {
warn!("idle {address}");
false
} else {
true
}
});
debug!("We have {} participants", participants.len());
let msg = participants
.values_mut()
.map(|p| p.pop_message())
.collect::<FuturesUnordered<_>>()
.filter_map(|msg| msg)
.next()
.timeout(Duration::from_millis(10))
.await
.flatten();
debug!("Message {msg:?}");
if let Some((address, presence, venue_id, chat)) = msg {
match dedup.check(2, presence.clone()) {
Ok(true) => {}
Ok(false) => {
debug!("Ignoring duplicate {presence}");
let _ = rep_tx.send((address.ip(), STAT_DUPES, 1)).await;
continue;
}
Err(_) => {
warn!("dupcheck broken")
}
}
info!("Chat {} from {address} for {venue_id}", chat.len());
if !chat.is_empty() {
let chat = Arc::new((presence, venue_id, chat));
for p in participants.values_mut() {
p.consider(address, chat.clone()).await;
}
}
}
}
Ok(())
}