use std::io::Result;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use crate::transport::{PacketReceiver, PacketSender, Transport};
/// Size in bytes of the receive buffer embedded in every `UdpSocket`.
///
/// NOTE(review): 1472 presumably is a 1500-byte Ethernet MTU minus the 20-byte
/// IPv4 header and the 8-byte UDP header — confirm before relying on it.
const MTU: usize = 1472;
/// UDP endpoint that implements the `Transport`, `PacketSender` and
/// `PacketReceiver` traits in this file.
///
/// Cloning shares the underlying OS socket (it sits behind an `Arc<Mutex<_>>`)
/// while every clone gets its own copy of the receive buffer.
#[derive(Clone)]
pub struct UdpSocket {
/// Handle to the OS socket, shared between all clones of this value.
socket: Arc<Mutex<std::net::UdpSocket>>,
/// Scratch space that `recv` reads incoming datagrams into.
buffer: [u8; MTU],
}
impl UdpSocket {
    /// Binds a UDP socket to `server_addr` and puts it into non-blocking mode.
    ///
    /// # Errors
    ///
    /// Returns the underlying I/O error if binding the address or switching
    /// the socket to non-blocking mode fails.
    pub fn new(server_addr: &SocketAddr) -> Result<Self> {
        let raw = std::net::UdpSocket::bind(*server_addr)?;
        // Configure the socket while we still own it exclusively; nobody else
        // can hold the mutex yet, so there is no need to lock it for this.
        raw.set_nonblocking(true)?;
        Ok(Self {
            socket: Arc::new(Mutex::new(raw)),
            buffer: [0; MTU],
        })
    }
}
impl Transport for UdpSocket {
    /// Returns the address the underlying socket is bound to.
    ///
    /// # Panics
    ///
    /// Panics if the socket mutex is poisoned or if the operating system
    /// cannot report the local address.
    fn local_addr(&self) -> SocketAddr {
        let guard = self.socket.lock().unwrap();
        guard.local_addr().expect("error getting local addr")
    }

    /// Produces a sender/receiver pair backed by this socket.
    ///
    /// Both halves are clones of `self`, so they share the same OS socket but
    /// each owns a separate receive buffer.
    fn listen(&mut self) -> (Box<dyn PacketSender>, Box<dyn PacketReceiver>) {
        let sender: Box<dyn PacketSender> = Box::new(self.clone());
        let receiver: Box<dyn PacketReceiver> = Box::new(self.clone());
        (sender, receiver)
    }
}
impl PacketSender for UdpSocket {
    /// Sends `payload` as a single datagram to `address`.
    ///
    /// The byte count reported by the socket is discarded.
    ///
    /// # Errors
    ///
    /// Propagates any I/O error returned by the socket's `send_to`.
    ///
    /// # Panics
    ///
    /// Panics if the socket mutex is poisoned.
    fn send(&mut self, payload: &[u8], address: &SocketAddr) -> Result<()> {
        let guard = self.socket.lock().unwrap();
        guard.send_to(payload, address)?;
        Ok(())
    }
}
impl PacketReceiver for UdpSocket {
    /// Tries to read one datagram from the socket.
    ///
    /// Returns `Ok(Some((bytes, sender)))` when a datagram was read and
    /// `Ok(None)` when the socket reports `WouldBlock`. The returned slice
    /// borrows this value's internal buffer, which the next call overwrites.
    ///
    /// # Errors
    ///
    /// Propagates every I/O error other than `WouldBlock`.
    ///
    /// # Panics
    ///
    /// Panics if the socket mutex is poisoned.
    fn recv(&mut self) -> Result<Option<(&mut [u8], SocketAddr)>> {
        // The guard stays alive for the whole call.
        let guard = self.socket.lock().unwrap();
        let (len, sender) = match guard.recv_from(&mut self.buffer) {
            Ok(received) => received,
            // Nothing queued on the socket: report "no packet", not an error.
            Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => return Ok(None),
            Err(err) => return Err(err),
        };
        Ok(Some((&mut self.buffer[..len], sender)))
    }
}
// Loopback tests for the UDP transport: one plain round trip and one through
// the link conditioner.
#[cfg(test)]
mod tests {
use std::net::SocketAddr;
use std::str::FromStr;
use std::time::Duration;
use crate::transport::conditioner::{ConditionedPacketReceiver, LinkConditionerConfig};
use crate::transport::udp::UdpSocket;
use crate::transport::{PacketReceiver, PacketSender, Transport};
// Sends one datagram from a client socket to a server socket on loopback and
// checks both the payload and the reported sender address.
#[test]
fn test_udp_socket() -> Result<(), anyhow::Error> {
// Port 0 is requested for both sockets; the addresses actually bound are read
// back through `local_addr()` below.
let local_addr = SocketAddr::from_str("127.0.0.1:0")?;
let mut server_socket = UdpSocket::new(&local_addr)?;
let mut client_socket = UdpSocket::new(&local_addr)?;
let server_addr = server_socket.local_addr();
let client_addr = client_socket.local_addr();
let msg = b"hello world";
client_socket.send(msg, &server_addr)?;
// The sockets are non-blocking, so wait briefly for the datagram to be
// delivered before polling once.
std::thread::sleep(Duration::from_millis(10));
let Some((recv_msg, address)) = server_socket.recv()? else {
panic!("expected to receive a packet");
};
assert_eq!(address, client_addr);
assert_eq!(recv_msg, msg);
Ok(())
}
// Wraps the server socket in a `ConditionedPacketReceiver` configured with
// 100 ms of incoming latency and checks that the packet only surfaces after
// the mock clock has been advanced past that latency.
// NOTE(review): this presumes the conditioner reads time from
// `mock_instant::MockClock` — confirm in the conditioner module.
#[test]
fn test_udp_socket_with_conditioner() -> Result<(), anyhow::Error> {
use mock_instant::MockClock;
let local_addr = SocketAddr::from_str("127.0.0.1:0")?;
let server_socket = UdpSocket::new(&local_addr)?;
let mut client_socket = UdpSocket::new(&local_addr)?;
let server_addr = server_socket.local_addr();
let client_addr = client_socket.local_addr();
let mut conditioned_server_receiver = ConditionedPacketReceiver::new(
server_socket,
&LinkConditionerConfig {
incoming_latency: Duration::from_millis(100),
incoming_jitter: Duration::from_millis(0),
incoming_loss: 0.0,
},
);
let msg = b"hello world";
client_socket.send(msg, &server_addr)?;
// Real-time wait so the datagram reaches the server socket; the mock clock
// has not been advanced yet.
std::thread::sleep(Duration::from_millis(10));
let None = conditioned_server_receiver.recv()? else {
panic!("no packets should have arrived yet");
};
// 50 ms of mocked time is still below the configured 100 ms latency.
MockClock::advance(Duration::from_millis(50));
let None = conditioned_server_receiver.recv()? else {
panic!("no packets should have arrived yet");
};
// A further 1 s of mocked time puts the clock well past the latency.
MockClock::advance(Duration::from_secs(1));
let Some((recv_msg, address)) = conditioned_server_receiver.recv()? else {
panic!("expected to receive a packet");
};
assert_eq!(address, client_addr);
assert_eq!(recv_msg, msg);
Ok(())
}
}