arcly_stream/protocol/ingest.rs
1//! Shared ingest utilities for TCP-based protocol handlers.
2//!
3//! Ported from `sc-protocol-ingest`, freed of the `sc-metrics::Metrics::global()`
4//! calls — observability is injected via the engine's [`Observer`], not reached
5//! for here.
6//!
7//! [`Observer`]: crate::Observer
8
9use crate::Result;
10use std::future::Future;
11use std::net::SocketAddr;
12use std::sync::Arc;
13use std::time::Instant;
14use tokio::net::TcpListener;
15use tokio::sync::Semaphore;
16use tokio_util::sync::CancellationToken;
17use tracing::warn;
18
19// ── Frame size constants ────────────────────────────────────────────────────
20
21/// Maximum allowed video frame size (8 MiB). Frames larger than this should be
22/// dropped before parsing so a crafted source cannot exhaust heap memory.
23pub const MAX_VIDEO_FRAME: usize = 8 * 1024 * 1024;
24
25/// Maximum allowed audio frame size (1 MiB).
26pub const MAX_AUDIO_FRAME: usize = 1024 * 1024;
27
28// ── Annex-B NAL start-code scanning ──────────────────────────────────────────
29
30/// A located Annex-B start code within an H.264/H.265 bytestream.
31#[derive(Debug, Clone, Copy, PartialEq, Eq)]
32pub struct NalStart {
33 /// Byte offset of the start code's leading zero.
34 pub offset: usize,
35 /// Length of the start code in bytes: 3 (`00 00 01`) or 4 (`00 00 00 01`).
36 pub len: usize,
37}
38
39/// Find the next Annex-B NAL start code (`00 00 01` or `00 00 00 01`) in `buf`
40/// at or after `from`.
41///
42/// Backed by the crate-canonical start-code scanner, which
43/// uses `memchr` to skip directly between candidate `0x01` bytes rather than
44/// scanning byte-by-byte — the difference is significant on the ingest hot path
45/// where every incoming H.264 packet is split into NAL units. A code that
46/// straddles `from` is reported (its leading zeros may precede the origin), so a
47/// packetizer can resume mid-buffer without missing a boundary.
48///
49/// ```
50/// use arcly_stream::protocol::find_nal_start;
51///
52/// // 4-byte start code at offset 0, 3-byte start code at offset 7.
53/// let buf = [0, 0, 0, 1, 9, 0xF0, 0, 0, 0, 1, 0x65];
54/// let first = find_nal_start(&buf, 0).unwrap();
55/// assert_eq!((first.offset, first.len), (0, 4));
56/// let second = find_nal_start(&buf, first.offset + first.len).unwrap();
57/// assert_eq!((second.offset, second.len), (6, 4));
58/// ```
59pub fn find_nal_start(buf: &[u8], from: usize) -> Option<NalStart> {
60 crate::bytescan::scan_start_code(buf, from, 0).map(|(offset, len)| NalStart { offset, len })
61}
62
63// ── KeyframeGate ─────────────────────────────────────────────────────────────
64
65/// Gate that suppresses delta frames until the first IDR (keyframe) arrives, so
66/// late-joining or re-subscribing clients always start at a clean decoder
67/// boundary.
68///
69/// ```
70/// use arcly_stream::protocol::KeyframeGate;
71/// use arcly_stream::FrameType;
72///
73/// let mut gate = KeyframeGate::new();
74/// // A delta frame before any keyframe is held back:
75/// assert!(!gate.admit(FrameType::Delta));
76/// // The first keyframe opens the gate and is itself admitted:
77/// assert!(gate.admit(FrameType::Key));
78/// // Subsequent deltas now flow:
79/// assert!(gate.admit(FrameType::Delta));
80/// // Audio always flows (it carries no decode dependency on video IDRs):
81/// assert!(KeyframeGate::new().admit(FrameType::Audio));
82/// ```
83#[derive(Debug, Default)]
84pub struct KeyframeGate {
85 open: bool,
86}
87
88impl KeyframeGate {
89 /// Create a new gate (initially closed — non-audio frames are held).
90 pub fn new() -> Self {
91 Self { open: false }
92 }
93
94 /// Open the gate unconditionally.
95 pub fn open(&mut self) {
96 self.open = true;
97 }
98
99 /// Whether the gate is currently open.
100 pub fn is_open(&self) -> bool {
101 self.open
102 }
103
104 /// Decide whether a frame of the given type should be admitted, opening the
105 /// gate on the first keyframe. Audio is always admitted.
106 pub fn admit(&mut self, frame_type: crate::FrameType) -> bool {
107 match frame_type {
108 crate::FrameType::Audio => true,
109 crate::FrameType::Key => {
110 self.open = true;
111 true
112 }
113 crate::FrameType::Delta => self.open,
114 }
115 }
116}
117
118// ── IngestRateLimit ──────────────────────────────────────────────────────────
119
120/// Per-connection sliding-window ingress rate limiter (bytes/second).
121#[derive(Debug)]
122pub struct IngestRateLimit {
123 max_bytes_per_sec: u64,
124 window_start: Instant,
125 bytes_in_window: u64,
126}
127
128impl IngestRateLimit {
129 /// A new limiter allowing up to `max_bytes_per_sec` bytes per rolling second.
130 pub fn new(max_bytes_per_sec: u64) -> Self {
131 Self {
132 max_bytes_per_sec,
133 window_start: Instant::now(),
134 bytes_in_window: 0,
135 }
136 }
137
138 /// Record `len` ingested bytes; returns `false` if the per-second budget is
139 /// exceeded (the caller should drop the connection or backpressure).
140 pub fn allow(&mut self, len: usize) -> bool {
141 if self.max_bytes_per_sec == 0 {
142 return true; // unlimited
143 }
144 let now = Instant::now();
145 if now.duration_since(self.window_start).as_secs() >= 1 {
146 self.window_start = now;
147 self.bytes_in_window = 0;
148 }
149 self.bytes_in_window = self.bytes_in_window.saturating_add(len as u64);
150 self.bytes_in_window <= self.max_bytes_per_sec
151 }
152}
153
154// ── Generic TCP accept loop ──────────────────────────────────────────────────
155
156/// Generic TCP accept loop shared by RTMP/RTSP handlers.
157///
158/// Binds `addr`, then for each accepted connection spawns `handle` (bounded by
159/// `max_connections` via a semaphore). Runs until `shutdown` is cancelled.
160///
161/// `handle` receives the socket and peer address; it owns parsing the protocol
162/// and forwarding frames to a [`PublishRegistry`](crate::PublishRegistry).
163pub async fn run_tcp_ingest_server<F, Fut>(
164 addr: SocketAddr,
165 max_connections: usize,
166 shutdown: CancellationToken,
167 handle: F,
168) -> Result<()>
169where
170 F: Fn(tokio::net::TcpStream, SocketAddr) -> Fut + Send + Sync + 'static,
171 Fut: Future<Output = ()> + Send + 'static,
172{
173 let listener = TcpListener::bind(addr).await?;
174 let limiter = Arc::new(Semaphore::new(max_connections.max(1)));
175 let handle = Arc::new(handle);
176
177 loop {
178 tokio::select! {
179 _ = shutdown.cancelled() => return Ok(()),
180 accepted = listener.accept() => {
181 let (sock, peer) = match accepted {
182 Ok(pair) => pair,
183 Err(e) => { warn!(error = %e, "accept failed"); continue; }
184 };
185 let permit = match Arc::clone(&limiter).try_acquire_owned() {
186 Ok(p) => p,
187 Err(_) => {
188 warn!(%peer, "connection limit reached; rejecting");
189 continue;
190 }
191 };
192 let handle = Arc::clone(&handle);
193 tokio::spawn(async move {
194 let _permit = permit; // released on task completion
195 handle(sock, peer).await;
196 });
197 }
198 }
199 }
200}
201
202#[cfg(test)]
203mod tests {
204 use super::*;
205 use crate::FrameType;
206
207 #[test]
208 fn nal_start_finds_three_and_four_byte_codes() {
209 let buf = [0, 0, 0, 1, 9, 0xF0, 0, 0, 1, 0x65];
210 let first = find_nal_start(&buf, 0).unwrap();
211 assert_eq!(first, NalStart { offset: 0, len: 4 });
212 // Resume scanning past the first; next is the 3-byte code at offset 6.
213 let second = find_nal_start(&buf, first.offset + first.len).unwrap();
214 assert_eq!(second, NalStart { offset: 6, len: 3 });
215 }
216
217 #[test]
218 fn nal_start_returns_none_without_a_code() {
219 assert!(find_nal_start(&[0x01, 0x02, 0x00, 0x01], 0).is_none());
220 assert!(find_nal_start(&[], 0).is_none());
221 }
222
223 #[test]
224 fn rate_limit_resets_each_window() {
225 let mut rl = IngestRateLimit::new(100);
226 assert!(rl.allow(60));
227 assert!(rl.allow(40)); // exactly at budget
228 assert!(!rl.allow(1)); // over budget within the same second
229 // Force the window boundary and confirm the budget refreshes.
230 rl.window_start = Instant::now() - std::time::Duration::from_secs(2);
231 assert!(rl.allow(100));
232 }
233
234 #[test]
235 fn rate_limit_zero_is_unlimited() {
236 let mut rl = IngestRateLimit::new(0);
237 assert!(rl.allow(usize::MAX));
238 }
239
240 #[test]
241 fn keyframe_gate_holds_deltas_until_idr() {
242 let mut gate = KeyframeGate::new();
243 assert!(!gate.is_open());
244 assert!(!gate.admit(FrameType::Delta));
245 assert!(gate.admit(FrameType::Audio)); // audio bypasses the gate
246 assert!(gate.admit(FrameType::Key)); // opens
247 assert!(gate.is_open());
248 assert!(gate.admit(FrameType::Delta));
249 }
250}