arcly-stream 0.1.4

An open-extensible live-media streaming kernel: lock-free zero-copy frame fan-out, instant-start GOP cache, a pluggable multi-protocol ingestion layer (RTMP, RTSP, SRT, WHIP/WHEP shipped), and a feature-gated pure-Rust media plane (MPEG-TS/HLS/fMP4) — runtime, config, and metrics free.
Documentation
//! WebRTC WHIP/WHEP ingest & egress signaling with a pluggable crypto transport
//! (feature `webrtc`).
//!
//! This module ships the parts of WebRTC that are *protocol logic* — and which
//! can therefore live in a `#![forbid(unsafe_code)]`, dependency-light kernel:
//!
//! - **WHIP/WHEP signaling** ([`WhipEndpoint`]): the HTTP-driven SDP offer/answer
//!   exchange and resource lifecycle. The host wires these calls into *its own*
//!   HTTP server (Axum, Hyper, …) — the kernel never imposes a web framework.
//! - **SDP munging** ([`sdp`]): parse a browser offer, emit a compatible answer.
//! - **RTP routing**: incoming (decrypted) RTP feeds the shared
//!   [`H264Depacketizer`] and lands on the
//!   bus through a [`PublishSession`].
//! - **RTCP feedback** ([`rtcp`]): build PLI/FIR keyframe requests to send back
//!   upstream when a late subscriber needs an IDR.
//!
//! # The crypto seam
//!
//! A *working* WebRTC connection needs DTLS, SRTP, and ICE. Those cannot be
//! hand-rolled correctly without a vetted crypto stack, so they are **not**
//! implemented here. Instead the host supplies them through the
//! [`DtlsSrtpTransport`] trait — backed by a crate such as `str0m` or
//! `webrtc-rs` — and this module drives the media plane over it. The kernel thus
//! stays crypto-free and `unsafe`-free while remaining fully WebRTC-capable when
//! a transport is plugged in.
//!
//! This is an honest boundary: the signaling and media routing are real and
//! tested; the encrypted transport is an injected dependency, by design.

pub mod rtcp;
pub mod sdp;

pub use sdp::{SdpAnswerParams, SdpOffer};

use crate::inbound::{IngestContext, PublishSession};
use crate::protocol::rtp::{H264Depacketizer, RtpHeader};
use crate::{CodecId, MediaFrame, Result, StreamKey};
use async_trait::async_trait;

/// The host-supplied DTLS-SRTP transport for one peer connection.
///
/// Implement this over a vetted WebRTC crypto stack. The kernel calls it to pull
/// decrypted RTP and to push RTCP feedback; it never sees keys or handshakes.
///
/// The transport is held behind an `Arc<dyn DtlsSrtpTransport>` and every
/// method takes `&self`, so implementations must be `Send + Sync` and keep any
/// mutable state behind interior mutability.
#[async_trait]
pub trait DtlsSrtpTransport: Send + Sync {
    /// The DTLS certificate fingerprint (`sha-256 AA:BB:…`) to advertise in the
    /// SDP answer's `a=fingerprint` line.
    ///
    /// Read once per offer by [`WhipEndpoint::accept_offer`].
    fn fingerprint(&self) -> String;

    /// The ICE ufrag/pwd pair to advertise in the SDP answer.
    ///
    /// Returned as `(ufrag, pwd)`, in that order; read once per offer by
    /// [`WhipEndpoint::accept_offer`].
    fn ice_credentials(&self) -> (String, String);

    /// Receive the next decrypted RTP packet, or `None` when the peer closes.
    ///
    /// Each `Some` value is one whole packet, starting at the RTP header.
    /// [`WhipResource::pump`] skips packets whose header does not parse and
    /// stops its loop on the first `None`.
    async fn recv_rtp(&self) -> Option<Vec<u8>>;

    /// Send an RTCP packet (e.g. a PLI/FIR built by [`rtcp`]) back to the peer.
    ///
    /// [`WhipResource::pump`] treats feedback as best-effort and discards the
    /// error returned here.
    async fn send_rtcp(&self, packet: &[u8]) -> Result<()>;
}

/// A WHIP/WHEP signaling endpoint the host drives from its HTTP layer.
///
/// `POST` of an SDP offer → [`accept_offer`](Self::accept_offer) returns the SDP
/// answer to write back with `201 Created`. The returned [`WhipResource`] is the
/// handle the host stores (keyed by the `Location` URL) and later
/// [`close`](WhipResource::close)s on `DELETE`.
///
/// Cloning the endpoint clones the wrapped [`IngestContext`]; every accepted
/// resource likewise receives its own clone of that context.
#[derive(Clone)]
pub struct WhipEndpoint {
    /// Ingest context handed (cloned) to each [`WhipResource`] this endpoint
    /// creates; the resource opens its publish session through it.
    ctx: IngestContext,
}

impl WhipEndpoint {
    /// Build an endpoint that publishes ingested media through `ctx`.
    pub fn new(ctx: IngestContext) -> Self {
        Self { ctx }
    }

    /// Handle a WHIP `POST`: validate the offer, mint the answer from the
    /// transport's credentials, and return the resource handle plus the answer
    /// SDP. The host then runs [`WhipResource::pump`] (typically `tokio::spawn`)
    /// to move media for the connection's lifetime.
    pub fn accept_offer(
        &self,
        offer_sdp: &str,
        key: StreamKey,
        transport: std::sync::Arc<dyn DtlsSrtpTransport>,
    ) -> Result<(WhipResource, String)> {
        let offer = SdpOffer::parse(offer_sdp)
            .ok_or_else(|| crate::StreamError::protocol("malformed SDP offer"))?;
        let (ufrag, pwd) = transport.ice_credentials();
        let answer = sdp::build_answer(
            &offer,
            &SdpAnswerParams {
                fingerprint: transport.fingerprint(),
                ice_ufrag: ufrag,
                ice_pwd: pwd,
            },
        );
        let resource = WhipResource {
            ctx: self.ctx.clone(),
            key,
            transport,
        };
        Ok((resource, answer))
    }
}

/// An accepted WHIP connection: pumps decrypted RTP onto the bus until the peer
/// or transport closes.
///
/// Created by [`WhipEndpoint::accept_offer`]. Both [`pump`](Self::pump) and
/// [`close`](Self::close) take the resource by value, so each handle can be
/// used for exactly one of them.
pub struct WhipResource {
    /// Ingest context used by `pump` to open the publish session.
    ctx: IngestContext,
    /// Stream key the publish session is opened under.
    key: StreamKey,
    /// Host-supplied transport: source of decrypted RTP, sink for RTCP feedback.
    transport: std::sync::Arc<dyn DtlsSrtpTransport>,
}

impl WhipResource {
    /// Drive the media plane: depacketize incoming RTP into H.264 access units
    /// and publish them. Returns when the transport yields `None` (peer gone).
    ///
    /// Keyframe requests: a PLI is sent to the peer at start-up and after any
    /// depacketizer error, and is repeated (rate-limited) until an access unit
    /// flagged as a keyframe has been received. Sending the PLI is
    /// best-effort; a failing `send_rtcp` does not stop the pump.
    ///
    /// Packets whose RTP header does not parse, or whose reported payload
    /// offset lies beyond the end of the packet, are skipped.
    ///
    /// # Errors
    ///
    /// Returns the error from opening the publish session, from publishing a
    /// frame, or from finishing the session once the transport is drained.
    pub async fn pump(self) -> Result<()> {
        // Lower bound on the spacing between two PLIs. Without it one PLI would
        // go out for *every* RTP packet received while a keyframe is pending,
        // i.e. dozens per frame. The exact value is a tuning choice.
        const PLI_MIN_INTERVAL: std::time::Duration = std::time::Duration::from_millis(500);

        let session: PublishSession = self.ctx.open_publish(self.key.clone()).await?;
        let mut depack = H264Depacketizer::new();
        let mut needs_keyframe = true;
        let mut last_pli: Option<std::time::Instant> = None;

        while let Some(pkt) = self.transport.recv_rtp().await {
            let Some(header) = RtpHeader::parse(&pkt) else {
                continue;
            };
            // Checked slicing: the packet comes off the network, so an offset
            // past the end must drop the packet rather than panic the task.
            let Some(payload) = pkt.get(header.payload_offset..) else {
                continue;
            };
            match depack.push(payload, header.marker, header.timestamp, header.sequence) {
                Ok(Some(au)) => {
                    // Only a keyframe satisfies a pending request; a delta
                    // frame completing must not cancel it.
                    if au.keyframe {
                        needs_keyframe = false;
                    }
                    // Dividing by 90 assumes the 90 kHz RTP video clock, giving
                    // milliseconds. The same value is passed for both timestamp
                    // arguments of the frame.
                    let pts = (au.timestamp / 90) as i64;
                    let frame =
                        MediaFrame::new_video(pts, pts, au.data, CodecId::H264, au.keyframe);
                    // NOTE(review): an error here returns without calling
                    // `session.finish()`; presumably dropping the session
                    // releases the publish slot — confirm.
                    let _ = session.publish_frame(frame)?;
                }
                Ok(None) => {}
                Err(_) => {
                    // Loss/gap: ask the sender for a fresh IDR via RTCP PLI.
                    needs_keyframe = true;
                }
            }

            let throttled =
                matches!(last_pli, Some(sent) if sent.elapsed() < PLI_MIN_INTERVAL);
            if needs_keyframe && !throttled {
                let pli = rtcp::build_pli(0, header.ssrc);
                // Feedback is best-effort: a failed send is retried on a later
                // packet once the interval has elapsed again.
                let _ = self.transport.send_rtcp(&pli).await;
                last_pli = Some(std::time::Instant::now());
            }
        }

        session.finish().await
    }

    /// Tear the resource down on a WHIP `DELETE` without pumping media.
    ///
    /// Currently this does no work beyond consuming (and thereby dropping) the
    /// handle, and always returns `Ok(())`.
    ///
    /// NOTE(review): `pump` also consumes the resource, so a handle that is
    /// being pumped cannot be closed through this method — confirm how the
    /// host is meant to stop a running pump on `DELETE`.
    pub async fn close(self) -> Result<()> {
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::bus::PlaybackRegistry;
    use std::collections::VecDeque;
    use std::sync::Arc;
    use tokio::sync::Mutex;

    /// Test double for the crypto transport: hands out a pre-recorded list of
    /// RTP packets in order and keeps every RTCP packet it is asked to send.
    struct FakeTransport {
        packets: Mutex<VecDeque<Vec<u8>>>,
        rtcp: Mutex<Vec<Vec<u8>>>,
    }

    impl FakeTransport {
        /// Build a shared transport that will replay `script` front to back.
        fn scripted(script: Vec<Vec<u8>>) -> Arc<Self> {
            Arc::new(Self {
                packets: Mutex::new(VecDeque::from(script)),
                rtcp: Mutex::new(Vec::new()),
            })
        }
    }

    #[async_trait]
    impl DtlsSrtpTransport for FakeTransport {
        fn fingerprint(&self) -> String {
            String::from("sha-256 AA:BB")
        }

        fn ice_credentials(&self) -> (String, String) {
            (String::from("ufrag"), String::from("pwd"))
        }

        async fn recv_rtp(&self) -> Option<Vec<u8>> {
            let mut queue = self.packets.lock().await;
            queue.pop_front()
        }

        async fn send_rtcp(&self, packet: &[u8]) -> Result<()> {
            let mut sent = self.rtcp.lock().await;
            sent.push(packet.to_vec());
            Ok(())
        }
    }

    /// Assemble a minimal RTP packet: version 2, payload type 96, SSRC 7, no
    /// CSRCs or extensions, followed by `payload`.
    fn rtp_packet(seq: u16, ts: u32, marker: bool, payload: &[u8]) -> Vec<u8> {
        let marker_bit: u8 = if marker { 0x80 } else { 0x00 };
        let mut bytes = Vec::with_capacity(12 + payload.len());
        bytes.push(0x80);
        bytes.push(marker_bit | 96);
        bytes.extend_from_slice(&seq.to_be_bytes());
        bytes.extend_from_slice(&ts.to_be_bytes());
        bytes.extend_from_slice(&[0, 0, 0, 7]);
        bytes.extend_from_slice(payload);
        bytes
    }

    #[tokio::test]
    async fn accept_offer_builds_answer_with_transport_credentials() {
        let engine = crate::Engine::builder()
            .application(crate::AppSpec::new("live"))
            .build();
        let endpoint = WhipEndpoint::new(IngestContext::new(engine));
        let transport = FakeTransport::scripted(Vec::new());

        let offer = concat!(
            "v=0\r\n",
            "o=- 0 0 IN IP4 0.0.0.0\r\n",
            "m=video 9 UDP/TLS/RTP/SAVPF 96\r\n",
            "a=rtpmap:96 H264/90000\r\n",
        );
        let stream_key = StreamKey::new("live", "web");
        let (_res, answer) = endpoint
            .accept_offer(offer, stream_key, transport)
            .unwrap();

        assert!(answer.contains("a=ice-ufrag:ufrag"));
        assert!(answer.contains("a=fingerprint:sha-256 AA:BB"));
        assert!(answer.contains("a=setup:passive"));
    }

    #[tokio::test]
    async fn pump_publishes_idr_then_releases_slot() {
        let engine = crate::Engine::builder()
            .application(crate::AppSpec::new("live").gop_cache(4))
            .build();
        let key = StreamKey::new("live", "web");

        // The whole script is one IDR NAL carried in a single marked packet.
        let idr = rtp_packet(1, 0, true, &[0x65, 0x11]);
        let resource = WhipResource {
            ctx: IngestContext::new(engine.clone()),
            key: key.clone(),
            transport: FakeTransport::scripted(vec![idr]),
        };
        resource.pump().await.unwrap();

        // A complete keyframe was published, so no PLI was needed; the publish
        // slot is released once the transport drained.
        assert!(engine.get_stream(&key).is_err());
    }

    #[tokio::test]
    async fn pump_requests_keyframe_on_a_depacketize_gap() {
        let engine = crate::Engine::builder()
            .application(crate::AppSpec::new("live").gop_cache(4))
            .build();

        // A mid FU-A fragment with no start bit forces an OutOfOrder error → PLI.
        let orphan_fragment = rtp_packet(1, 0, false, &[0x7C, 0x05, 0x11]); // FU-A, S=0
        let transport = FakeTransport::scripted(vec![orphan_fragment]);

        let resource = WhipResource {
            ctx: IngestContext::new(engine),
            key: StreamKey::new("live", "web2"),
            transport: transport.clone(),
        };
        resource.pump().await.unwrap();

        let feedback = transport.rtcp.lock().await;
        assert!(
            !feedback.is_empty(),
            "a PLI was sent after the depacketize gap"
        );
    }
}