Skip to main content

arcly_stream/
inbound.rs

1//! The **multi-protocol ingestion architecture** — the public seam for teaching
2//! the engine new inbound wire protocols (RTSP, SRT, WebRTC WHIP/WHEP, …)
3//! without touching the kernel.
4//!
5//! # The three pieces
6//!
7//! | Type | Role |
8//! |------|------|
9//! | [`InboundProtocol`] | The worker contract you implement: own a listener, accept connections, bridge frames onto the bus. |
10//! | [`IngestContext`] | The ergonomic, cloneable handle a worker uses to reach the engine bus — hands out publish sessions. |
11//! | [`PublishSession`] | An RAII token for one live stream: every frame lands in the GOP cache + live QoS; releasing it frees the publish slot. |
12//!
13//! ```text
14//!   your crate                          arcly-stream kernel
15//!  ┌────────────────────────┐          ┌───────────────────────────────┐
16//!  │ struct MyRtspHandler    │          │ Engine (lock-free bus)        │
17//!  │ impl InboundProtocol {  │  serve   │  • broadcast fan-out          │
18//!  │   async fn serve(ctx) { │◀─────────│  • GOP cache (instant start)  │
19//!  │     ctx.open_publish()──┼────────▶ │  • live QoS counters          │
20//!  │       .publish_frame()  │ frames   │  PublishRegistry              │
21//!  │ } }                     │          │                               │
22//!  └────────────────────────┘          └───────────────────────────────┘
23//! ```
24//!
25//! # Design pattern: a worker owns its transport
26//!
27//! [`InboundProtocol::serve`] is intentionally a *single, long-lived call* that
28//! owns the listener for the protocol's whole lifetime, rather than a set of
29//! per-connection lifecycle callbacks. This keeps the contract minimal and lets
30//! each protocol model its own connection state machine (RTMP chunk streams,
31//! RTSP sessions, SRT handshakes) however it needs — the kernel never assumes a
32//! shape. The reusable [`run_tcp_ingest_server`](crate::protocol::run_tcp_ingest_server)
33//! accept-loop (feature `ingest`) covers the common TCP case so most workers are
34//! a thin per-connection handler over it.
35//!
36//! # Thread-safety & runtime requirements
37//!
38//! - A worker is `Send + Sync + 'static`: the engine shares one instance across
39//!   tasks and may run it for the entire process lifetime.
40//! - [`serve`](InboundProtocol::serve) must return promptly once `shutdown` is
41//!   cancelled — the engine's coordinated teardown waits on every worker.
42//! - [`PublishSession`] is **not** `Clone`: it models exclusive ownership of one
43//!   publish slot. Frames may be published from any task that holds it (or holds
44//!   its [`StreamHandle`](PublishSession::handle)).
45//!
46//! # Graceful teardown
47//!
48//! When an ingest connection drops, ending its [`PublishSession`] (via
49//! [`finish`](PublishSession::finish), or best-effort on `Drop`) frees the
50//! publish slot *and* closes the stream's lock-free broadcast bus. Active
51//! playback subscribers are therefore **notified seamlessly** — a resilient
52//! subscriber's `recv` yields `None` — with no handler reaching across to each
53//! one. A host that drives a recorder ([`RecordingSink`](crate::record::RecordingSink))
54//! off the same subscription flushes it on that signal, so recordings land in
55//! the [`StorageBackend`](crate::traits::StorageBackend) on disconnect. The
56//! engine's [idle reaper](crate::Engine::reap_idle) provides the same teardown
57//! for connections that wedge without closing cleanly.
58//!
59//! # Minimal worker
60//!
61//! ```no_run
62//! use arcly_stream::prelude::*;
63//! use arcly_stream::inbound::{InboundProtocol, IngestContext};
64//! use arcly_stream::bytes::Bytes;
65//!
66//! struct LoopbackProtocol;
67//!
68//! #[async_trait]
69//! impl InboundProtocol for LoopbackProtocol {
70//!     fn name(&self) -> &'static str { "loopback" }
71//!
72//!     async fn serve(&self, ctx: IngestContext, shutdown: CancellationToken) -> Result<()> {
73//!         // Claim a stream; the session releases it on drop.
74//!         let session = ctx.open_publish(StreamKey::new("live", "demo")).await?;
75//!         let kf = MediaFrame::new_video(0, 0, Bytes::from_static(b"idr"), CodecId::H264, true);
76//!         session.publish_frame(kf)?;
77//!         shutdown.cancelled().await; // run until told to stop
78//!         session.finish().await
79//!     }
80//! }
81//! ```
82
83use crate::bus::{PublishRegistry, StreamHandle};
84use crate::{MediaFrame, Result, StreamKey};
85use async_trait::async_trait;
86use std::sync::Arc;
87use tokio_util::sync::CancellationToken;
88
89/// A pluggable inbound wire-protocol worker — the unit the engine runs to ingest
90/// a transport (RTMP, RTSP, SRT, WHIP, …).
91///
92/// Implement this in your own crate and register it with
93/// [`EngineBuilder::protocol`](crate::EngineBuilder::protocol) (or pass it to
94/// [`Engine::serve`](crate::Engine::serve)); the engine never needs to know the
95/// concrete type. A worker:
96///
97/// 1. **binds** its listener(s) inside [`serve`](Self::serve),
98/// 2. **accepts** connections and performs each protocol **handshake**,
99/// 3. resolves a [`StreamKey`] and opens a [`PublishSession`] via the
100///    [`IngestContext`],
101/// 4. **bridges** decoded access units to [`MediaFrame`]s and publishes them,
102/// 5. **tears down** cleanly when a connection closes or `shutdown` fires.
103///
104/// Any type implementing the legacy [`ProtocolHandler`](crate::ProtocolHandler)
105/// is automatically an `InboundProtocol` via a blanket bridge, so existing
106/// handlers keep working unchanged.
107#[async_trait]
108pub trait InboundProtocol: Send + Sync + 'static {
109    /// Stable, human-readable protocol name (`"rtmp"`, `"rtsp"`, …). Used in logs
110    /// and the engine's per-worker lifecycle tracing.
111    fn name(&self) -> &'static str;
112
113    /// Run the protocol's listener until `shutdown` is cancelled.
114    ///
115    /// Return `Ok(())` on a clean shutdown. Returning `Err` signals a fatal fault
116    /// (e.g. the listener could not bind) and trips the engine's coordinated
117    /// teardown, winding down sibling workers too.
118    ///
119    /// Implementations **must** observe `shutdown` and return promptly once it is
120    /// cancelled; the engine awaits every worker during drain.
121    async fn serve(&self, ctx: IngestContext, shutdown: CancellationToken) -> Result<()>;
122}
123
124/// Blanket bridge: every legacy [`ProtocolHandler`](crate::ProtocolHandler) is an
125/// [`InboundProtocol`]. New protocols should implement `InboundProtocol` directly
126/// for the ergonomic [`IngestContext`]; this keeps pre-existing handlers working.
127#[async_trait]
128impl<T: crate::traits::ProtocolHandler + 'static> InboundProtocol for T {
129    fn name(&self) -> &'static str {
130        crate::traits::ProtocolHandler::name(self)
131    }
132
133    async fn serve(&self, ctx: IngestContext, shutdown: CancellationToken) -> Result<()> {
134        crate::traits::ProtocolHandler::run(self, Arc::clone(ctx.registry()), shutdown).await
135    }
136}
137
138/// The ergonomic, cloneable handle a protocol worker uses to reach the engine
139/// bus, handed to every [`InboundProtocol::serve`] call.
140///
141/// It hides the [`PublishRegistry`] trait object behind a small, stable surface:
142/// claim a stream for publishing ([`open_publish`](Self::open_publish)), or reach
143/// the underlying registry for advanced flows ([`registry`](Self::registry)).
144/// Cloning is cheap (an `Arc` bump) — share it freely across per-connection tasks.
145#[derive(Clone)]
146pub struct IngestContext {
147    registry: Arc<dyn PublishRegistry>,
148}
149
150impl IngestContext {
151    /// Wrap a publish registry. The engine constructs this for you; tests and
152    /// embedders can build one directly from any [`PublishRegistry`] (e.g. an
153    /// `Arc<Engine>`).
154    pub fn new(registry: Arc<dyn PublishRegistry>) -> Self {
155        Self { registry }
156    }
157
158    /// Claim `key` for publishing, returning an RAII [`PublishSession`].
159    ///
160    /// Frames published through the returned session flow into the lock-free
161    /// broadcast fan-out, the keyframe-anchored GOP cache (for instant-start
162    /// replay), and the live QoS counters — exactly as a native handler's frames
163    /// do. Fails with [`StreamAlreadyPublishing`] on a live duplicate or
164    /// [`PublisherLimitReached`] at capacity.
165    ///
166    /// [`StreamAlreadyPublishing`]: crate::StreamError::StreamAlreadyPublishing
167    /// [`PublisherLimitReached`]: crate::StreamError::PublisherLimitReached
168    pub async fn open_publish(&self, key: StreamKey) -> Result<PublishSession> {
169        let handle = self.registry.start_publish(&key).await?;
170        Ok(PublishSession {
171            handle,
172            registry: Arc::clone(&self.registry),
173            key,
174            released: false,
175        })
176    }
177
178    /// The underlying publish registry, for flows not covered by
179    /// [`open_publish`](Self::open_publish).
180    pub fn registry(&self) -> &Arc<dyn PublishRegistry> {
181        &self.registry
182    }
183}
184
185/// An RAII publish session — a protocol worker's exclusive token for one live
186/// stream.
187///
188/// Wraps the engine's [`StreamHandle`]: every [`publish_frame`](Self::publish_frame)
189/// lands in the broadcast fan-out, GOP cache, and live QoS counters. Dropping the
190/// session releases the publish slot (best-effort, on the current Tokio runtime),
191/// so a worker that returns early or panics never leaks a stream. Prefer
192/// [`finish`](Self::finish) for deterministic, awaited teardown.
193pub struct PublishSession {
194    handle: StreamHandle,
195    registry: Arc<dyn PublishRegistry>,
196    key: StreamKey,
197    released: bool,
198}
199
200impl PublishSession {
201    /// The stream key this session publishes to.
202    pub fn key(&self) -> &StreamKey {
203        &self.key
204    }
205
206    /// The underlying engine [`StreamHandle`] — GOP cache, QoS, subscriber count,
207    /// metadata, and direct `subscribe`/replay access.
208    pub fn handle(&self) -> &StreamHandle {
209        &self.handle
210    }
211
212    /// Publish one decoded frame to all subscribers; returns the live subscriber
213    /// count (`0` when nobody is watching yet).
214    pub fn publish_frame(&self, frame: MediaFrame) -> Result<usize> {
215        self.handle.publish_frame(frame)
216    }
217
218    /// Release the publish slot deterministically. Prefer this over relying on
219    /// [`Drop`] whenever you can `await` — it surfaces the teardown error and
220    /// completes before the next stream can reuse the key.
221    pub async fn finish(mut self) -> Result<()> {
222        self.released = true;
223        self.registry.end_publish(&self.key).await
224    }
225}
226
227impl Drop for PublishSession {
228    fn drop(&mut self) {
229        if self.released {
230            return;
231        }
232        // Best-effort async release. `end_publish` is async; if a Tokio runtime is
233        // available (the normal case inside a worker), spawn the teardown so the
234        // slot is freed even on an early return or panic.
235        if let Ok(rt) = tokio::runtime::Handle::try_current() {
236            let registry = Arc::clone(&self.registry);
237            let key = self.key.clone();
238            rt.spawn(async move {
239                let _ = registry.end_publish(&key).await;
240            });
241        }
242    }
243}
244
245#[cfg(test)]
246mod tests {
247    use super::*;
248    use crate::bus::PlaybackRegistry;
249    use crate::{AppSpec, CodecId, Engine, FrameFlags};
250    use bytes::Bytes;
251
252    /// A tiny custom protocol that publishes a config + keyframe then idles until
253    /// shutdown — exercising the full `InboundProtocol` → `IngestContext` →
254    /// `PublishSession` path against a real engine.
255    struct DemoProtocol {
256        key: StreamKey,
257    }
258
259    #[async_trait]
260    impl InboundProtocol for DemoProtocol {
261        fn name(&self) -> &'static str {
262            "demo"
263        }
264
265        async fn serve(&self, ctx: IngestContext, shutdown: CancellationToken) -> Result<()> {
266            let session = ctx.open_publish(self.key.clone()).await?;
267            let mut cfg =
268                MediaFrame::new_video(0, 0, Bytes::from_static(b"sps"), CodecId::H264, false);
269            cfg.flags |= FrameFlags::CONFIG;
270            session.publish_frame(cfg)?;
271            session.publish_frame(MediaFrame::new_video(
272                0,
273                0,
274                Bytes::from_static(b"idr"),
275                CodecId::H264,
276                true,
277            ))?;
278            shutdown.cancelled().await;
279            session.finish().await
280        }
281    }
282
283    #[tokio::test]
284    async fn custom_protocol_publishes_through_ingest_context() {
285        let engine = Engine::builder()
286            .application(AppSpec::new("live").gop_cache(8))
287            .build();
288        let key = StreamKey::new("live", "cam");
289        let ctx = IngestContext::new(engine.clone());
290
291        let proto = DemoProtocol { key: key.clone() };
292        let shutdown = CancellationToken::new();
293
294        // Run the worker; cancel once it has had a chance to publish.
295        let worker = {
296            let shutdown = shutdown.clone();
297            tokio::spawn(async move { proto.serve(ctx, shutdown).await })
298        };
299
300        // Wait until the stream is live and its GOP cache holds the keyframe.
301        let handle = loop {
302            if let Ok(h) = engine.get_stream(&key) {
303                if h.replay_buffer().iter().any(|f| f.is_keyframe()) {
304                    break h;
305                }
306            }
307            tokio::task::yield_now().await;
308        };
309        let (vcfg, _) = handle.cached_configs();
310        assert!(vcfg.is_some(), "config frame cached via PublishSession");
311
312        shutdown.cancel();
313        worker.await.unwrap().unwrap();
314        // After finish(), the publish slot is released.
315        assert!(
316            engine.get_stream(&key).is_err(),
317            "session released on finish"
318        );
319    }
320
321    #[tokio::test]
322    async fn dropping_session_releases_the_slot() {
323        let engine = Engine::builder().application(AppSpec::new("live")).build();
324        let key = StreamKey::new("live", "drop-test");
325        let ctx = IngestContext::new(engine.clone());
326
327        {
328            let _session = ctx.open_publish(key.clone()).await.unwrap();
329            assert!(engine.get_stream(&key).is_ok(), "stream live while held");
330        } // dropped here → best-effort async end_publish spawned
331
332        // Yield so the spawned teardown runs.
333        for _ in 0..16 {
334            if engine.get_stream(&key).is_err() {
335                break;
336            }
337            tokio::task::yield_now().await;
338        }
339        assert!(engine.get_stream(&key).is_err(), "slot released on drop");
340    }
341
342    #[tokio::test]
343    async fn finishing_a_session_notifies_subscribers_via_the_bus() {
344        // Graceful teardown: ending a publish closes the lock-free bus, so an
345        // active playback subscriber is seamlessly notified (its stream ends)
346        // without the protocol handler reaching across to each subscriber.
347        let engine = Engine::builder()
348            .application(AppSpec::new("live").gop_cache(4))
349            .build();
350        let key = StreamKey::new("live", "cam");
351        let ctx = IngestContext::new(engine.clone());
352
353        let session = ctx.open_publish(key.clone()).await.unwrap();
354        let handle = engine.get_stream(&key).unwrap();
355        let mut sub = handle.subscribe_resilient();
356
357        // The subscriber sees the live frame.
358        session
359            .publish_frame(MediaFrame::new_video(
360                0,
361                0,
362                Bytes::from_static(b"idr"),
363                CodecId::H264,
364                true,
365            ))
366            .unwrap();
367        assert!(sub.recv().await.is_some(), "subscriber receives live frame");
368
369        // Release every handle clone, then tear the session down.
370        drop(handle);
371        session.finish().await.unwrap();
372
373        // The subscriber is notified of teardown: the closed bus yields `None`.
374        assert!(
375            sub.recv().await.is_none(),
376            "subscriber notified that the stream ended"
377        );
378        assert!(engine.get_stream(&key).is_err(), "slot released on finish");
379    }
380}