arcly_stream/inbound.rs
1//! The **multi-protocol ingestion architecture** — the public seam for teaching
2//! the engine new inbound wire protocols (RTSP, SRT, WebRTC WHIP/WHEP, …)
3//! without touching the kernel.
4//!
5//! # The three pieces
6//!
7//! | Type | Role |
8//! |------|------|
9//! | [`InboundProtocol`] | The worker contract you implement: own a listener, accept connections, bridge frames onto the bus. |
10//! | [`IngestContext`] | The ergonomic, cloneable handle a worker uses to reach the engine bus — hands out publish sessions. |
11//! | [`PublishSession`] | An RAII token for one live stream: every frame lands in the GOP cache + live QoS; releasing it frees the publish slot. |
12//!
13//! ```text
14//! your crate arcly-stream kernel
15//! ┌────────────────────────┐ ┌───────────────────────────────┐
16//! │ struct MyRtspHandler │ │ Engine (lock-free bus) │
17//! │ impl InboundProtocol { │ serve │ • broadcast fan-out │
18//! │ async fn serve(ctx) { │◀─────────│ • GOP cache (instant start) │
19//! │ ctx.open_publish()──┼────────▶ │ • live QoS counters │
20//! │ .publish_frame() │ frames │ PublishRegistry │
21//! │ } } │ │ │
22//! └────────────────────────┘ └───────────────────────────────┘
23//! ```
24//!
25//! # Design pattern: a worker owns its transport
26//!
27//! [`InboundProtocol::serve`] is intentionally a *single, long-lived call* that
28//! owns the listener for the protocol's whole lifetime, rather than a set of
29//! per-connection lifecycle callbacks. This keeps the contract minimal and lets
30//! each protocol model its own connection state machine (RTMP chunk streams,
31//! RTSP sessions, SRT handshakes) however it needs — the kernel never assumes a
32//! shape. The reusable [`run_tcp_ingest_server`](crate::protocol::run_tcp_ingest_server)
33//! accept-loop (feature `ingest`) covers the common TCP case so most workers are
34//! a thin per-connection handler over it.
35//!
36//! # Thread-safety & runtime requirements
37//!
38//! - A worker is `Send + Sync + 'static`: the engine shares one instance across
39//! tasks and may run it for the entire process lifetime.
40//! - [`serve`](InboundProtocol::serve) must return promptly once `shutdown` is
41//! cancelled — the engine's coordinated teardown waits on every worker.
42//! - [`PublishSession`] is **not** `Clone`: it models exclusive ownership of one
43//! publish slot. Frames may be published from any task that holds it (or holds
44//! its [`StreamHandle`](PublishSession::handle)).
45//!
46//! # Graceful teardown
47//!
48//! When an ingest connection drops, ending its [`PublishSession`] (via
49//! [`finish`](PublishSession::finish), or best-effort on `Drop`) frees the
50//! publish slot *and* closes the stream's lock-free broadcast bus. Active
51//! playback subscribers are therefore **notified seamlessly** — a resilient
52//! subscriber's `recv` yields `None` — with no handler reaching across to each
53//! one. A host that drives a recorder ([`RecordingSink`](crate::record::RecordingSink))
54//! off the same subscription flushes it on that signal, so recordings land in
55//! the [`StorageBackend`](crate::traits::StorageBackend) on disconnect. The
56//! engine's [idle reaper](crate::Engine::reap_idle) provides the same teardown
57//! for connections that wedge without closing cleanly.
58//!
59//! # Minimal worker
60//!
61//! ```no_run
62//! use arcly_stream::prelude::*;
63//! use arcly_stream::inbound::{InboundProtocol, IngestContext};
64//! use arcly_stream::bytes::Bytes;
65//!
66//! struct LoopbackProtocol;
67//!
68//! #[async_trait]
69//! impl InboundProtocol for LoopbackProtocol {
70//! fn name(&self) -> &'static str { "loopback" }
71//!
72//! async fn serve(&self, ctx: IngestContext, shutdown: CancellationToken) -> Result<()> {
73//! // Claim a stream; the session releases it on drop.
74//! let session = ctx.open_publish(StreamKey::new("live", "demo")).await?;
75//! let kf = MediaFrame::new_video(0, 0, Bytes::from_static(b"idr"), CodecId::H264, true);
76//! session.publish_frame(kf)?;
77//! shutdown.cancelled().await; // run until told to stop
78//! session.finish().await
79//! }
80//! }
81//! ```
82
83use crate::bus::{PublishRegistry, StreamHandle};
84use crate::{MediaFrame, Result, StreamKey};
85use async_trait::async_trait;
86use std::sync::Arc;
87use tokio_util::sync::CancellationToken;
88
89/// A pluggable inbound wire-protocol worker — the unit the engine runs to ingest
90/// a transport (RTMP, RTSP, SRT, WHIP, …).
91///
92/// Implement this in your own crate and register it with
93/// [`EngineBuilder::protocol`](crate::EngineBuilder::protocol) (or pass it to
94/// [`Engine::serve`](crate::Engine::serve)); the engine never needs to know the
95/// concrete type. A worker:
96///
97/// 1. **binds** its listener(s) inside [`serve`](Self::serve),
98/// 2. **accepts** connections and performs each protocol **handshake**,
99/// 3. resolves a [`StreamKey`] and opens a [`PublishSession`] via the
100/// [`IngestContext`],
101/// 4. **bridges** decoded access units to [`MediaFrame`]s and publishes them,
102/// 5. **tears down** cleanly when a connection closes or `shutdown` fires.
103///
104/// Any type implementing the legacy [`ProtocolHandler`](crate::ProtocolHandler)
105/// is automatically an `InboundProtocol` via a blanket bridge, so existing
106/// handlers keep working unchanged.
107#[async_trait]
108pub trait InboundProtocol: Send + Sync + 'static {
109 /// Stable, human-readable protocol name (`"rtmp"`, `"rtsp"`, …). Used in logs
110 /// and the engine's per-worker lifecycle tracing.
111 fn name(&self) -> &'static str;
112
113 /// Run the protocol's listener until `shutdown` is cancelled.
114 ///
115 /// Return `Ok(())` on a clean shutdown. Returning `Err` signals a fatal fault
116 /// (e.g. the listener could not bind) and trips the engine's coordinated
117 /// teardown, winding down sibling workers too.
118 ///
119 /// Implementations **must** observe `shutdown` and return promptly once it is
120 /// cancelled; the engine awaits every worker during drain.
121 async fn serve(&self, ctx: IngestContext, shutdown: CancellationToken) -> Result<()>;
122}
123
124/// Blanket bridge: every legacy [`ProtocolHandler`](crate::ProtocolHandler) is an
125/// [`InboundProtocol`]. New protocols should implement `InboundProtocol` directly
126/// for the ergonomic [`IngestContext`]; this keeps pre-existing handlers working.
127#[async_trait]
128impl<T: crate::traits::ProtocolHandler + 'static> InboundProtocol for T {
129 fn name(&self) -> &'static str {
130 crate::traits::ProtocolHandler::name(self)
131 }
132
133 async fn serve(&self, ctx: IngestContext, shutdown: CancellationToken) -> Result<()> {
134 crate::traits::ProtocolHandler::run(self, Arc::clone(ctx.registry()), shutdown).await
135 }
136}
137
138/// The ergonomic, cloneable handle a protocol worker uses to reach the engine
139/// bus, handed to every [`InboundProtocol::serve`] call.
140///
141/// It hides the [`PublishRegistry`] trait object behind a small, stable surface:
142/// claim a stream for publishing ([`open_publish`](Self::open_publish)), or reach
143/// the underlying registry for advanced flows ([`registry`](Self::registry)).
144/// Cloning is cheap (an `Arc` bump) — share it freely across per-connection tasks.
145#[derive(Clone)]
146pub struct IngestContext {
147 registry: Arc<dyn PublishRegistry>,
148}
149
150impl IngestContext {
151 /// Wrap a publish registry. The engine constructs this for you; tests and
152 /// embedders can build one directly from any [`PublishRegistry`] (e.g. an
153 /// `Arc<Engine>`).
154 pub fn new(registry: Arc<dyn PublishRegistry>) -> Self {
155 Self { registry }
156 }
157
158 /// Claim `key` for publishing, returning an RAII [`PublishSession`].
159 ///
160 /// Frames published through the returned session flow into the lock-free
161 /// broadcast fan-out, the keyframe-anchored GOP cache (for instant-start
162 /// replay), and the live QoS counters — exactly as a native handler's frames
163 /// do. Fails with [`StreamAlreadyPublishing`] on a live duplicate or
164 /// [`PublisherLimitReached`] at capacity.
165 ///
166 /// [`StreamAlreadyPublishing`]: crate::StreamError::StreamAlreadyPublishing
167 /// [`PublisherLimitReached`]: crate::StreamError::PublisherLimitReached
168 pub async fn open_publish(&self, key: StreamKey) -> Result<PublishSession> {
169 let handle = self.registry.start_publish(&key).await?;
170 Ok(PublishSession {
171 handle,
172 registry: Arc::clone(&self.registry),
173 key,
174 released: false,
175 })
176 }
177
178 /// The underlying publish registry, for flows not covered by
179 /// [`open_publish`](Self::open_publish).
180 pub fn registry(&self) -> &Arc<dyn PublishRegistry> {
181 &self.registry
182 }
183}
184
185/// An RAII publish session — a protocol worker's exclusive token for one live
186/// stream.
187///
188/// Wraps the engine's [`StreamHandle`]: every [`publish_frame`](Self::publish_frame)
189/// lands in the broadcast fan-out, GOP cache, and live QoS counters. Dropping the
190/// session releases the publish slot (best-effort, on the current Tokio runtime),
191/// so a worker that returns early or panics never leaks a stream. Prefer
192/// [`finish`](Self::finish) for deterministic, awaited teardown.
193pub struct PublishSession {
194 handle: StreamHandle,
195 registry: Arc<dyn PublishRegistry>,
196 key: StreamKey,
197 released: bool,
198}
199
200impl PublishSession {
201 /// The stream key this session publishes to.
202 pub fn key(&self) -> &StreamKey {
203 &self.key
204 }
205
206 /// The underlying engine [`StreamHandle`] — GOP cache, QoS, subscriber count,
207 /// metadata, and direct `subscribe`/replay access.
208 pub fn handle(&self) -> &StreamHandle {
209 &self.handle
210 }
211
212 /// Publish one decoded frame to all subscribers; returns the live subscriber
213 /// count (`0` when nobody is watching yet).
214 pub fn publish_frame(&self, frame: MediaFrame) -> Result<usize> {
215 self.handle.publish_frame(frame)
216 }
217
218 /// Release the publish slot deterministically. Prefer this over relying on
219 /// [`Drop`] whenever you can `await` — it surfaces the teardown error and
220 /// completes before the next stream can reuse the key.
221 pub async fn finish(mut self) -> Result<()> {
222 self.released = true;
223 self.registry.end_publish(&self.key).await
224 }
225}
226
227impl Drop for PublishSession {
228 fn drop(&mut self) {
229 if self.released {
230 return;
231 }
232 // Best-effort async release. `end_publish` is async; if a Tokio runtime is
233 // available (the normal case inside a worker), spawn the teardown so the
234 // slot is freed even on an early return or panic.
235 if let Ok(rt) = tokio::runtime::Handle::try_current() {
236 let registry = Arc::clone(&self.registry);
237 let key = self.key.clone();
238 rt.spawn(async move {
239 let _ = registry.end_publish(&key).await;
240 });
241 }
242 }
243}
244
245#[cfg(test)]
246mod tests {
247 use super::*;
248 use crate::bus::PlaybackRegistry;
249 use crate::{AppSpec, CodecId, Engine, FrameFlags};
250 use bytes::Bytes;
251
252 /// A tiny custom protocol that publishes a config + keyframe then idles until
253 /// shutdown — exercising the full `InboundProtocol` → `IngestContext` →
254 /// `PublishSession` path against a real engine.
255 struct DemoProtocol {
256 key: StreamKey,
257 }
258
259 #[async_trait]
260 impl InboundProtocol for DemoProtocol {
261 fn name(&self) -> &'static str {
262 "demo"
263 }
264
265 async fn serve(&self, ctx: IngestContext, shutdown: CancellationToken) -> Result<()> {
266 let session = ctx.open_publish(self.key.clone()).await?;
267 let mut cfg =
268 MediaFrame::new_video(0, 0, Bytes::from_static(b"sps"), CodecId::H264, false);
269 cfg.flags |= FrameFlags::CONFIG;
270 session.publish_frame(cfg)?;
271 session.publish_frame(MediaFrame::new_video(
272 0,
273 0,
274 Bytes::from_static(b"idr"),
275 CodecId::H264,
276 true,
277 ))?;
278 shutdown.cancelled().await;
279 session.finish().await
280 }
281 }
282
283 #[tokio::test]
284 async fn custom_protocol_publishes_through_ingest_context() {
285 let engine = Engine::builder()
286 .application(AppSpec::new("live").gop_cache(8))
287 .build();
288 let key = StreamKey::new("live", "cam");
289 let ctx = IngestContext::new(engine.clone());
290
291 let proto = DemoProtocol { key: key.clone() };
292 let shutdown = CancellationToken::new();
293
294 // Run the worker; cancel once it has had a chance to publish.
295 let worker = {
296 let shutdown = shutdown.clone();
297 tokio::spawn(async move { proto.serve(ctx, shutdown).await })
298 };
299
300 // Wait until the stream is live and its GOP cache holds the keyframe.
301 let handle = loop {
302 if let Ok(h) = engine.get_stream(&key) {
303 if h.replay_buffer().iter().any(|f| f.is_keyframe()) {
304 break h;
305 }
306 }
307 tokio::task::yield_now().await;
308 };
309 let (vcfg, _) = handle.cached_configs();
310 assert!(vcfg.is_some(), "config frame cached via PublishSession");
311
312 shutdown.cancel();
313 worker.await.unwrap().unwrap();
314 // After finish(), the publish slot is released.
315 assert!(
316 engine.get_stream(&key).is_err(),
317 "session released on finish"
318 );
319 }
320
321 #[tokio::test]
322 async fn dropping_session_releases_the_slot() {
323 let engine = Engine::builder().application(AppSpec::new("live")).build();
324 let key = StreamKey::new("live", "drop-test");
325 let ctx = IngestContext::new(engine.clone());
326
327 {
328 let _session = ctx.open_publish(key.clone()).await.unwrap();
329 assert!(engine.get_stream(&key).is_ok(), "stream live while held");
330 } // dropped here → best-effort async end_publish spawned
331
332 // Yield so the spawned teardown runs.
333 for _ in 0..16 {
334 if engine.get_stream(&key).is_err() {
335 break;
336 }
337 tokio::task::yield_now().await;
338 }
339 assert!(engine.get_stream(&key).is_err(), "slot released on drop");
340 }
341
342 #[tokio::test]
343 async fn finishing_a_session_notifies_subscribers_via_the_bus() {
344 // Graceful teardown: ending a publish closes the lock-free bus, so an
345 // active playback subscriber is seamlessly notified (its stream ends)
346 // without the protocol handler reaching across to each subscriber.
347 let engine = Engine::builder()
348 .application(AppSpec::new("live").gop_cache(4))
349 .build();
350 let key = StreamKey::new("live", "cam");
351 let ctx = IngestContext::new(engine.clone());
352
353 let session = ctx.open_publish(key.clone()).await.unwrap();
354 let handle = engine.get_stream(&key).unwrap();
355 let mut sub = handle.subscribe_resilient();
356
357 // The subscriber sees the live frame.
358 session
359 .publish_frame(MediaFrame::new_video(
360 0,
361 0,
362 Bytes::from_static(b"idr"),
363 CodecId::H264,
364 true,
365 ))
366 .unwrap();
367 assert!(sub.recv().await.is_some(), "subscriber receives live frame");
368
369 // Release every handle clone, then tear the session down.
370 drop(handle);
371 session.finish().await.unwrap();
372
373 // The subscriber is notified of teardown: the closed bus yields `None`.
374 assert!(
375 sub.recv().await.is_none(),
376 "subscriber notified that the stream ended"
377 );
378 assert!(engine.get_stream(&key).is_err(), "slot released on finish");
379 }
380}