Skip to main content

arcly_stream/protocol/
ingest.rs

1//! Shared ingest utilities for TCP-based protocol handlers.
2//!
//! Ported from `sc-protocol-ingest`, minus its `sc-metrics::Metrics::global()`
//! calls — observability is injected through the engine's [`Observer`] instead
//! of being fetched from a global here.
6//!
7//! [`Observer`]: crate::Observer
8
9use crate::Result;
10use std::future::Future;
11use std::net::SocketAddr;
12use std::sync::Arc;
13use std::time::Instant;
14use tokio::net::TcpListener;
15use tokio::sync::Semaphore;
16use tokio_util::sync::CancellationToken;
17use tracing::warn;
18
19// ── Frame size constants ────────────────────────────────────────────────────
20
/// Upper bound on a single video frame, in bytes (8 MiB).
///
/// Oversized frames should be discarded before any parsing takes place, so a
/// malicious source cannot force unbounded heap allocation.
pub const MAX_VIDEO_FRAME: usize = 8 << 20;

/// Upper bound on a single audio frame, in bytes (1 MiB).
pub const MAX_AUDIO_FRAME: usize = 1 << 20;
27
28// ── Annex-B NAL start-code scanning ──────────────────────────────────────────
29
/// Position and width of an Annex-B start code found inside an H.264/H.265
/// bytestream.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct NalStart {
    /// Index of the first (zero) byte of the start code.
    pub offset: usize,
    /// Number of bytes the start code occupies: 3 for `00 00 01`, 4 for
    /// `00 00 00 01`.
    pub len: usize,
}
38
39/// Find the next Annex-B NAL start code (`00 00 01` or `00 00 00 01`) in `buf`
40/// at or after `from`.
41///
42/// Backed by the crate-canonical start-code scanner, which
43/// uses `memchr` to skip directly between candidate `0x01` bytes rather than
44/// scanning byte-by-byte — the difference is significant on the ingest hot path
45/// where every incoming H.264 packet is split into NAL units. A code that
46/// straddles `from` is reported (its leading zeros may precede the origin), so a
47/// packetizer can resume mid-buffer without missing a boundary.
48///
49/// ```
50/// use arcly_stream::protocol::find_nal_start;
51///
52/// // 4-byte start code at offset 0, 3-byte start code at offset 7.
53/// let buf = [0, 0, 0, 1, 9, 0xF0, 0, 0, 0, 1, 0x65];
54/// let first = find_nal_start(&buf, 0).unwrap();
55/// assert_eq!((first.offset, first.len), (0, 4));
56/// let second = find_nal_start(&buf, first.offset + first.len).unwrap();
57/// assert_eq!((second.offset, second.len), (6, 4));
58/// ```
59pub fn find_nal_start(buf: &[u8], from: usize) -> Option<NalStart> {
60    crate::bytescan::scan_start_code(buf, from, 0).map(|(offset, len)| NalStart { offset, len })
61}
62
63// ── KeyframeGate ─────────────────────────────────────────────────────────────
64
65/// Gate that suppresses delta frames until the first IDR (keyframe) arrives, so
66/// late-joining or re-subscribing clients always start at a clean decoder
67/// boundary.
68///
69/// ```
70/// use arcly_stream::protocol::KeyframeGate;
71/// use arcly_stream::FrameType;
72///
73/// let mut gate = KeyframeGate::new();
74/// // A delta frame before any keyframe is held back:
75/// assert!(!gate.admit(FrameType::Delta));
76/// // The first keyframe opens the gate and is itself admitted:
77/// assert!(gate.admit(FrameType::Key));
78/// // Subsequent deltas now flow:
79/// assert!(gate.admit(FrameType::Delta));
80/// // Audio always flows (it carries no decode dependency on video IDRs):
81/// assert!(KeyframeGate::new().admit(FrameType::Audio));
82/// ```
83#[derive(Debug, Default)]
84pub struct KeyframeGate {
85    open: bool,
86}
87
88impl KeyframeGate {
89    /// Create a new gate (initially closed — non-audio frames are held).
90    pub fn new() -> Self {
91        Self { open: false }
92    }
93
94    /// Open the gate unconditionally.
95    pub fn open(&mut self) {
96        self.open = true;
97    }
98
99    /// Whether the gate is currently open.
100    pub fn is_open(&self) -> bool {
101        self.open
102    }
103
104    /// Decide whether a frame of the given type should be admitted, opening the
105    /// gate on the first keyframe. Audio is always admitted.
106    pub fn admit(&mut self, frame_type: crate::FrameType) -> bool {
107        match frame_type {
108            crate::FrameType::Audio => true,
109            crate::FrameType::Key => {
110                self.open = true;
111                true
112            }
113            crate::FrameType::Delta => self.open,
114        }
115    }
116}
117
118// ── IngestRateLimit ──────────────────────────────────────────────────────────
119
/// Per-connection ingress rate limiter (bytes/second) using a fixed window.
///
/// Bytes are counted in consecutive windows lasting at least one second. The
/// count resets on the first [`allow`](Self::allow) call made one second or more
/// after the current window began. Because the window is fixed rather than
/// sliding, a sender can push up to twice the budget in a short burst that
/// straddles a window boundary.
#[derive(Debug)]
pub struct IngestRateLimit {
    /// Byte budget per window; `0` disables limiting entirely.
    max_bytes_per_sec: u64,
    /// Instant at which the current window began.
    window_start: Instant,
    /// Bytes recorded in the current window, including rejected ones.
    bytes_in_window: u64,
}

impl IngestRateLimit {
    /// A new limiter allowing up to `max_bytes_per_sec` bytes per one-second
    /// window. A budget of `0` means unlimited.
    pub fn new(max_bytes_per_sec: u64) -> Self {
        Self {
            max_bytes_per_sec,
            window_start: Instant::now(),
            bytes_in_window: 0,
        }
    }

    /// Record `len` ingested bytes and report whether the connection is still
    /// within its budget for the current window.
    ///
    /// Returns `false` once the window's total exceeds the budget; the caller
    /// should drop the connection or apply backpressure. The bytes are counted
    /// even when `false` is returned, so later calls keep failing until the
    /// window resets. A single `len` larger than the budget is always rejected.
    /// With a budget of `0` every call returns `true` and nothing is recorded.
    pub fn allow(&mut self, len: usize) -> bool {
        if self.max_bytes_per_sec == 0 {
            return true; // unlimited
        }
        let now = Instant::now();
        if now.duration_since(self.window_start).as_secs() >= 1 {
            // Start a fresh window at the current instant.
            self.window_start = now;
            self.bytes_in_window = 0;
        }
        // Saturate rather than wrap so a huge `len` cannot roll the counter
        // back under budget.
        self.bytes_in_window = self.bytes_in_window.saturating_add(len as u64);
        self.bytes_in_window <= self.max_bytes_per_sec
    }
}
153
154// ── Generic TCP accept loop ──────────────────────────────────────────────────
155
156/// Generic TCP accept loop shared by RTMP/RTSP handlers.
157///
158/// Binds `addr`, then for each accepted connection spawns `handle` (bounded by
159/// `max_connections` via a semaphore). Runs until `shutdown` is cancelled.
160///
161/// `handle` receives the socket and peer address; it owns parsing the protocol
162/// and forwarding frames to a [`PublishRegistry`](crate::PublishRegistry).
163pub async fn run_tcp_ingest_server<F, Fut>(
164    addr: SocketAddr,
165    max_connections: usize,
166    shutdown: CancellationToken,
167    handle: F,
168) -> Result<()>
169where
170    F: Fn(tokio::net::TcpStream, SocketAddr) -> Fut + Send + Sync + 'static,
171    Fut: Future<Output = ()> + Send + 'static,
172{
173    let listener = TcpListener::bind(addr).await?;
174    let limiter = Arc::new(Semaphore::new(max_connections.max(1)));
175    let handle = Arc::new(handle);
176
177    loop {
178        tokio::select! {
179            _ = shutdown.cancelled() => return Ok(()),
180            accepted = listener.accept() => {
181                let (sock, peer) = match accepted {
182                    Ok(pair) => pair,
183                    Err(e) => { warn!(error = %e, "accept failed"); continue; }
184                };
185                let permit = match Arc::clone(&limiter).try_acquire_owned() {
186                    Ok(p) => p,
187                    Err(_) => {
188                        warn!(%peer, "connection limit reached; rejecting");
189                        continue;
190                    }
191                };
192                let handle = Arc::clone(&handle);
193                tokio::spawn(async move {
194                    let _permit = permit; // released on task completion
195                    handle(sock, peer).await;
196                });
197            }
198        }
199    }
200}
201
#[cfg(test)]
mod tests {
    use super::*;
    use crate::FrameType;
    use std::time::Duration;

    #[test]
    fn frame_size_limits() {
        assert_eq!(MAX_VIDEO_FRAME, 8 * 1024 * 1024);
        assert_eq!(MAX_AUDIO_FRAME, 1024 * 1024);
    }

    #[test]
    fn nal_start_finds_three_and_four_byte_codes() {
        let buf = [0, 0, 0, 1, 9, 0xF0, 0, 0, 1, 0x65];
        let first = find_nal_start(&buf, 0).unwrap();
        assert_eq!(first, NalStart { offset: 0, len: 4 });
        // Resume scanning past the first; next is the 3-byte code at offset 6.
        let second = find_nal_start(&buf, first.offset + first.len).unwrap();
        assert_eq!(second, NalStart { offset: 6, len: 3 });
    }

    #[test]
    fn nal_start_returns_none_without_a_code() {
        assert!(find_nal_start(&[0x01, 0x02, 0x00, 0x01], 0).is_none());
        assert!(find_nal_start(&[], 0).is_none());
    }

    #[test]
    fn rate_limit_resets_each_window() {
        let mut rl = IngestRateLimit::new(100);
        assert!(rl.allow(60));
        assert!(rl.allow(40)); // exactly at budget
        assert!(!rl.allow(1)); // over budget within the same second
        // Force the window boundary and confirm the budget refreshes.
        rl.window_start = Instant::now() - Duration::from_secs(2);
        assert!(rl.allow(100));
    }

    #[test]
    fn rate_limit_rejects_single_oversized_chunk() {
        let mut rl = IngestRateLimit::new(100);
        assert!(!rl.allow(101));
        // The rejected bytes are still charged to the current window.
        assert!(!rl.allow(0));
    }

    #[test]
    fn rate_limit_zero_is_unlimited() {
        let mut rl = IngestRateLimit::new(0);
        assert!(rl.allow(usize::MAX));
    }

    #[test]
    fn keyframe_gate_holds_deltas_until_idr() {
        let mut gate = KeyframeGate::new();
        assert!(!gate.is_open());
        assert!(!gate.admit(FrameType::Delta));
        assert!(gate.admit(FrameType::Audio)); // audio bypasses the gate
        assert!(gate.admit(FrameType::Key)); // opens
        assert!(gate.is_open());
        assert!(gate.admit(FrameType::Delta));
    }

    #[test]
    fn keyframe_gate_audio_does_not_open() {
        let mut gate = KeyframeGate::new();
        assert!(gate.admit(FrameType::Audio));
        assert!(!gate.is_open());
        assert!(!gate.admit(FrameType::Delta));
    }

    #[test]
    fn keyframe_gate_forced_open_admits_deltas() {
        let mut gate = KeyframeGate::new();
        gate.open();
        assert!(gate.is_open());
        assert!(gate.admit(FrameType::Delta));
    }
}