const MTU_MAX_PACKET_SIZE: usize = 65535;
use super::{Result, UdpError};
use crate::packet::{VoipHeader, VoipPacket};
use parking_lot::Mutex;
use std::{net::SocketAddr, sync::Arc};
use tokio::{
net::UdpSocket,
select,
sync::mpsc::{channel, Receiver, Sender},
};
use tokio_util::sync::CancellationToken;
use tracing::{event, Level};
#[derive(Debug)]
pub struct Server {
connected_clients: ClientList,
cancellation_token: CancellationToken,
inbound_message_receiver: Receiver<(VoipHeader, Vec<u8>, SocketAddr)>,
outbound_message_receiver: Sender<VoipPacket>,
}
#[derive(Debug, Default, Clone)]
pub struct ClientList(Arc<Mutex<Vec<SocketAddr>>>);
impl ClientList {
pub fn remove(&self, key: &SocketAddr) -> Option<SocketAddr> {
let mut list = self.0.lock();
list.iter()
.position(|socket_addr| *socket_addr == *key)
.map(|pos| list.swap_remove(pos))
}
}
impl Server {
pub async fn new(port: u32) -> Result<Self> {
let socket_handle = UdpSocket::bind(format!("[::]:{port}"))
.await
.map_err(UdpError::BindError)?;
let (outbound_message_sender, mut outbound_message_receiver) = channel::<VoipPacket>(255);
let (inbound_message_sender, inbound_message_receiver) = channel(255);
let cancellation_token = CancellationToken::new();
let client_list = ClientList::default();
let client_list_clone = client_list.clone();
let cancellation_token_clone = cancellation_token.clone();
tokio::spawn(async move {
loop {
let client_list = client_list_clone.clone();
let mut buf = vec![0; 8];
select! {
incoming_bytes = socket_handle.recv_from(&mut buf) => {
match incoming_bytes {
Ok((_byte_count, socket_addr)) => {
let body_length = usize::from_be_bytes(buf.try_into().unwrap());
if body_length > MTU_MAX_PACKET_SIZE {
event!(Level::ERROR, "Message header with too large length: {body_length}. Discarding message.");
continue;
}
let mut body_buf = vec![0; body_length];
socket_handle.recv(&mut body_buf).await.unwrap();
match rmp_serde::from_slice::<VoipHeader>(&body_buf) {
Ok(voip_header) => {
let voip_body_length = match voip_header.voip_message_type() {
crate::packet::VoipMessageType::VoiceMessage(length) => length,
crate::packet::VoipMessageType::VideoMessage(length) => length,
};
let mut voip_body_buf = Vec::with_capacity(*voip_body_length as usize);
socket_handle.recv(&mut voip_body_buf).await.unwrap();
inbound_message_sender.send((voip_header, voip_body_buf, socket_addr)).await.unwrap();
},
Err(err) => {
event!(Level::ERROR, "Failed to deserialize a VoipPacket: {err}");
},
}
},
Err(err) => {
event!(Level::ERROR, "Failed to receive message: {err}");
},
}
}
Some(outgoing_message) = outbound_message_receiver.recv() => {
let client_list_clone = client_list.0.lock().clone();
for remote_addr in client_list_clone.iter() {
socket_handle.send_to(&outgoing_message.0, remote_addr).await.unwrap();
}
}
_ = cancellation_token_clone.cancelled() => break,
}
}
});
Ok(Self {
connected_clients: client_list,
inbound_message_receiver,
cancellation_token,
outbound_message_receiver: outbound_message_sender,
})
}
pub fn message_receiver(&mut self) -> &mut Receiver<(VoipHeader, Vec<u8>, SocketAddr)> {
&mut self.inbound_message_receiver
}
pub fn cancellation_token(&self) -> &CancellationToken {
&self.cancellation_token
}
pub fn get_reply_to_list_mut(&self) -> Arc<Mutex<Vec<SocketAddr>>> {
self.connected_clients.0.clone()
}
pub async fn reply_to_clients(
&self,
voip_packet: VoipPacket,
) -> std::result::Result<(), tokio::sync::mpsc::error::SendError<VoipPacket>> {
self.outbound_message_receiver.send(voip_packet).await
}
}