arcly-stream 0.1.6

An open-extensible live-media streaming kernel: lock-free zero-copy frame fan-out, instant-start GOP cache, a pluggable multi-protocol ingestion layer (RTMP, RTSP, SRT, WHIP/WHEP shipped), and a feature-gated pure-Rust media plane (MPEG-TS/HLS/fMP4) — runtime, config, and metrics free.
Documentation
//! RTSP/1.0 message parsing and TCP-interleaved framing (RFC 2326).

use crate::Result;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};

/// An RTSP request method this client issues (RFC 2326 §10).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RtspMethod {
    /// `OPTIONS` — discover supported methods.
    Options,
    /// `DESCRIBE` — fetch the SDP session description.
    Describe,
    /// `SETUP` — negotiate transport for one media track.
    Setup,
    /// `PLAY` — start the media flow.
    Play,
    /// `TEARDOWN` — stop and release the session.
    Teardown,
}

impl RtspMethod {
    /// The on-the-wire method token.
    pub fn as_str(self) -> &'static str {
        match self {
            RtspMethod::Options => "OPTIONS",
            RtspMethod::Describe => "DESCRIBE",
            RtspMethod::Setup => "SETUP",
            RtspMethod::Play => "PLAY",
            RtspMethod::Teardown => "TEARDOWN",
        }
    }
}

/// A parsed RTSP request (server-side view, for completeness/testing).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RtspRequest {
    /// The request method token.
    pub method: String,
    /// The request URI.
    pub uri: String,
    /// `CSeq` sequence number echoed in the response.
    pub cseq: u32,
    /// Header name/value pairs (verbatim order).
    pub headers: Vec<(String, String)>,
}

impl RtspRequest {
    /// Parse a complete request (headers terminated by a blank line). Returns
    /// `None` if the request line or `CSeq` is missing/malformed.
    pub fn parse(text: &str) -> Option<RtspRequest> {
        let mut lines = text.split("\r\n");
        let request_line = lines.next()?;
        let mut parts = request_line.split(' ');
        let method = parts.next()?.to_string();
        let uri = parts.next()?.to_string();
        let mut headers = Vec::new();
        let mut cseq = None;
        for line in lines {
            if line.is_empty() {
                break;
            }
            let (name, value) = line.split_once(':')?;
            let (name, value) = (name.trim().to_string(), value.trim().to_string());
            if name.eq_ignore_ascii_case("cseq") {
                cseq = value.parse().ok();
            }
            headers.push((name, value));
        }
        Some(RtspRequest {
            method,
            uri,
            cseq: cseq?,
            headers,
        })
    }
}

/// A parsed RTSP response.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RtspResponse {
    /// Numeric status code (200, 401, 461, …).
    pub status: u16,
    /// Header name/value pairs (verbatim order).
    pub headers: Vec<(String, String)>,
    /// Message body (the SDP for a `DESCRIBE` response).
    pub body: String,
}

impl RtspResponse {
    /// Parse `text` (status line + headers) and attach `body`.
    pub fn parse(text: &str, body: String) -> Option<RtspResponse> {
        let mut lines = text.split("\r\n");
        let status_line = lines.next()?;
        // "RTSP/1.0 200 OK"
        let status = status_line.split(' ').nth(1)?.parse().ok()?;
        let mut headers = Vec::new();
        for line in lines {
            if line.is_empty() {
                continue;
            }
            if let Some((name, value)) = line.split_once(':') {
                headers.push((name.trim().to_string(), value.trim().to_string()));
            }
        }
        Some(RtspResponse {
            status,
            headers,
            body,
        })
    }

    /// First header value matching `name`, case-insensitively.
    pub fn header(&self, name: &str) -> Option<&str> {
        self.headers
            .iter()
            .find(|(n, _)| n.eq_ignore_ascii_case(name))
            .map(|(_, v)| v.as_str())
    }
}

/// A TCP-interleaved binary frame: `$`, channel id, 16-bit length, then payload
/// (RFC 2326 §10.12). Used to multiplex RTP/RTCP over the RTSP TCP connection.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InterleavedFrame<'a> {
    /// Interleaved channel id (even = RTP, odd = RTCP, per the SETUP transport).
    pub channel: u8,
    /// The embedded RTP/RTCP packet.
    pub payload: &'a [u8],
}

impl<'a> InterleavedFrame<'a> {
    /// Parse one interleaved frame from the front of `buf`. Returns the frame and
    /// the total bytes consumed (`4 + length`), or `None` if `buf` does not yet
    /// hold a `$`-prefixed frame in full.
    pub fn parse(buf: &'a [u8]) -> Option<(InterleavedFrame<'a>, usize)> {
        if buf.len() < 4 || buf[0] != b'$' {
            return None;
        }
        let channel = buf[1];
        let length = u16::from_be_bytes([buf[2], buf[3]]) as usize;
        if buf.len() < 4 + length {
            return None;
        }
        Some((
            InterleavedFrame {
                channel,
                payload: &buf[4..4 + length],
            },
            4 + length,
        ))
    }
}

/// Split `rtsp://host[:port]/path` into host and port (default 554).
pub fn host_port(url: &str) -> Option<(String, u16)> {
    let rest = url.strip_prefix("rtsp://")?;
    let authority = rest.split('/').next()?;
    // Strip any embedded userinfo (`user:pass@host`).
    let authority = authority.rsplit('@').next()?;
    match authority.rsplit_once(':') {
        Some((host, port)) => Some((host.to_string(), port.parse().ok()?)),
        None => Some((authority.to_string(), 554)),
    }
}

/// Extract the `Session` id (trimming any `;timeout=` parameter) from a response.
pub fn session_id(resp: &RtspResponse) -> Option<String> {
    let raw = resp.header("Session")?;
    Some(raw.split(';').next().unwrap_or(raw).trim().to_string())
}

/// Write an RTSP request with the standard `CSeq`/`User-Agent` headers plus
/// `extra` header pairs.
pub async fn write_request<W: AsyncWrite + Unpin>(
    w: &mut W,
    method: RtspMethod,
    uri: &str,
    cseq: u32,
    extra: &[(&str, &str)],
) -> Result<()> {
    let mut msg = format!(
        "{} {} RTSP/1.0\r\nCSeq: {}\r\nUser-Agent: arcly-stream\r\n",
        method.as_str(),
        uri,
        cseq
    );
    for (name, value) in extra {
        msg.push_str(name);
        msg.push_str(": ");
        msg.push_str(value);
        msg.push_str("\r\n");
    }
    msg.push_str("\r\n");
    w.write_all(msg.as_bytes()).await?;
    w.flush().await?;
    Ok(())
}

/// Read one RTSP response (headers to the blank line, then any `Content-Length`
/// body). Skips leading interleaved data that may arrive before the response.
pub async fn read_response<R: AsyncRead + Unpin>(r: &mut R) -> Result<RtspResponse> {
    let mut head = Vec::new();
    let mut byte = [0u8; 1];
    // Read until the CRLF CRLF header terminator.
    loop {
        let n = r.read(&mut byte).await?;
        if n == 0 {
            return Err(crate::StreamError::ConnectionClosed);
        }
        head.push(byte[0]);
        if head.ends_with(b"\r\n\r\n") {
            break;
        }
        if head.len() > 64 * 1024 {
            return Err(crate::StreamError::protocol("rtsp header too large"));
        }
    }
    let text = String::from_utf8_lossy(&head).into_owned();
    let content_length = text
        .split("\r\n")
        .find_map(|l| {
            l.split_once(':')
                .filter(|(n, _)| n.eq_ignore_ascii_case("content-length"))
        })
        .and_then(|(_, v)| v.trim().parse::<usize>().ok())
        .unwrap_or(0);

    let mut body = vec![0u8; content_length];
    if content_length > 0 {
        r.read_exact(&mut body).await?;
    }
    RtspResponse::parse(&text, String::from_utf8_lossy(&body).into_owned())
        .ok_or_else(|| crate::StreamError::protocol("malformed rtsp response"))
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parses_request_and_cseq() {
        let req = "OPTIONS rtsp://cam/stream RTSP/1.0\r\nCSeq: 3\r\nUser-Agent: x\r\n\r\n";
        let parsed = RtspRequest::parse(req).unwrap();
        assert_eq!(parsed.method, "OPTIONS");
        assert_eq!(parsed.uri, "rtsp://cam/stream");
        assert_eq!(parsed.cseq, 3);
    }

    #[test]
    fn parses_response_status_and_headers() {
        let resp = RtspResponse::parse(
            "RTSP/1.0 200 OK\r\nCSeq: 2\r\nSession: 12345678;timeout=60\r\n",
            String::new(),
        )
        .unwrap();
        assert_eq!(resp.status, 200);
        assert_eq!(resp.header("session"), Some("12345678;timeout=60"));
        assert_eq!(session_id(&resp).as_deref(), Some("12345678"));
    }

    #[test]
    fn host_port_defaults_and_explicit() {
        assert_eq!(host_port("rtsp://cam/stream"), Some(("cam".into(), 554)));
        assert_eq!(host_port("rtsp://cam:8554/s"), Some(("cam".into(), 8554)));
        assert_eq!(
            host_port("rtsp://user:pw@cam:9000/s"),
            Some(("cam".into(), 9000))
        );
        assert!(host_port("http://cam/s").is_none());
    }

    #[test]
    fn interleaved_frame_parses_and_reports_consumed() {
        // $, channel 0, length 3, payload AA BB CC, then a trailing byte.
        let buf = [b'$', 0, 0, 3, 0xAA, 0xBB, 0xCC, 0xFF];
        let (frame, len) = InterleavedFrame::parse(&buf).unwrap();
        assert_eq!(frame.channel, 0);
        assert_eq!(frame.payload, &[0xAA, 0xBB, 0xCC]);
        assert_eq!(len, 7);
    }

    #[test]
    fn interleaved_frame_needs_full_payload() {
        assert!(InterleavedFrame::parse(&[b'$', 0, 0, 9, 1, 2]).is_none());
        assert!(InterleavedFrame::parse(&[0x80, 0, 0, 0]).is_none()); // not a '$' frame
    }
}