use std::net::SocketAddr;
use std::ops::ControlFlow;
use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};
use super::message::RtspRequest;
use crate::bus::PlaybackRegistry;
use crate::protocol::rtp::RtpPacketizer;
use crate::{CodecId, MediaFrame, Result, StreamKey};
pub struct RtspServer {
playback: Arc<dyn PlaybackRegistry>,
bind: SocketAddr,
}
impl RtspServer {
pub fn new(playback: Arc<dyn PlaybackRegistry>, bind: SocketAddr) -> Self {
Self { playback, bind }
}
pub async fn run(self, shutdown: CancellationToken) -> Result<()> {
let listener = TcpListener::bind(self.bind).await?;
info!(bind = %self.bind, "rtsp egress server listening");
loop {
tokio::select! {
_ = shutdown.cancelled() => break,
accepted = listener.accept() => {
let (sock, peer) = match accepted {
Ok(v) => v,
Err(e) => { warn!(error = %e, "rtsp accept failed"); continue; }
};
let playback = Arc::clone(&self.playback);
let shutdown = shutdown.clone();
tokio::spawn(async move {
if let Err(e) = serve_connection(sock, playback, shutdown).await {
debug!(%peer, error = %e, "rtsp egress connection ended");
}
});
}
}
}
Ok(())
}
}
const PT_H264: u8 = 96;
const RTP_CHANNEL: u8 = 0;
async fn serve_connection(
mut sock: TcpStream,
playback: Arc<dyn PlaybackRegistry>,
shutdown: CancellationToken,
) -> Result<()> {
let mut buf = Vec::with_capacity(2048);
loop {
let Some(req) = read_request(&mut sock, &mut buf).await? else {
return Ok(()); };
match req.method.as_str() {
"OPTIONS" => {
sock.write_all(options_response(req.cseq).as_bytes())
.await?
}
"DESCRIBE" => {
let sdp = build_sdp();
sock.write_all(describe_response(req.cseq, &req.uri, &sdp).as_bytes())
.await?;
}
"SETUP" => {
sock.write_all(setup_response(req.cseq).as_bytes()).await?;
}
"PLAY" => {
sock.write_all(play_response(req.cseq).as_bytes()).await?;
let key = stream_key_from_uri(&req.uri)
.ok_or_else(|| crate::StreamError::protocol("rtsp PLAY: bad stream uri"))?;
return play(sock, &playback, key, shutdown).await;
}
"TEARDOWN" => {
sock.write_all(simple_ok(req.cseq).as_bytes()).await?;
return Ok(());
}
other => {
debug!(method = other, "rtsp: unsupported method");
sock.write_all(not_implemented(req.cseq).as_bytes()).await?;
}
}
}
}
async fn play(
mut sock: TcpStream,
playback: &Arc<dyn PlaybackRegistry>,
key: StreamKey,
shutdown: CancellationToken,
) -> Result<()> {
let handle = playback.get_stream(&key)?;
let packetizer = RtpPacketizer::new(PT_H264, 0x5254_5350, 1400);
let mut sink = RtspSink {
sock: &mut sock,
packetizer,
pkts: Vec::new(),
wbuf: Vec::with_capacity(1500),
};
handle.drive_to(&shutdown, &mut sink).await
}
struct RtspSink<'a> {
sock: &'a mut TcpStream,
packetizer: RtpPacketizer,
pkts: Vec<Vec<u8>>,
wbuf: Vec<u8>,
}
#[async_trait::async_trait]
impl crate::bus::FrameSink for RtspSink<'_> {
async fn send(&mut self, frame: Arc<MediaFrame>) -> Result<ControlFlow<()>> {
match send_frame(
self.sock,
&mut self.packetizer,
&mut self.pkts,
&mut self.wbuf,
&frame,
)
.await
{
Ok(()) => Ok(ControlFlow::Continue(())),
Err(_) => Ok(ControlFlow::Break(())),
}
}
}
async fn send_frame(
sock: &mut TcpStream,
packetizer: &mut RtpPacketizer,
pkts: &mut Vec<Vec<u8>>,
wbuf: &mut Vec<u8>,
frame: &MediaFrame,
) -> std::io::Result<()> {
if !frame.is_video() || frame.codec != CodecId::H264 {
return Ok(()); }
let timestamp = (frame.pts.max(0) as u64).wrapping_mul(90) as u32; packetizer.packetize_into(&frame.data, timestamp, pkts);
wbuf.clear();
for pkt in pkts.iter() {
frame_interleaved(RTP_CHANNEL, pkt, wbuf);
}
sock.write_all(wbuf).await
}
fn frame_interleaved(channel: u8, rtp: &[u8], out: &mut Vec<u8>) {
out.push(b'$');
out.push(channel);
out.extend_from_slice(&(rtp.len() as u16).to_be_bytes());
out.extend_from_slice(rtp);
}
async fn read_request(sock: &mut TcpStream, buf: &mut Vec<u8>) -> Result<Option<RtspRequest>> {
let mut tmp = [0u8; 1024];
loop {
if let Some(end) = find_double_crlf(buf) {
let head = String::from_utf8_lossy(&buf[..end]).into_owned();
buf.drain(..end);
return Ok(RtspRequest::parse(&head).map(Some).unwrap_or(None));
}
let n = sock.read(&mut tmp).await?;
if n == 0 {
return Ok(None);
}
buf.extend_from_slice(&tmp[..n]);
if buf.len() > 64 * 1024 {
return Err(crate::StreamError::protocol("rtsp request too large"));
}
}
}
fn find_double_crlf(buf: &[u8]) -> Option<usize> {
buf.windows(4).position(|w| w == b"\r\n\r\n").map(|p| p + 4)
}
fn stream_key_from_uri(uri: &str) -> Option<StreamKey> {
let rest = uri.strip_prefix("rtsp://")?;
let path = rest.split_once('/').map(|(_, p)| p)?;
let path = path.split(['?', ';']).next().unwrap_or(path);
let mut segs = path.split('/').filter(|s| !s.is_empty());
let app = segs.next()?;
let stream = segs.next()?;
Some(StreamKey::new(app, stream))
}
fn options_response(cseq: u32) -> String {
format!(
"RTSP/1.0 200 OK\r\nCSeq: {cseq}\r\n\
Public: OPTIONS, DESCRIBE, SETUP, PLAY, TEARDOWN\r\n\r\n"
)
}
fn build_sdp() -> String {
"v=0\r\n\
o=- 0 0 IN IP4 0.0.0.0\r\n\
s=arcly-stream\r\n\
t=0 0\r\n\
m=video 0 RTP/AVP 96\r\n\
a=rtpmap:96 H264/90000\r\n\
a=control:streamid=0\r\n"
.to_string()
}
fn describe_response(cseq: u32, uri: &str, sdp: &str) -> String {
format!(
"RTSP/1.0 200 OK\r\nCSeq: {cseq}\r\n\
Content-Base: {uri}\r\nContent-Type: application/sdp\r\n\
Content-Length: {}\r\n\r\n{sdp}",
sdp.len()
)
}
fn setup_response(cseq: u32) -> String {
format!(
"RTSP/1.0 200 OK\r\nCSeq: {cseq}\r\n\
Transport: RTP/AVP/TCP;unicast;interleaved=0-1\r\n\
Session: 12345678\r\n\r\n"
)
}
fn play_response(cseq: u32) -> String {
format!("RTSP/1.0 200 OK\r\nCSeq: {cseq}\r\nSession: 12345678\r\nRange: npt=0.000-\r\n\r\n")
}
fn simple_ok(cseq: u32) -> String {
format!("RTSP/1.0 200 OK\r\nCSeq: {cseq}\r\nSession: 12345678\r\n\r\n")
}
fn not_implemented(cseq: u32) -> String {
format!("RTSP/1.0 501 Not Implemented\r\nCSeq: {cseq}\r\n\r\n")
}
#[cfg(test)]
mod tests {
use super::*;
use crate::protocol::rtsp::message::{InterleavedFrame, RtspResponse};
#[test]
fn interleaved_frame_round_trips() {
let rtp = [0x80u8, 0xE0, 0x00, 0x01, 0xAA, 0xBB];
let mut framed = Vec::new();
frame_interleaved(0, &rtp, &mut framed);
assert_eq!(framed[0], b'$');
let (f, used) = InterleavedFrame::parse(&framed).expect("parse");
assert_eq!(used, framed.len());
assert_eq!(f.channel, 0);
assert_eq!(f.payload, &rtp);
}
#[test]
fn stream_key_parsed_from_uri_variants() {
let k = stream_key_from_uri("rtsp://host:554/live/cam").unwrap();
assert_eq!((k.app.as_str(), k.stream_id.as_str()), ("live", "cam"));
let k = stream_key_from_uri("rtsp://h/live/cam/streamid=0?x=1").unwrap();
assert_eq!((k.app.as_str(), k.stream_id.as_str()), ("live", "cam"));
assert!(stream_key_from_uri("rtsp://host/onlyapp").is_none());
}
#[test]
fn responses_are_well_formed() {
assert!(options_response(2).contains("Public: OPTIONS"));
let d = describe_response(3, "rtsp://h/live/cam", &build_sdp());
let parsed = RtspResponse::parse(
d.split("\r\n\r\n").next().unwrap(),
d.split("\r\n\r\n").nth(1).unwrap_or("").to_string(),
)
.expect("response parses");
assert_eq!(parsed.header("Content-Type"), Some("application/sdp"));
assert!(setup_response(4).contains("interleaved=0-1"));
}
}