// arcly-stream 0.1.7
//
// An open, extensible live-media streaming kernel: lock-free zero-copy frame
// fan-out, an instant-start GOP cache, a pluggable multi-protocol ingestion
// layer (RTMP, RTSP, SRT, and WHIP/WHEP shipped), and a feature-gated pure-Rust
// media plane (MPEG-TS/HLS/fMP4) — runtime-, config-, and metrics-free.
// Documentation
//! Native RTSP ingest handler (feature `rtsp`).
//!
//! Pulls live media from RTSP sources — IP cameras, hardware encoders, and
//! restreamers — and bridges it onto the engine bus through the standard
//! [`InboundProtocol`] seam. The handler acts as an **RTSP client**: for each
//! configured [`RtspSource`] it drives the session state machine
//! (`OPTIONS → DESCRIBE → SETUP → PLAY → TEARDOWN`), then depacketizes the RTP
//! media — H.264 video into Annex-B access units, plus an AAC audio track when
//! the session offers one — and publishes it.
//!
//! # Transport
//!
//! TCP-interleaved transport (RTP-over-RTSP, RFC 2326 §10.12) is the only
//! transport this handler requests, chosen because it is the most
//! camera-compatible: media and control share the one TCP connection, so it
//! traverses NAT and firewalls that block the classic UDP transport. The
//! [interleaved framing][InterleavedFrame] (`$ channel length …`) is parsed
//! here; the RTP payloads feed the shared [`H264Depacketizer`] (video) and
//! [`AacDepacketizer`] (audio).
//!
//! [`InboundProtocol`]: crate::inbound::InboundProtocol
//! [`H264Depacketizer`]: crate::protocol::rtp::H264Depacketizer
//! [`AacDepacketizer`]: crate::protocol::rtp::AacDepacketizer
//!
//! # Async behavior & teardown
//!
//! [`serve`](crate::inbound::InboundProtocol::serve) spawns one pull task per
//! source and supervises them until `shutdown` fires, at which point each task
//! issues `TEARDOWN` and releases its
//! [`PublishSession`](crate::inbound::PublishSession). A source that drops is
//! retried after a fixed backoff, so a flaky camera link self-heals.
//!
//! # Scope
//!
//! The message, SDP, and interleaved-framing parsers are complete and unit
//! tested. Digest authentication and ONVIF discovery are out of scope for the
//! kernel — a host that needs them resolves the authenticated URL and passes it
//! in via [`RtspSource`].

mod egress;
mod message;
mod sdp;

pub use egress::RtspServer;
pub use message::{InterleavedFrame, RtspMethod, RtspRequest, RtspResponse};
pub use sdp::{MediaDescription, Sdp};

use crate::inbound::{InboundProtocol, IngestContext};
use crate::protocol::rtp::{AacDepacketizer, H264Depacketizer, RtpHeader};
use crate::{CodecId, MediaFrame, Result, StreamKey};
use async_trait::async_trait;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use tracing::{debug, warn};

/// An RTSP endpoint to pull from, paired with the engine stream key that its
/// media is published under.
#[derive(Debug, Clone)]
pub struct RtspSource {
    /// Absolute `rtsp://` URL; any credentials must already be embedded by the host.
    pub url: String,
    /// Stream key the pulled media is published to on the engine bus.
    pub key: StreamKey,
}

impl RtspSource {
    /// Build a source that pulls `url` and publishes the result as `key`.
    pub fn new(url: impl Into<String>, key: StreamKey) -> Self {
        let url: String = url.into();
        RtspSource { url, key }
    }
}

/// RTSP ingest worker that pulls each registered [`RtspSource`] concurrently,
/// one task per source.
#[derive(Debug)]
pub struct RtspHandler {
    /// Sources registered through the builder, in registration order.
    sources: Vec<RtspSource>,
    /// Delay between a source dropping and the next reconnect attempt.
    retry_backoff: Duration,
}

impl Default for RtspHandler {
    /// Same as [`RtspHandler::new`]: no sources and the default backoff.
    fn default() -> Self {
        RtspHandler::new()
    }
}

impl RtspHandler {
    /// How long a source task may keep running after `shutdown` fires.
    ///
    /// Inside this window `pull_once` can observe cancellation, send its
    /// best-effort `TEARDOWN`, and finish the publish session. Once it elapses
    /// the task's future is dropped outright, so a peer that wedges a connect
    /// or a handshake read cannot stall graceful shutdown.
    const SHUTDOWN_GRACE: Duration = Duration::from_secs(2);

    /// Largest complete RTP-over-RTSP interleaved frame: the 4-byte
    /// `$ channel length` prefix plus a 16-bit payload length.
    const MAX_INTERLEAVED_FRAME: usize = 4 + u16::MAX as usize;

    /// Samples per AAC access unit (the AAC-LC frame length). Used to stamp
    /// the second and later AUs carried in a single RTP packet.
    const AAC_SAMPLES_PER_AU: i64 = 1024;

    /// A handler with no sources. Add them with [`source`](Self::source).
    pub fn new() -> Self {
        Self {
            sources: Vec::new(),
            retry_backoff: Duration::from_secs(3),
        }
    }

    /// Register a source to pull.
    pub fn source(mut self, source: RtspSource) -> Self {
        self.sources.push(source);
        self
    }

    /// Override the reconnect backoff applied after a source drops (default 3s).
    pub fn retry_backoff(mut self, backoff: Duration) -> Self {
        self.retry_backoff = backoff;
        self
    }

    /// Pull one source until `shutdown`, reconnecting after `backoff` on
    /// failure. Takes owned arguments so each source runs on its own spawned
    /// task.
    async fn run_source(
        source: RtspSource,
        ctx: IngestContext,
        shutdown: CancellationToken,
        backoff: Duration,
    ) {
        while !shutdown.is_cancelled() {
            // `pull_once` watches `shutdown` itself: on cancellation its read
            // loop breaks, sends TEARDOWN and finishes the publish session.
            // Racing it directly against `shutdown.cancelled()` would drop the
            // future before that cleanup ran. Instead, race it against shutdown
            // *plus a grace period*. That still guarantees shutdown cannot hang
            // on a peer that wedges a connect or handshake read (those awaits
            // are not cancellation-aware). Dropping the future closes the socket.
            let grace_deadline = async {
                shutdown.cancelled().await;
                tokio::time::sleep(Self::SHUTDOWN_GRACE).await;
            };
            tokio::select! {
                _ = grace_deadline => {
                    warn!(url = %source.url, "rtsp source did not stop within shutdown grace; abandoning session");
                    return;
                }
                res = Self::pull_once(&source, &ctx, &shutdown) => {
                    if let Err(e) = res {
                        warn!(url = %source.url, error = %e, "rtsp source dropped; will retry");
                    }
                }
            }
            tokio::select! {
                _ = shutdown.cancelled() => return,
                _ = tokio::time::sleep(backoff) => {}
            }
        }
    }

    /// One full pull session for `source`. Connects, negotiates, and streams
    /// interleaved RTP until the link drops or `shutdown` fires, then sends a
    /// best-effort `TEARDOWN` and finishes the publish session.
    ///
    /// # Errors
    ///
    /// Returns an error for a malformed URL, any I/O failure during
    /// negotiation or streaming, a desynchronized interleaved stream, or a
    /// failure to open, publish to, or finish the publish session.
    async fn pull_once(
        source: &RtspSource,
        ctx: &IngestContext,
        shutdown: &CancellationToken,
    ) -> Result<()> {
        use tokio::io::{AsyncReadExt, AsyncWriteExt};
        use tokio::net::TcpStream;

        let (host, port) = message::host_port(&source.url)
            .ok_or_else(|| crate::StreamError::protocol("malformed rtsp url"))?;
        let mut stream = TcpStream::connect((host.as_str(), port)).await?;
        let mut cseq = 1u32;

        // OPTIONS → DESCRIBE → SETUP → PLAY.
        // NOTE(review): response status codes are not inspected in this
        // function, so a 4xx/5xx (e.g. 401 Unauthorized) surfaces later as a
        // missing session or absent media rather than as an error here —
        // confirm whether `read_response` already rejects non-2xx replies.
        message::write_request(&mut stream, RtspMethod::Options, &source.url, cseq, &[]).await?;
        let _ = message::read_response(&mut stream).await?;
        cseq += 1;

        message::write_request(
            &mut stream,
            RtspMethod::Describe,
            &source.url,
            cseq,
            &[("Accept", "application/sdp")],
        )
        .await?;
        let describe = message::read_response(&mut stream).await?;
        let sdp = Sdp::parse(&describe.body);
        debug!(url = %source.url, media = sdp.media.len(), "rtsp DESCRIBE parsed");
        cseq += 1;

        // SETUP the first video track over interleaved channels 0/1, falling
        // back to the presentation URL when the SDP names no video control.
        let setup_url = sdp
            .first_video_control(&source.url)
            .unwrap_or_else(|| source.url.clone());
        message::write_request(
            &mut stream,
            RtspMethod::Setup,
            &setup_url,
            cseq,
            &[("Transport", "RTP/AVP/TCP;unicast;interleaved=0-1")],
        )
        .await?;
        let setup = message::read_response(&mut stream).await?;
        let session_id = message::session_id(&setup).unwrap_or_default();
        cseq += 1;

        // If the session offers an AAC audio track, SETUP it on channels 2/3.
        // `audio_clock` stays `None` otherwise, which disables audio publishing.
        let mut audio_clock = None;
        if sdp.has_aac_audio() {
            if let Some(audio_url) = sdp.first_audio_control(&source.url) {
                message::write_request(
                    &mut stream,
                    RtspMethod::Setup,
                    &audio_url,
                    cseq,
                    &[
                        ("Transport", "RTP/AVP/TCP;unicast;interleaved=2-3"),
                        ("Session", &session_id),
                    ],
                )
                .await?;
                let _ = message::read_response(&mut stream).await?;
                cseq += 1;
                audio_clock = sdp
                    .media
                    .iter()
                    .find(|m| m.media == "audio")
                    .and_then(|m| m.clock_rate)
                    .or(Some(48_000));
                debug!(url = %source.url, "rtsp AAC audio track set up on ch 2/3");
            }
        }

        message::write_request(
            &mut stream,
            RtspMethod::Play,
            &source.url,
            cseq,
            &[("Session", &session_id)],
        )
        .await?;
        let _ = message::read_response(&mut stream).await?;

        // Stream interleaved RTP → depacketize → publish.
        let session = ctx.open_publish(source.key.clone()).await?;
        let mut depack = H264Depacketizer::new();
        let (size_len, index_len) = sdp.audio_aac_lengths();
        let aac = AacDepacketizer::with_lengths(size_len, index_len);
        let mut buf = Vec::with_capacity(64 * 1024);
        let mut read = [0u8; 16 * 1024];

        loop {
            tokio::select! {
                _ = shutdown.cancelled() => break,
                n = stream.read(&mut read) => {
                    let n = n?;
                    if n == 0 { break; }
                    buf.extend_from_slice(&read[..n]);
                    Self::drain_interleaved(&mut buf, &mut depack, &aac, audio_clock, &session)?;
                }
            }
        }

        // Best-effort TEARDOWN, then release the publish slot.
        cseq += 1;
        let _ = message::write_request(
            &mut stream,
            RtspMethod::Teardown,
            &source.url,
            cseq,
            &[("Session", &session_id)],
        )
        .await;
        let _ = stream.shutdown().await;
        session.finish().await
    }

    /// Consume whole interleaved frames from `buf`, depacketizing channel-0
    /// video RTP and (when `audio_clock` is set) channel-2 AAC RTP into frames
    /// and publishing them. Leaves any partial frame in `buf`.
    ///
    /// # Errors
    ///
    /// Propagates publish failures. Returns a protocol error when more bytes
    /// remain buffered than one maximal interleaved frame, i.e. the stream can
    /// no longer be parsed; the caller then reconnects instead of buffering
    /// without bound.
    fn drain_interleaved(
        buf: &mut Vec<u8>,
        depack: &mut H264Depacketizer,
        aac: &AacDepacketizer,
        audio_clock: Option<u32>,
        session: &crate::inbound::PublishSession,
    ) -> Result<()> {
        let mut consumed = 0;
        while let Some((frame, len)) = InterleavedFrame::parse(&buf[consumed..]) {
            consumed += len;
            let Some(header) = RtpHeader::parse(frame.payload) else {
                continue;
            };
            // A payload offset past the end of the packet means a malformed
            // header; skip the packet rather than panic on the slice.
            let Some(payload) = frame.payload.get(header.payload_offset..) else {
                continue;
            };
            // Channel 0 = video RTP, channel 2 = audio RTP; odd channels are
            // RTCP, ignored on ingest.
            // NOTE: RTP timestamps start at a random per-stream offset
            // (RFC 3550 §5.1) and are not rebased or unwrapped here.
            match frame.channel {
                0 => {
                    match depack.push(payload, header.marker, header.timestamp, header.sequence) {
                        Ok(Some(au)) => {
                            let pts = (au.timestamp / 90) as i64; // 90 kHz → ms
                            let mf = MediaFrame::new_video(
                                pts,
                                pts,
                                au.data,
                                CodecId::H264,
                                au.keyframe,
                            );
                            let _ = session.publish_frame(mf)?;
                        }
                        Ok(None) => {}
                        Err(e) => debug!(?e, "rtp depacketize skip"),
                    }
                }
                2 => {
                    if let Some(clock) = audio_clock {
                        match aac.push(payload) {
                            Ok(units) => {
                                let clock = i64::from(clock.max(1));
                                for (index, au) in units.into_iter().enumerate() {
                                    // RFC 3640: the RTP timestamp stamps the
                                    // first AU; each later AU in the packet is
                                    // one AU duration further on. Without this
                                    // every AU in the packet shared one PTS.
                                    let ts = header.timestamp as i64
                                        + index as i64 * Self::AAC_SAMPLES_PER_AU;
                                    let pts = ts * 1000 / clock;
                                    let mf = MediaFrame::new_audio(pts, au, CodecId::AAC);
                                    let _ = session.publish_frame(mf)?;
                                }
                            }
                            Err(e) => debug!(?e, "aac depacketize skip"),
                        }
                    }
                }
                _ => {}
            }
        }
        buf.drain(..consumed);
        // A genuine partial frame is always shorter than one maximal
        // interleaved frame. Anything longer means the parser cannot make
        // progress (e.g. a server-injected non-`$` RTSP message).
        if buf.len() > Self::MAX_INTERLEAVED_FRAME {
            return Err(crate::StreamError::protocol(
                "rtsp interleaved stream desynchronized",
            ));
        }
        Ok(())
    }
}

#[async_trait]
impl InboundProtocol for RtspHandler {
    fn name(&self) -> &'static str {
        "rtsp"
    }

    async fn serve(&self, ctx: IngestContext, shutdown: CancellationToken) -> Result<()> {
        // Pull every source concurrently on its own task; await them all so the
        // worker only returns once every source has drained on shutdown.
        let mut tasks = tokio::task::JoinSet::new();
        for source in &self.sources {
            tasks.spawn(Self::run_source(
                source.clone(),
                ctx.clone(),
                shutdown.clone(),
                self.retry_backoff,
            ));
        }
        while tasks.join_next().await.is_some() {}
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn source_builder_sets_url_and_key() {
        let key = StreamKey::new("live", "cam1");
        let src = RtspSource::new("rtsp://cam/stream", key);

        assert_eq!(src.url.as_str(), "rtsp://cam/stream");
        assert_eq!(src.key.stream_id.as_str(), "cam1");
    }

    #[test]
    fn handler_collects_sources() {
        let first = RtspSource::new("rtsp://a/1", StreamKey::new("live", "a"));
        let second = RtspSource::new("rtsp://b/2", StreamKey::new("live", "b"));
        let handler = RtspHandler::new().source(first).source(second);

        assert_eq!(handler.sources.len(), 2);
        assert_eq!(handler.name(), "rtsp");
    }
}