arcly-stream 0.1.3

An open-extensible live-media streaming kernel: lock-free zero-copy frame fan-out, instant-start GOP cache, a pluggable multi-protocol ingestion layer (RTMP, RTSP, SRT, WHIP/WHEP shipped), and a feature-gated pure-Rust media plane (MPEG-TS/HLS/fMP4) — runtime-, config-, and metrics-free.
Documentation
//! Native RTSP ingest handler (feature `rtsp`).
//!
//! Pulls live media from RTSP sources — IP cameras, hardware encoders, and
//! restreamers — and bridges it onto the engine bus through the standard
//! [`InboundProtocol`] seam. The handler acts as an **RTSP client**: for each
//! configured [`RtspSource`] it drives the session state machine
//! (`OPTIONS → DESCRIBE → SETUP → PLAY → TEARDOWN`), then depacketizes the RTP
//! media into Annex-B access units and publishes them.
//!
//! # Transport
//!
//! TCP-interleaved transport (RTP-over-RTSP, RFC 2326 §10.12) is the default and
//! the most camera-compatible: media and control share the one TCP connection,
//! so it traverses NAT and firewalls that block the classic UDP transport. The
//! [interleaved framing][InterleavedFrame] (`$ channel length …`) is parsed
//! here; the RTP payloads feed the shared [`H264Depacketizer`].
//!
//! [`InboundProtocol`]: crate::inbound::InboundProtocol
//! [`H264Depacketizer`]: crate::protocol::rtp::H264Depacketizer
//!
//! # Async behavior & teardown
//!
//! [`serve`](crate::inbound::InboundProtocol::serve) spawns one pull task per
//! source and supervises them until `shutdown` fires, at which point each task
//! issues `TEARDOWN` and releases its
//! [`PublishSession`](crate::inbound::PublishSession). A source that drops is
//! retried with backoff so a flaky camera link self-heals.
//!
//! # Scope
//!
//! The message, SDP, and interleaved-framing parsers are complete and unit
//! tested. Digest authentication and ONVIF discovery are out of scope for the
//! kernel — a host that needs them resolves the authenticated URL and passes it
//! in via [`RtspSource`].

mod message;
mod sdp;

pub use message::{InterleavedFrame, RtspMethod, RtspRequest, RtspResponse};
pub use sdp::{MediaDescription, Sdp};

use crate::inbound::{InboundProtocol, IngestContext};
use crate::protocol::rtp::{H264Depacketizer, RtpHeader};
use crate::{CodecId, MediaFrame, Result, StreamKey};
use async_trait::async_trait;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use tracing::{debug, warn};

/// One RTSP source to pull and the stream key it publishes to.
///
/// Plain owned data: [`RtspHandler`] clones one of these into every pull task
/// it spawns.
#[derive(Debug, Clone)]
pub struct RtspSource {
    /// Absolute `rtsp://` URL (credentials, if any, embedded by the host).
    ///
    /// The connect target (host and port) is extracted from this string, and
    /// the same string is sent as the request URI of the control requests.
    pub url: String,
    /// Engine stream key the pulled media is published under.
    pub key: StreamKey,
}

impl RtspSource {
    /// A source pulling `url` and publishing it as `key`.
    pub fn new(url: impl Into<String>, key: StreamKey) -> Self {
        Self {
            url: url.into(),
            key,
        }
    }
}

/// RTSP ingest worker — pulls every configured [`RtspSource`] concurrently.
///
/// Built fluently: start from [`RtspHandler::new`], then chain
/// [`source`](Self::source) and [`retry_backoff`](Self::retry_backoff).
#[derive(Debug)]
pub struct RtspHandler {
    /// Sources to pull; each one gets its own task when the handler is served.
    sources: Vec<RtspSource>,
    /// Pause between pull attempts for a source (3 seconds unless overridden).
    retry_backoff: Duration,
}

impl Default for RtspHandler {
    fn default() -> Self {
        Self::new()
    }
}

impl RtspHandler {
    /// Create an empty handler: no sources registered and a 3-second
    /// reconnect backoff. Register sources with [`source`](Self::source).
    pub fn new() -> Self {
        let sources = Vec::new();
        let retry_backoff = Duration::from_secs(3);
        Self {
            sources,
            retry_backoff,
        }
    }

    /// Append `source` to the set this handler pulls, returning the handler
    /// so calls can be chained.
    pub fn source(self, source: RtspSource) -> Self {
        let mut handler = self;
        handler.sources.push(source);
        handler
    }

    /// Override the reconnect backoff applied after a source drops (default 3s).
    pub fn retry_backoff(mut self, backoff: Duration) -> Self {
        self.retry_backoff = backoff;
        self
    }

    /// Pull one source until `shutdown`, reconnecting on failure. Owned
    /// arguments so each source runs on its own spawned task.
    async fn run_source(
        source: RtspSource,
        ctx: IngestContext,
        shutdown: CancellationToken,
        backoff: Duration,
    ) {
        loop {
            if shutdown.is_cancelled() {
                return;
            }
            if let Err(e) = Self::pull_once(&source, &ctx, &shutdown).await {
                warn!(url = %source.url, error = %e, "rtsp source dropped; will retry");
            }
            tokio::select! {
                _ = shutdown.cancelled() => return,
                _ = tokio::time::sleep(backoff) => {}
            }
        }
    }

    /// One full pull session for `source`. Connects, negotiates, and streams
    /// interleaved RTP until the link drops or `shutdown` fires.
    async fn pull_once(
        source: &RtspSource,
        ctx: &IngestContext,
        shutdown: &CancellationToken,
    ) -> Result<()> {
        use tokio::io::{AsyncReadExt, AsyncWriteExt};
        use tokio::net::TcpStream;

        let (host, port) = message::host_port(&source.url)
            .ok_or_else(|| crate::StreamError::protocol("malformed rtsp url"))?;
        let mut stream = TcpStream::connect((host.as_str(), port)).await?;
        let mut cseq = 1u32;

        // OPTIONS → DESCRIBE → SETUP → PLAY.
        message::write_request(&mut stream, RtspMethod::Options, &source.url, cseq, &[]).await?;
        let _ = message::read_response(&mut stream).await?;
        cseq += 1;

        message::write_request(
            &mut stream,
            RtspMethod::Describe,
            &source.url,
            cseq,
            &[("Accept", "application/sdp")],
        )
        .await?;
        let describe = message::read_response(&mut stream).await?;
        let sdp = Sdp::parse(&describe.body);
        debug!(url = %source.url, media = sdp.media.len(), "rtsp DESCRIBE parsed");
        cseq += 1;

        // SETUP the first video track over interleaved channels 0/1.
        let setup_url = sdp
            .first_video_control(&source.url)
            .unwrap_or_else(|| source.url.clone());
        message::write_request(
            &mut stream,
            RtspMethod::Setup,
            &setup_url,
            cseq,
            &[("Transport", "RTP/AVP/TCP;unicast;interleaved=0-1")],
        )
        .await?;
        let setup = message::read_response(&mut stream).await?;
        let session_id = message::session_id(&setup).unwrap_or_default();
        cseq += 1;

        message::write_request(
            &mut stream,
            RtspMethod::Play,
            &source.url,
            cseq,
            &[("Session", &session_id)],
        )
        .await?;
        let _ = message::read_response(&mut stream).await?;

        // Stream interleaved RTP → depacketize → publish.
        let session = ctx.open_publish(source.key.clone()).await?;
        let mut depack = H264Depacketizer::new();
        let mut buf = Vec::with_capacity(64 * 1024);
        let mut read = [0u8; 16 * 1024];

        loop {
            tokio::select! {
                _ = shutdown.cancelled() => break,
                n = stream.read(&mut read) => {
                    let n = n?;
                    if n == 0 { break; }
                    buf.extend_from_slice(&read[..n]);
                    Self::drain_interleaved(&mut buf, &mut depack, &session)?;
                }
            }
        }

        // Best-effort TEARDOWN, then release the publish slot.
        cseq += 1;
        let _ = message::write_request(
            &mut stream,
            RtspMethod::Teardown,
            &source.url,
            cseq,
            &[("Session", &session_id)],
        )
        .await;
        let _ = stream.shutdown().await;
        session.finish().await
    }

    /// Consume whole interleaved frames from `buf`, depacketizing channel-0 RTP
    /// into access units and publishing them. Leaves any partial frame in `buf`.
    ///
    /// Frames on other channels, and channel-0 payloads that do not parse as
    /// RTP or whose header points past the end of the payload, are consumed
    /// and skipped. Depacketizer errors are logged at `debug` and skipped.
    ///
    /// # Errors
    ///
    /// Propagates the error from `session.publish_frame`. In that case the
    /// frames consumed so far are not removed from `buf`.
    fn drain_interleaved(
        buf: &mut Vec<u8>,
        depack: &mut H264Depacketizer,
        session: &crate::inbound::PublishSession,
    ) -> Result<()> {
        let mut consumed = 0;
        while let Some((frame, len)) = InterleavedFrame::parse(&buf[consumed..]) {
            consumed += len;

            // Channel 0 = RTP media; channel 1 = RTCP (ignored on ingest).
            if frame.channel != 0 {
                continue;
            }
            let header = match RtpHeader::parse(frame.payload) {
                Some(header) => header,
                None => continue,
            };
            // Checked slice: this is network input, and an offset past the end
            // of the frame must not panic the pull task.
            let payload = match frame.payload.get(header.payload_offset..) {
                Some(payload) => payload,
                None => {
                    debug!("rtp payload offset past end of frame; skipping");
                    continue;
                }
            };

            match depack.push(payload, header.marker, header.timestamp, header.sequence) {
                Ok(Some(au)) => {
                    // 90 kHz → ms.
                    // NOTE(review): if `au.timestamp` is the raw 32-bit RTP
                    // timestamp, this pts wraps roughly every 13 hours —
                    // confirm whether the depacketizer extends it.
                    let pts = (au.timestamp / 90) as i64;
                    let mf = MediaFrame::new_video(pts, pts, au.data, CodecId::H264, au.keyframe);
                    let _ = session.publish_frame(mf)?;
                }
                Ok(None) => {}
                Err(e) => debug!(?e, "rtp depacketize skip"),
            }
        }
        buf.drain(..consumed);
        Ok(())
    }
}

#[async_trait]
impl InboundProtocol for RtspHandler {
    fn name(&self) -> &'static str {
        "rtsp"
    }

    async fn serve(&self, ctx: IngestContext, shutdown: CancellationToken) -> Result<()> {
        // Pull every source concurrently on its own task; await them all so the
        // worker only returns once every source has drained on shutdown.
        let mut tasks = tokio::task::JoinSet::new();
        for source in &self.sources {
            tasks.spawn(Self::run_source(
                source.clone(),
                ctx.clone(),
                shutdown.clone(),
                self.retry_backoff,
            ));
        }
        while tasks.join_next().await.is_some() {}
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// `RtspSource::new` stores the URL verbatim and keeps the supplied key.
    #[test]
    fn source_builder_sets_url_and_key() {
        let key = StreamKey::new("live", "cam1");
        let source = RtspSource::new("rtsp://cam/stream", key);
        assert_eq!(source.url, "rtsp://cam/stream");
        assert_eq!(source.key.stream_id.as_str(), "cam1");
    }

    /// Chained `source` calls accumulate, and the handler reports its name.
    #[test]
    fn handler_collects_sources() {
        let first = RtspSource::new("rtsp://a/1", StreamKey::new("live", "a"));
        let second = RtspSource::new("rtsp://b/2", StreamKey::new("live", "b"));
        let handler = RtspHandler::new().source(first).source(second);
        assert_eq!(handler.sources.len(), 2);
        assert_eq!(handler.name(), "rtsp");
    }
}