arcly-stream 0.1.7

An open-extensible live-media streaming kernel: lock-free zero-copy frame fan-out, instant-start GOP cache, a pluggable multi-protocol ingestion layer (RTMP, RTSP, SRT, WHIP/WHEP shipped), and a feature-gated pure-Rust media plane (MPEG-TS/HLS/fMP4) — runtime-, config-, and metrics-free.
Documentation
//! RTSP **egress server** — serve a live stream to RTSP players (VLC, ffmpeg).
//!
//! The serving counterpart to the client-pull [ingest handler](super::RtspHandler):
//! [`RtspServer`] accepts TCP connections, runs the
//! `OPTIONS → DESCRIBE → SETUP → PLAY → TEARDOWN` state machine, and streams the
//! requested stream's H.264 access units as RTP over the TCP-interleaved
//! transport (RFC 2326 §10.12), reusing the shared
//! [`RtpPacketizer`](crate::protocol::rtp::RtpPacketizer).
//!
//! Interleaved (RTP-over-TCP) transport only — the universally-supported path
//! that needs no separate UDP ports. The stream is selected from the request
//! URI's `/app/stream` path.

use std::net::SocketAddr;
use std::ops::ControlFlow;
use std::sync::Arc;

use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};

use super::message::RtspRequest;
use crate::bus::PlaybackRegistry;
use crate::protocol::rtp::RtpPacketizer;
use crate::{CodecId, MediaFrame, Result, StreamKey};

/// An RTSP server that serves live streams to pulling players.
///
/// Construct it with [`RtspServer::new`] and drive it with [`RtspServer::run`];
/// each accepted TCP connection is served on its own spawned task.
pub struct RtspServer {
    /// Registry queried (via `get_stream`) when a client issues `PLAY`.
    playback: Arc<dyn PlaybackRegistry>,
    /// Address the TCP listener binds to when `run` starts.
    bind: SocketAddr,
}

impl RtspServer {
    /// Serve streams from `playback`, listening on `bind`.
    pub fn new(playback: Arc<dyn PlaybackRegistry>, bind: SocketAddr) -> Self {
        Self { playback, bind }
    }

    /// Accept and serve RTSP connections until `shutdown` fires.
    pub async fn run(self, shutdown: CancellationToken) -> Result<()> {
        let listener = TcpListener::bind(self.bind).await?;
        info!(bind = %self.bind, "rtsp egress server listening");
        loop {
            tokio::select! {
                _ = shutdown.cancelled() => break,
                accepted = listener.accept() => {
                    let (sock, peer) = match accepted {
                        Ok(v) => v,
                        Err(e) => { warn!(error = %e, "rtsp accept failed"); continue; }
                    };
                    let playback = Arc::clone(&self.playback);
                    let shutdown = shutdown.clone();
                    tokio::spawn(async move {
                        if let Err(e) = serve_connection(sock, playback, shutdown).await {
                            debug!(%peer, error = %e, "rtsp egress connection ended");
                        }
                    });
                }
            }
        }
        Ok(())
    }
}

/// The RTP payload type advertised for H.264 (must match the SDP `rtpmap`;
/// `build_sdp` declares `a=rtpmap:96`).
const PT_H264: u8 = 96;
/// The interleaved channel for RTP (RTCP would be channel 1); the SETUP reply
/// advertises this pair as `interleaved=0-1`.
const RTP_CHANNEL: u8 = 0;

/// Run the RTSP request/response state machine for one client connection.
///
/// Requests are read one at a time and answered by method:
/// `OPTIONS`, `DESCRIBE` and `SETUP` get their canned replies and the loop
/// continues; `PLAY` hands the socket over to [`play`] and returns its result;
/// `TEARDOWN` is acknowledged and ends the connection; anything else gets
/// `501 Not Implemented`.
///
/// # Errors
/// Propagates socket I/O errors and request-read errors. A `PLAY` whose URI
/// does not name an `/app/stream` path is answered with `400 Bad Request` and
/// then reported as a protocol error.
async fn serve_connection(
    mut sock: TcpStream,
    playback: Arc<dyn PlaybackRegistry>,
    shutdown: CancellationToken,
) -> Result<()> {
    let mut buf = Vec::with_capacity(2048);
    loop {
        let Some(req) = read_request(&mut sock, &mut buf).await? else {
            return Ok(()); // peer closed
        };
        match req.method.as_str() {
            "OPTIONS" => {
                sock.write_all(options_response(req.cseq).as_bytes())
                    .await?;
            }
            "DESCRIBE" => {
                let sdp = build_sdp();
                sock.write_all(describe_response(req.cseq, &req.uri, &sdp).as_bytes())
                    .await?;
            }
            "SETUP" => {
                sock.write_all(setup_response(req.cseq).as_bytes()).await?;
            }
            "PLAY" => {
                // Validate the URI *before* acknowledging: replying 200 OK and
                // then dropping the connection tells the player the session
                // started when it never did.
                let Some(key) = stream_key_from_uri(&req.uri) else {
                    let reply = format!(
                        "RTSP/1.0 400 Bad Request\r\nCSeq: {}\r\n\r\n",
                        req.cseq
                    );
                    sock.write_all(reply.as_bytes()).await?;
                    return Err(crate::StreamError::protocol("rtsp PLAY: bad stream uri"));
                };
                // NOTE(review): the stream lookup itself still happens inside
                // `play`, i.e. after this 200 OK has been sent.
                sock.write_all(play_response(req.cseq).as_bytes()).await?;
                return play(sock, &playback, key, shutdown).await;
            }
            "TEARDOWN" => {
                sock.write_all(simple_ok(req.cseq).as_bytes()).await?;
                return Ok(());
            }
            other => {
                debug!(method = other, "rtsp: unsupported method");
                sock.write_all(not_implemented(req.cseq).as_bytes()).await?;
            }
        }
    }
}

/// Serve `key` to one player: look the stream up in `playback`, wrap the
/// socket in an [`RtspSink`], and let the stream handle drive frames into it
/// until the drive finishes or `shutdown` fires.
///
/// # Errors
/// Fails if the registry lookup fails, or with whatever `drive_to` reports.
/// Socket write failures are absorbed by the sink, which stops the drive
/// instead of returning an error.
async fn play(
    mut sock: TcpStream,
    playback: &Arc<dyn PlaybackRegistry>,
    key: StreamKey,
    shutdown: CancellationToken,
) -> Result<()> {
    let handle = playback.get_stream(&key)?;

    // SSRC 0x5254_5350 spells "RTSP" in ASCII; 1400 is the packetizer's size
    // argument and 1500 the initial capacity of the reusable write buffer.
    let mut sink = RtspSink {
        sock: &mut sock,
        packetizer: RtpPacketizer::new(PT_H264, 0x5254_5350, 1400),
        pkts: Vec::new(),
        wbuf: Vec::with_capacity(1500),
    };

    // Presumably replays the instant-start buffer and then forwards live
    // frames (per the crate description) — the sink decides when to stop.
    handle.drive_to(&shutdown, &mut sink).await
}

/// Streams a stream's frames to one RTSP player as interleaved RTP-over-TCP.
struct RtspSink<'a> {
    /// The player's connection; borrowed from `play`, which owns the socket.
    sock: &'a mut TcpStream,
    /// Splits each access unit into RTP packets (payload type `PT_H264`).
    packetizer: RtpPacketizer,
    /// Reused across frames: the per-frame RTP packet buffers.
    pkts: Vec<Vec<u8>>,
    /// Reused across frames: the interleaved-framing write buffer.
    wbuf: Vec<u8>,
}

#[async_trait::async_trait]
impl crate::bus::FrameSink for RtspSink<'_> {
    /// Forward one frame to the player.
    ///
    /// Never returns `Err`: any I/O failure from [`send_frame`] is taken to
    /// mean the player went away and is reported as `ControlFlow::Break(())`
    /// so the drive stops cleanly; success yields `ControlFlow::Continue(())`.
    async fn send(&mut self, frame: Arc<MediaFrame>) -> Result<ControlFlow<()>> {
        let written = send_frame(
            self.sock,
            &mut self.packetizer,
            &mut self.pkts,
            &mut self.wbuf,
            &frame,
        )
        .await;

        let flow = if written.is_ok() {
            ControlFlow::Continue(())
        } else {
            ControlFlow::Break(())
        };
        Ok(flow)
    }
}

/// Packetize one frame and write it to `sock` as interleaved RTP.
///
/// Frames that are not H.264 video are skipped and reported as success.
/// `pkts` and `wbuf` are scratch buffers owned by the caller so they can be
/// reused from frame to frame.
///
/// # Errors
/// Returns the I/O error from the socket write, if any.
async fn send_frame(
    sock: &mut TcpStream,
    packetizer: &mut RtpPacketizer,
    pkts: &mut Vec<Vec<u8>>,
    wbuf: &mut Vec<u8>,
    frame: &MediaFrame,
) -> std::io::Result<()> {
    // Only H.264 video is served over RTSP for now; everything else is a no-op.
    let is_h264_video = frame.is_video() && frame.codec == CodecId::H264;
    if !is_h264_video {
        return Ok(());
    }

    // Negative PTS values are clamped to zero first: casting them directly to
    // u64 would produce a huge number and a wild RTP timestamp jump.
    let pts_ms = frame.pts.max(0) as u64;
    let timestamp = pts_ms.wrapping_mul(90) as u32; // ms → 90 kHz

    packetizer.packetize_into(&frame.data, timestamp, pkts);

    // All packets of the access unit go into one reused buffer and out in a
    // single write, instead of one syscall and allocation per packet.
    wbuf.clear();
    pkts.iter()
        .for_each(|pkt| frame_interleaved(RTP_CHANNEL, pkt, wbuf));
    sock.write_all(wbuf).await
}

/// Append an RTP packet framed for the RTSP TCP-interleaved transport to `out`:
/// `$` + 1-byte channel + 2-byte big-endian length + the RTP packet.
///
/// The length prefix is 16 bits, so a packet longer than `u16::MAX` bytes
/// cannot be framed. Such a packet is dropped (nothing is appended): a lost
/// packet is recoverable for the receiver, whereas a truncated length prefix
/// would desynchronise every frame that follows on the connection.
fn frame_interleaved(channel: u8, rtp: &[u8], out: &mut Vec<u8>) {
    if rtp.len() > usize::from(u16::MAX) {
        return;
    }
    // Lossless: the guard above bounds the length to the u16 range.
    let len = rtp.len() as u16;

    out.reserve(4 + rtp.len());
    out.push(b'$');
    out.push(channel);
    out.extend_from_slice(&len.to_be_bytes());
    out.extend_from_slice(rtp);
}

/// Pull the next RTSP request header block off `sock`.
///
/// `buf` carries unconsumed bytes between calls. Bytes are read until a blank
/// line terminates the header block, which is then removed from `buf` and
/// parsed.
///
/// Returns `Ok(None)` when the peer closes the connection before a complete
/// header block arrives, and also when the header block does not parse.
///
/// # Errors
/// Propagates socket read errors, and reports a protocol error once more than
/// 64 KiB has accumulated without a terminating blank line.
async fn read_request(sock: &mut TcpStream, buf: &mut Vec<u8>) -> Result<Option<RtspRequest>> {
    const MAX_REQUEST_BYTES: usize = 64 * 1024;

    let mut chunk = [0u8; 1024];
    let end = loop {
        if let Some(end) = find_double_crlf(buf) {
            break end;
        }
        let read = sock.read(&mut chunk).await?;
        if read == 0 {
            return Ok(None);
        }
        buf.extend_from_slice(&chunk[..read]);
        if buf.len() > MAX_REQUEST_BYTES {
            return Err(crate::StreamError::protocol("rtsp request too large"));
        }
    };

    // Take the header block out of the buffer; whatever follows stays queued
    // for the next call.
    let head_bytes: Vec<u8> = buf.drain(..end).collect();
    let head = String::from_utf8_lossy(&head_bytes).into_owned();
    Ok(RtspRequest::parse(&head).map(Some).unwrap_or(None))
}

/// Locate the first blank line (`\r\n\r\n`) in `buf`.
///
/// Returns the index one past the terminator, i.e. the length of the header
/// block including the blank line, or `None` if no terminator is present.
fn find_double_crlf(buf: &[u8]) -> Option<usize> {
    const TERMINATOR: &[u8] = b"\r\n\r\n";
    let start = buf
        .windows(TERMINATOR.len())
        .position(|window| window == TERMINATOR)?;
    Some(start + TERMINATOR.len())
}

/// Derive the stream key from an RTSP URI of the form
/// `rtsp://host[:port]/app/stream[/trackID][?q]`.
///
/// Everything from the first `?` or `;` onwards is ignored, empty path
/// segments are skipped, and only the first two segments are used. Returns
/// `None` when the scheme is not `rtsp://`, when there is no path, or when the
/// path has fewer than two non-empty segments.
fn stream_key_from_uri(uri: &str) -> Option<StreamKey> {
    let after_scheme = uri.strip_prefix("rtsp://")?;
    let (_authority, full_path) = after_scheme.split_once('/')?;

    // Cut off the query / parameter suffix, if any.
    let path = full_path
        .find(['?', ';'])
        .map_or(full_path, |cut| &full_path[..cut]);

    let mut parts = path.split('/').filter(|part| !part.is_empty());
    match (parts.next(), parts.next()) {
        (Some(app), Some(stream)) => Some(StreamKey::new(app, stream)),
        _ => None,
    }
}

// ── Response builders ────────────────────────────────────────────────────────

/// Build the `200 OK` reply to `OPTIONS`, echoing `cseq` and listing the
/// methods this server answers in the `Public` header.
fn options_response(cseq: u32) -> String {
    const SUPPORTED: &str = "OPTIONS, DESCRIBE, SETUP, PLAY, TEARDOWN";
    format!("RTSP/1.0 200 OK\r\nCSeq: {cseq}\r\nPublic: {SUPPORTED}\r\n\r\n")
}

/// Build the SDP session description returned by `DESCRIBE`.
///
/// Describes a single H.264 video track on dynamic payload type 96 with a
/// 90 kHz clock, controlled via the relative URL `streamid=0`. No parameter
/// sets are advertised; the player learns SPS/PPS in-band.
fn build_sdp() -> String {
    // `c=` is mandatory in SDP (at session level or in every media section);
    // 0.0.0.0 is used because the media travels interleaved on the RTSP
    // connection itself.
    //
    // `packetization-mode=1` is declared because access units are split into
    // RTP packets of bounded size; when the parameter is absent receivers must
    // assume mode 0, which does not permit fragmented NAL units.
    // NOTE(review): assumes the shared packetizer emits FU-A fragments — confirm.
    "v=0\r\n\
     o=- 0 0 IN IP4 0.0.0.0\r\n\
     s=arcly-stream\r\n\
     c=IN IP4 0.0.0.0\r\n\
     t=0 0\r\n\
     m=video 0 RTP/AVP 96\r\n\
     a=rtpmap:96 H264/90000\r\n\
     a=fmtp:96 packetization-mode=1\r\n\
     a=control:streamid=0\r\n"
        .to_string()
}

/// Build the `200 OK` reply to `DESCRIBE`: the request `uri` is echoed as
/// `Content-Base`, and `sdp` is attached as the `application/sdp` body with a
/// matching byte-length `Content-Length`.
fn describe_response(cseq: u32, uri: &str, sdp: &str) -> String {
    let header = [
        "RTSP/1.0 200 OK".to_string(),
        format!("CSeq: {cseq}"),
        format!("Content-Base: {uri}"),
        "Content-Type: application/sdp".to_string(),
        format!("Content-Length: {}", sdp.len()),
    ]
    .join("\r\n");

    // A blank line separates the header block from the body.
    format!("{header}\r\n\r\n{sdp}")
}

/// Build the `200 OK` reply to `SETUP`: always confirms the TCP-interleaved
/// transport on channels 0-1 and returns the fixed session id.
fn setup_response(cseq: u32) -> String {
    let mut reply = String::from("RTSP/1.0 200 OK\r\nCSeq: ");
    reply.push_str(&cseq.to_string());
    reply.push_str("\r\nTransport: RTP/AVP/TCP;unicast;interleaved=0-1\r\n");
    reply.push_str("Session: 12345678\r\n\r\n");
    reply
}

/// Build the `200 OK` reply to `PLAY`, carrying the fixed session id and an
/// open-ended `npt` range starting at zero.
fn play_response(cseq: u32) -> String {
    format!(
        "RTSP/1.0 200 OK\r\n\
         CSeq: {}\r\n\
         Session: 12345678\r\n\
         Range: npt=0.000-\r\n\r\n",
        cseq
    )
}

/// Build a bare `200 OK` reply that carries only `CSeq` and the fixed session
/// id (used to acknowledge `TEARDOWN`).
fn simple_ok(cseq: u32) -> String {
    let seq = cseq.to_string();
    [
        "RTSP/1.0 200 OK\r\nCSeq: ",
        seq.as_str(),
        "\r\nSession: 12345678\r\n\r\n",
    ]
    .concat()
}

/// Build the `501 Not Implemented` reply sent for any method this server does
/// not handle.
fn not_implemented(cseq: u32) -> String {
    let status_line = "RTSP/1.0 501 Not Implemented";
    format!("{status_line}\r\nCSeq: {cseq}\r\n\r\n")
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::protocol::rtsp::message::{InterleavedFrame, RtspResponse};

    /// Framing a packet and parsing it back yields the same channel and payload,
    /// and the parser consumes exactly the bytes that were written.
    #[test]
    fn interleaved_frame_round_trips() {
        let rtp = [0x80u8, 0xE0, 0x00, 0x01, 0xAA, 0xBB];
        let mut wire = Vec::new();
        frame_interleaved(0, &rtp, &mut wire);
        assert_eq!(wire[0], b'$');

        let (f, consumed) = InterleavedFrame::parse(&wire).expect("parse");
        assert_eq!(consumed, wire.len());
        assert_eq!(f.channel, 0);
        assert_eq!(f.payload, &rtp);
    }

    /// Both a plain URI and one with a trailing track/control segment plus a
    /// query map to the same key; a URI with only an app segment is rejected.
    #[test]
    fn stream_key_parsed_from_uri_variants() {
        let accepted = [
            "rtsp://host:554/live/cam",
            "rtsp://h/live/cam/streamid=0?x=1",
        ];
        for uri in accepted {
            let key = stream_key_from_uri(uri).unwrap();
            assert_eq!(key.app.as_str(), "live");
            assert_eq!(key.stream_id.as_str(), "cam");
        }
        assert!(stream_key_from_uri("rtsp://host/onlyapp").is_none());
    }

    /// Spot-checks the canned replies and confirms the DESCRIBE reply parses.
    #[test]
    fn responses_are_well_formed() {
        assert!(options_response(2).contains("Public: OPTIONS"));

        let describe = describe_response(3, "rtsp://h/live/cam", &build_sdp());
        let (head, body) = describe
            .split_once("\r\n\r\n")
            .unwrap_or((describe.as_str(), ""));
        let parsed = RtspResponse::parse(head, body.to_string()).expect("response parses");
        assert_eq!(parsed.header("Content-Type"), Some("application/sdp"));

        assert!(setup_response(4).contains("interleaved=0-1"));
    }
}