arcly-stream 0.1.2

An open-extensible live-media streaming kernel: lock-free zero-copy frame fan-out, instant-start GOP cache, a pluggable multi-protocol ingestion layer (RTMP shipped), and a feature-gated pure-Rust media plane (MPEG-TS/HLS) — runtime-, config-, and metrics-free.
Documentation
//! Shared ingest utilities for TCP-based protocol handlers.
//!
//! Ported from `sc-protocol-ingest`, freed of the `sc-metrics::Metrics::global()`
//! calls — observability is injected via the engine's [`Observer`], not reached
//! for here.
//!
//! [`Observer`]: crate::Observer

use crate::Result;
use std::future::Future;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Instant;
use tokio::net::TcpListener;
use tokio::sync::Semaphore;
use tokio_util::sync::CancellationToken;
use tracing::warn;

// ── Frame size constants ────────────────────────────────────────────────────

/// Upper bound on a single video frame: 8 MiB (8 * 2^20 bytes). Anything
/// larger should be discarded before it is parsed, so that a crafted source
/// cannot exhaust heap memory.
pub const MAX_VIDEO_FRAME: usize = 8 << 20;

/// Upper bound on a single audio frame: 1 MiB (2^20 bytes).
pub const MAX_AUDIO_FRAME: usize = 1 << 20;

// ── Annex-B NAL start-code scanning ──────────────────────────────────────────

/// Position and width of one Annex-B start code inside an H.264/H.265
/// bytestream.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct NalStart {
    /// Index into the buffer of the start code's first (zero) byte.
    pub offset: usize,
    /// Number of bytes the start code occupies: 3 for `00 00 01`, 4 for
    /// `00 00 00 01`.
    pub len: usize,
}

/// Find the next Annex-B NAL start code (`00 00 01` or `00 00 00 01`) in `buf`
/// at or after `from`.
///
/// Returns a [`NalStart`] holding the offset of the code's leading zero and the
/// code's length in bytes, or `None` when the scanner finds no start code.
///
/// Backed by the crate-canonical start-code scanner, which
/// uses `memchr` to skip directly between candidate `0x01` bytes rather than
/// scanning byte-by-byte — the difference is significant on the ingest hot path
/// where every incoming H.264 packet is split into NAL units. A code that
/// straddles `from` is reported (its leading zeros may precede the origin), so a
/// packetizer can resume mid-buffer without missing a boundary.
///
/// ```
/// use arcly_stream::protocol::find_nal_start;
///
/// // 4-byte start code at offset 0, and a second 4-byte start code at offset 6
/// // (bytes 6..10 are `00 00 00 01`).
/// let buf = [0, 0, 0, 1, 9, 0xF0, 0, 0, 0, 1, 0x65];
/// let first = find_nal_start(&buf, 0).unwrap();
/// assert_eq!((first.offset, first.len), (0, 4));
/// let second = find_nal_start(&buf, first.offset + first.len).unwrap();
/// assert_eq!((second.offset, second.len), (6, 4));
/// ```
pub fn find_nal_start(buf: &[u8], from: usize) -> Option<NalStart> {
    // NOTE(review): the trailing `0` is forwarded verbatim to the scanner; its
    // meaning is defined by `bytescan::scan_start_code` — confirm there before
    // changing it.
    crate::bytescan::scan_start_code(buf, from, 0).map(|(offset, len)| NalStart { offset, len })
}

// ── KeyframeGate ─────────────────────────────────────────────────────────────

/// Gate that suppresses delta frames until the first IDR (keyframe) arrives, so
/// late-joining or re-subscribing clients always start at a clean decoder
/// boundary.
///
/// ```
/// use arcly_stream::protocol::KeyframeGate;
/// use arcly_stream::FrameType;
///
/// let mut gate = KeyframeGate::new();
/// // A delta frame before any keyframe is held back:
/// assert!(!gate.admit(FrameType::Delta));
/// // The first keyframe opens the gate and is itself admitted:
/// assert!(gate.admit(FrameType::Key));
/// // Subsequent deltas now flow:
/// assert!(gate.admit(FrameType::Delta));
/// // Audio always flows (it carries no decode dependency on video IDRs):
/// assert!(KeyframeGate::new().admit(FrameType::Audio));
/// ```
#[derive(Debug, Default)]
pub struct KeyframeGate {
    open: bool,
}

impl KeyframeGate {
    /// Create a new gate (initially closed — non-audio frames are held).
    pub fn new() -> Self {
        Self { open: false }
    }

    /// Open the gate unconditionally.
    pub fn open(&mut self) {
        self.open = true;
    }

    /// Whether the gate is currently open.
    pub fn is_open(&self) -> bool {
        self.open
    }

    /// Decide whether a frame of the given type should be admitted, opening the
    /// gate on the first keyframe. Audio is always admitted.
    pub fn admit(&mut self, frame_type: crate::FrameType) -> bool {
        match frame_type {
            crate::FrameType::Audio => true,
            crate::FrameType::Key => {
                self.open = true;
                true
            }
            crate::FrameType::Delta => self.open,
        }
    }
}

// ── IngestRateLimit ──────────────────────────────────────────────────────────

/// Per-connection fixed-window ingress rate limiter (bytes/second).
///
/// Bytes are tallied against an accounting window that opens when the limiter
/// is created. The first [`allow`](Self::allow) call made at least one second
/// after the window opened restarts it and clears the tally. This is a fixed
/// window rather than a true sliding one: a burst that straddles a restart can
/// momentarily pass up to twice the configured budget.
#[derive(Debug)]
pub struct IngestRateLimit {
    /// Byte budget per window; `0` disables limiting entirely.
    max_bytes_per_sec: u64,
    /// Instant at which the current accounting window opened.
    window_start: Instant,
    /// Bytes recorded since `window_start` (saturating).
    bytes_in_window: u64,
}

impl IngestRateLimit {
    /// A new limiter allowing up to `max_bytes_per_sec` bytes per one-second
    /// window. Passing `0` means "unlimited". The first window starts now.
    pub fn new(max_bytes_per_sec: u64) -> Self {
        Self {
            max_bytes_per_sec,
            window_start: Instant::now(),
            bytes_in_window: 0,
        }
    }

    /// Record `len` ingested bytes and report whether the connection is still
    /// within budget.
    ///
    /// Returns `false` once the bytes recorded in the current window exceed the
    /// budget (the caller should drop the connection or backpressure). Rejected
    /// bytes are still counted, so every further call in the same window also
    /// returns `false`. Always returns `true` for an unlimited limiter.
    pub fn allow(&mut self, len: usize) -> bool {
        if self.max_bytes_per_sec == 0 {
            return true; // unlimited
        }

        // Restart the window once at least a full second has elapsed.
        let now = Instant::now();
        if now.duration_since(self.window_start).as_secs() >= 1 {
            self.window_start = now;
            self.bytes_in_window = 0;
        }

        // Saturate instead of wrapping so a huge `len` can never appear to be
        // under budget.
        self.bytes_in_window = self.bytes_in_window.saturating_add(len as u64);
        self.bytes_in_window <= self.max_bytes_per_sec
    }
}

// ── Generic TCP accept loop ──────────────────────────────────────────────────

/// Generic TCP accept loop shared by RTMP/RTSP handlers.
///
/// Binds `addr`, then for each accepted connection spawns `handle` (bounded by
/// `max_connections` via a semaphore). Runs until `shutdown` is cancelled.
///
/// `handle` receives the socket and peer address; it owns parsing the protocol
/// and forwarding frames to a [`PublishRegistry`](crate::PublishRegistry).
pub async fn run_tcp_ingest_server<F, Fut>(
    addr: SocketAddr,
    max_connections: usize,
    shutdown: CancellationToken,
    handle: F,
) -> Result<()>
where
    F: Fn(tokio::net::TcpStream, SocketAddr) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = ()> + Send + 'static,
{
    let listener = TcpListener::bind(addr).await?;
    let limiter = Arc::new(Semaphore::new(max_connections.max(1)));
    let handle = Arc::new(handle);

    loop {
        tokio::select! {
            _ = shutdown.cancelled() => return Ok(()),
            accepted = listener.accept() => {
                let (sock, peer) = match accepted {
                    Ok(pair) => pair,
                    Err(e) => { warn!(error = %e, "accept failed"); continue; }
                };
                let permit = match Arc::clone(&limiter).try_acquire_owned() {
                    Ok(p) => p,
                    Err(_) => {
                        warn!(%peer, "connection limit reached; rejecting");
                        continue;
                    }
                };
                let handle = Arc::clone(&handle);
                tokio::spawn(async move {
                    let _permit = permit; // released on task completion
                    handle(sock, peer).await;
                });
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::FrameType;
    use std::time::Duration;

    /// Both start-code widths are located, and scanning can resume after a hit.
    #[test]
    fn nal_start_finds_three_and_four_byte_codes() {
        let stream = [0, 0, 0, 1, 9, 0xF0, 0, 0, 1, 0x65];

        let head = find_nal_start(&stream, 0).unwrap();
        assert_eq!((head.offset, head.len), (0, 4));

        // Resume scanning past the first; next is the 3-byte code at offset 6.
        let resume_at = head.offset + head.len;
        let next = find_nal_start(&stream, resume_at).unwrap();
        assert_eq!((next.offset, next.len), (6, 3));
    }

    /// Buffers without a start code (including the empty buffer) yield `None`.
    #[test]
    fn nal_start_returns_none_without_a_code() {
        assert_eq!(find_nal_start(&[0x01, 0x02, 0x00, 0x01], 0), None);
        assert_eq!(find_nal_start(&[], 0), None);
    }

    /// The budget is enforced within a window and refreshed once it has aged out.
    #[test]
    fn rate_limit_resets_each_window() {
        let mut limiter = IngestRateLimit::new(100);
        assert!(limiter.allow(60));
        assert!(limiter.allow(40)); // exactly at budget
        assert!(!limiter.allow(1)); // over budget within the same second

        // Force the window boundary and confirm the budget refreshes.
        limiter.window_start = Instant::now() - Duration::from_secs(2);
        assert!(limiter.allow(100));
    }

    /// A zero budget disables limiting.
    #[test]
    fn rate_limit_zero_is_unlimited() {
        let mut limiter = IngestRateLimit::new(0);
        assert!(limiter.allow(usize::MAX));
    }

    /// Deltas are held until a keyframe opens the gate; audio always passes.
    #[test]
    fn keyframe_gate_holds_deltas_until_idr() {
        let mut gate = KeyframeGate::new();

        // Closed gate: deltas are held, audio bypasses the gate.
        assert!(!gate.is_open());
        assert!(!gate.admit(FrameType::Delta));
        assert!(gate.admit(FrameType::Audio));

        // The keyframe is admitted and opens the gate for later deltas.
        assert!(gate.admit(FrameType::Key));
        assert!(gate.is_open());
        assert!(gate.admit(FrameType::Delta));
    }
}