use std::io;
use std::net::SocketAddr;
use std::sync::Arc;
use rsip::SipMessage;
use tokio::net::UdpSocket;
use tracing::{debug, trace};
use super::transaction::Reliability;
/// Size, in bytes, of the scratch buffer that `UdpTransport::recv` allocates for incoming
/// datagrams.
// NOTE(review): 65_535 is the largest value a 16-bit length can hold; presumably chosen so
// that no incoming UDP datagram can be truncated by the buffer — confirm.
const MAX_DATAGRAM: usize = 65_535;
/// Attempts to decode a single SIP message from a raw byte slice.
///
/// Returns `Some(message)` when `SipMessage::try_from` accepts the input and `None`
/// otherwise. The underlying parse error is intentionally discarded; callers only learn
/// whether the bytes were a valid message.
pub(crate) fn parse(bytes: &[u8]) -> Option<SipMessage> {
    let outcome = SipMessage::try_from(bytes);
    outcome.ok()
}
pub(crate) fn serialize(msg: &SipMessage) -> Vec<u8> {
msg.clone().into()
}
/// SIP transport backed by a single UDP socket.
///
/// Messages are sent one per datagram and received datagrams are parsed into
/// `SipMessage` values; see the methods on this type for the exact behavior.
pub(crate) struct UdpTransport {
/// The bound tokio UDP socket used for both sending and receiving.
// NOTE(review): held in an `Arc`, presumably so the socket can be shared with other
// owners — no clone is visible in this part of the file; confirm.
socket: Arc<UdpSocket>,
}
impl UdpTransport {
    /// Binds a UDP socket to `local` and wraps it in a transport.
    ///
    /// # Errors
    ///
    /// Returns the I/O error reported by `UdpSocket::bind` if the address cannot be bound.
    pub(crate) async fn bind(local: SocketAddr) -> io::Result<Self> {
        UdpSocket::bind(local).await.map(|bound| Self {
            socket: Arc::new(bound),
        })
    }

    /// Returns the local address the underlying socket is bound to.
    ///
    /// # Errors
    ///
    /// Forwards any I/O error from the socket's own `local_addr` query.
    pub(crate) fn local_addr(&self) -> io::Result<SocketAddr> {
        UdpSocket::local_addr(&self.socket)
    }

    /// Reports the reliability class of this transport, which is always
    /// `Reliability::Unreliable`.
    pub(crate) fn reliability(&self) -> Reliability {
        Reliability::Unreliable
    }

    /// Serializes `msg` and sends it to `dst` as a single datagram.
    ///
    /// The full outgoing message is logged at debug level before it is sent.
    ///
    /// # Errors
    ///
    /// Returns the I/O error from the socket's `send_to` call. The number of bytes the
    /// socket reports as sent is not inspected.
    pub(crate) async fn send_to(&self, msg: &SipMessage, dst: SocketAddr) -> io::Result<()> {
        let wire = serialize(msg);
        debug!(
            %dst,
            bytes = wire.len(),
            "\n>>> SEND to {dst} >>>\n{}",
            String::from_utf8_lossy(&wire).trim_end(),
        );
        self.socket.send_to(&wire, dst).await.map(|_sent| ())
    }

    /// Waits for the next datagram that parses as a SIP message and returns it together
    /// with the sender's address.
    ///
    /// Every received datagram is logged at debug level. Datagrams that fail to parse are
    /// logged and skipped, and the loop keeps waiting; a parse failure is never surfaced
    /// to the caller.
    ///
    /// # Errors
    ///
    /// Returns the I/O error from the socket's `recv_from` call.
    pub(crate) async fn recv(&self) -> io::Result<(SipMessage, SocketAddr)> {
        // One scratch buffer per call, reused for every datagram seen inside the loop.
        let mut scratch = vec![0u8; MAX_DATAGRAM];
        loop {
            let (len, src) = self.socket.recv_from(&mut scratch).await?;
            let datagram = &scratch[..len];
            debug!(
                %src,
                bytes = len,
                "\n<<< RECV from {src} <<<\n{}",
                String::from_utf8_lossy(datagram).trim_end(),
            );
            if let Some(msg) = parse(datagram) {
                return Ok((msg, src));
            }
            // Not a SIP message: note the drop at both levels and wait for the next one.
            trace!(%src, bytes = len, "dropping unparseable datagram");
            debug!(%src, bytes = len, "^ datagram above was unparseable; dropped");
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use rsip::{Request, Response};

    /// Builds the minimal OPTIONS request used as a fixture throughout these tests.
    fn options() -> Request {
        let raw = "OPTIONS sip:bob@example.com SIP/2.0\r\n\
                   Via: SIP/2.0/UDP 10.0.0.1:5060;branch=z9hG4bK-opt\r\n\
                   From: <sip:alice@example.com>;tag=alice\r\n\
                   To: <sip:bob@example.com>\r\n\
                   Call-ID: call-opt\r\n\
                   CSeq: 4 OPTIONS\r\n\
                   Content-Length: 0\r\n\r\n";
        Request::try_from(raw.as_bytes()).unwrap()
    }

    /// The OPTIONS fixture wrapped as a generic `SipMessage`.
    fn options_message() -> SipMessage {
        options().into()
    }

    /// Binds a transport to an OS-assigned port on the loopback interface.
    async fn bind_loopback() -> UdpTransport {
        UdpTransport::bind("127.0.0.1:0".parse().unwrap())
            .await
            .unwrap()
    }

    /// A serialized request must parse back into an equal message.
    #[test]
    fn parse_round_trips_serialize() {
        let original = options_message();
        let wire = serialize(&original);
        let reparsed = parse(&wire).expect("parses");
        assert_eq!(reparsed, original);
    }

    /// Bytes that are not a SIP message yield `None`.
    #[test]
    fn parse_rejects_garbage() {
        let verdict = parse(b"not a sip message\r\n\r\n");
        assert!(verdict.is_none());
    }

    /// A message sent from one transport arrives intact at another, with the sender's
    /// address reported as the source.
    #[tokio::test]
    async fn udp_round_trip_between_two_sockets() {
        let sender = bind_loopback().await;
        let receiver = bind_loopback().await;
        let receiver_addr = receiver.local_addr().unwrap();

        let outgoing = options_message();
        sender.send_to(&outgoing, receiver_addr).await.unwrap();

        let (incoming, origin) = receiver.recv().await.unwrap();
        assert_eq!(incoming, outgoing);
        assert_eq!(origin, sender.local_addr().unwrap());
    }

    /// `recv` silently skips an unparseable datagram and returns the next valid message.
    #[tokio::test]
    async fn recv_skips_malformed_then_delivers() {
        let sender = bind_loopback().await;
        let receiver = bind_loopback().await;
        let receiver_addr = receiver.local_addr().unwrap();

        // Raw garbage goes out first, bypassing the SIP serializer.
        sender
            .socket
            .send_to(b"garbage", receiver_addr)
            .await
            .unwrap();

        let valid = options_message();
        sender.send_to(&valid, receiver_addr).await.unwrap();

        let (delivered, _) = receiver.recv().await.unwrap();
        assert_eq!(delivered, valid);
    }

    /// Responses survive the serialize/parse round trip just like requests.
    #[test]
    fn response_serializes_and_parses() {
        let raw = "SIP/2.0 200 OK\r\n\
                   Via: SIP/2.0/UDP 10.0.0.1:5060;branch=z9hG4bK-opt\r\n\
                   From: <sip:alice@example.com>;tag=alice\r\n\
                   To: <sip:bob@example.com>;tag=bob\r\n\
                   Call-ID: call-opt\r\n\
                   CSeq: 4 OPTIONS\r\n\
                   Content-Length: 0\r\n\r\n";
        let message: SipMessage = Response::try_from(raw.as_bytes()).unwrap().into();
        let wire = serialize(&message);
        let reparsed = parse(&wire).unwrap();
        assert_eq!(reparsed, message);
    }
}