Skip to main content

arcly_stream/
inbound.rs

1//! The **multi-protocol ingestion architecture** — the public seam for teaching
2//! the engine new inbound wire protocols (RTSP, SRT, WebRTC WHIP/WHEP, …)
3//! without touching the kernel.
4//!
5//! # The three pieces
6//!
7//! | Type | Role |
8//! |------|------|
9//! | [`InboundProtocol`] | The worker contract you implement: own a listener, accept connections, bridge frames onto the bus. |
10//! | [`IngestContext`] | The ergonomic, cloneable handle a worker uses to reach the engine bus — hands out publish sessions. |
11//! | [`PublishSession`] | An RAII token for one live stream: every frame lands in the GOP cache + live QoS; releasing it frees the publish slot. |
12//!
13//! ```text
14//!   your crate                          arcly-stream kernel
15//!  ┌────────────────────────┐          ┌───────────────────────────────┐
16//!  │ struct MyRtspHandler    │          │ Engine (lock-free bus)        │
17//!  │ impl InboundProtocol {  │  serve   │  • broadcast fan-out          │
18//!  │   async fn serve(ctx) { │◀─────────│  • GOP cache (instant start)  │
19//!  │     ctx.open_publish()──┼────────▶ │  • live QoS counters          │
20//!  │       .publish_frame()  │ frames   │  PublishRegistry              │
21//!  │ } }                     │          │                               │
22//!  └────────────────────────┘          └───────────────────────────────┘
23//! ```
24//!
25//! # Design pattern: a worker owns its transport
26//!
27//! [`InboundProtocol::serve`] is intentionally a *single, long-lived call* that
28//! owns the listener for the protocol's whole lifetime, rather than a set of
29//! per-connection lifecycle callbacks. This keeps the contract minimal and lets
30//! each protocol model its own connection state machine (RTMP chunk streams,
31//! RTSP sessions, SRT handshakes) however it needs — the kernel never assumes a
32//! shape. The reusable [`run_tcp_ingest_server`](crate::protocol::run_tcp_ingest_server)
33//! accept-loop (feature `ingest`) covers the common TCP case so most workers are
34//! a thin per-connection handler over it.
35//!
36//! # Thread-safety & runtime requirements
37//!
38//! - A worker is `Send + Sync + 'static`: the engine shares one instance across
39//!   tasks and may run it for the entire process lifetime.
40//! - [`serve`](InboundProtocol::serve) must return promptly once `shutdown` is
41//!   cancelled — the engine's coordinated teardown waits on every worker.
42//! - [`PublishSession`] is **not** `Clone`: it models exclusive ownership of one
43//!   publish slot. Frames may be published from any task that holds it (or holds
44//!   its [`StreamHandle`](PublishSession::handle)).
45//!
46//! # Minimal worker
47//!
48//! ```no_run
49//! use arcly_stream::prelude::*;
50//! use arcly_stream::inbound::{InboundProtocol, IngestContext};
51//! use arcly_stream::bytes::Bytes;
52//!
53//! struct LoopbackProtocol;
54//!
55//! #[async_trait]
56//! impl InboundProtocol for LoopbackProtocol {
57//!     fn name(&self) -> &'static str { "loopback" }
58//!
59//!     async fn serve(&self, ctx: IngestContext, shutdown: CancellationToken) -> Result<()> {
60//!         // Claim a stream; the session releases it on drop.
61//!         let session = ctx.open_publish(StreamKey::new("live", "demo")).await?;
62//!         let kf = MediaFrame::new_video(0, 0, Bytes::from_static(b"idr"), CodecId::H264, true);
63//!         session.publish_frame(kf)?;
64//!         shutdown.cancelled().await; // run until told to stop
65//!         session.finish().await
66//!     }
67//! }
68//! ```
69
70use crate::bus::{PublishRegistry, StreamHandle};
71use crate::{MediaFrame, Result, StreamKey};
72use async_trait::async_trait;
73use std::sync::Arc;
74use tokio_util::sync::CancellationToken;
75
76/// A pluggable inbound wire-protocol worker — the unit the engine runs to ingest
77/// a transport (RTMP, RTSP, SRT, WHIP, …).
78///
79/// Implement this in your own crate and register it with
80/// [`EngineBuilder::protocol`](crate::EngineBuilder::protocol) (or pass it to
81/// [`Engine::serve`](crate::Engine::serve)); the engine never needs to know the
82/// concrete type. A worker:
83///
84/// 1. **binds** its listener(s) inside [`serve`](Self::serve),
85/// 2. **accepts** connections and performs each protocol **handshake**,
86/// 3. resolves a [`StreamKey`] and opens a [`PublishSession`] via the
87///    [`IngestContext`],
88/// 4. **bridges** decoded access units to [`MediaFrame`]s and publishes them,
89/// 5. **tears down** cleanly when a connection closes or `shutdown` fires.
90///
91/// Any type implementing the legacy [`ProtocolHandler`](crate::ProtocolHandler)
92/// is automatically an `InboundProtocol` via a blanket bridge, so existing
93/// handlers keep working unchanged.
94#[async_trait]
95pub trait InboundProtocol: Send + Sync + 'static {
96    /// Stable, human-readable protocol name (`"rtmp"`, `"rtsp"`, …). Used in logs
97    /// and the engine's per-worker lifecycle tracing.
98    fn name(&self) -> &'static str;
99
100    /// Run the protocol's listener until `shutdown` is cancelled.
101    ///
102    /// Return `Ok(())` on a clean shutdown. Returning `Err` signals a fatal fault
103    /// (e.g. the listener could not bind) and trips the engine's coordinated
104    /// teardown, winding down sibling workers too.
105    ///
106    /// Implementations **must** observe `shutdown` and return promptly once it is
107    /// cancelled; the engine awaits every worker during drain.
108    async fn serve(&self, ctx: IngestContext, shutdown: CancellationToken) -> Result<()>;
109}
110
111/// Blanket bridge: every legacy [`ProtocolHandler`](crate::ProtocolHandler) is an
112/// [`InboundProtocol`]. New protocols should implement `InboundProtocol` directly
113/// for the ergonomic [`IngestContext`]; this keeps pre-existing handlers working.
114#[async_trait]
115impl<T: crate::traits::ProtocolHandler + 'static> InboundProtocol for T {
116    fn name(&self) -> &'static str {
117        crate::traits::ProtocolHandler::name(self)
118    }
119
120    async fn serve(&self, ctx: IngestContext, shutdown: CancellationToken) -> Result<()> {
121        crate::traits::ProtocolHandler::run(self, Arc::clone(ctx.registry()), shutdown).await
122    }
123}
124
125/// The ergonomic, cloneable handle a protocol worker uses to reach the engine
126/// bus, handed to every [`InboundProtocol::serve`] call.
127///
128/// It hides the [`PublishRegistry`] trait object behind a small, stable surface:
129/// claim a stream for publishing ([`open_publish`](Self::open_publish)), or reach
130/// the underlying registry for advanced flows ([`registry`](Self::registry)).
131/// Cloning is cheap (an `Arc` bump) — share it freely across per-connection tasks.
132#[derive(Clone)]
133pub struct IngestContext {
134    registry: Arc<dyn PublishRegistry>,
135}
136
137impl IngestContext {
138    /// Wrap a publish registry. The engine constructs this for you; tests and
139    /// embedders can build one directly from any [`PublishRegistry`] (e.g. an
140    /// `Arc<Engine>`).
141    pub fn new(registry: Arc<dyn PublishRegistry>) -> Self {
142        Self { registry }
143    }
144
145    /// Claim `key` for publishing, returning an RAII [`PublishSession`].
146    ///
147    /// Frames published through the returned session flow into the lock-free
148    /// broadcast fan-out, the keyframe-anchored GOP cache (for instant-start
149    /// replay), and the live QoS counters — exactly as a native handler's frames
150    /// do. Fails with [`StreamAlreadyPublishing`] on a live duplicate or
151    /// [`PublisherLimitReached`] at capacity.
152    ///
153    /// [`StreamAlreadyPublishing`]: crate::StreamError::StreamAlreadyPublishing
154    /// [`PublisherLimitReached`]: crate::StreamError::PublisherLimitReached
155    pub async fn open_publish(&self, key: StreamKey) -> Result<PublishSession> {
156        let handle = self.registry.start_publish(&key).await?;
157        Ok(PublishSession {
158            handle,
159            registry: Arc::clone(&self.registry),
160            key,
161            released: false,
162        })
163    }
164
165    /// The underlying publish registry, for flows not covered by
166    /// [`open_publish`](Self::open_publish).
167    pub fn registry(&self) -> &Arc<dyn PublishRegistry> {
168        &self.registry
169    }
170}
171
172/// An RAII publish session — a protocol worker's exclusive token for one live
173/// stream.
174///
175/// Wraps the engine's [`StreamHandle`]: every [`publish_frame`](Self::publish_frame)
176/// lands in the broadcast fan-out, GOP cache, and live QoS counters. Dropping the
177/// session releases the publish slot (best-effort, on the current Tokio runtime),
178/// so a worker that returns early or panics never leaks a stream. Prefer
179/// [`finish`](Self::finish) for deterministic, awaited teardown.
180pub struct PublishSession {
181    handle: StreamHandle,
182    registry: Arc<dyn PublishRegistry>,
183    key: StreamKey,
184    released: bool,
185}
186
187impl PublishSession {
188    /// The stream key this session publishes to.
189    pub fn key(&self) -> &StreamKey {
190        &self.key
191    }
192
193    /// The underlying engine [`StreamHandle`] — GOP cache, QoS, subscriber count,
194    /// metadata, and direct `subscribe`/replay access.
195    pub fn handle(&self) -> &StreamHandle {
196        &self.handle
197    }
198
199    /// Publish one decoded frame to all subscribers; returns the live subscriber
200    /// count (`0` when nobody is watching yet).
201    pub fn publish_frame(&self, frame: MediaFrame) -> Result<usize> {
202        self.handle.publish_frame(frame)
203    }
204
205    /// Release the publish slot deterministically. Prefer this over relying on
206    /// [`Drop`] whenever you can `await` — it surfaces the teardown error and
207    /// completes before the next stream can reuse the key.
208    pub async fn finish(mut self) -> Result<()> {
209        self.released = true;
210        self.registry.end_publish(&self.key).await
211    }
212}
213
214impl Drop for PublishSession {
215    fn drop(&mut self) {
216        if self.released {
217            return;
218        }
219        // Best-effort async release. `end_publish` is async; if a Tokio runtime is
220        // available (the normal case inside a worker), spawn the teardown so the
221        // slot is freed even on an early return or panic.
222        if let Ok(rt) = tokio::runtime::Handle::try_current() {
223            let registry = Arc::clone(&self.registry);
224            let key = self.key.clone();
225            rt.spawn(async move {
226                let _ = registry.end_publish(&key).await;
227            });
228        }
229    }
230}
231
#[cfg(test)]
mod tests {
    use super::*;
    use crate::bus::PlaybackRegistry;
    use crate::{AppSpec, CodecId, Engine, FrameFlags};
    use bytes::Bytes;

    /// Minimal custom protocol: publishes one config frame and one keyframe, then
    /// parks until shutdown. It drives the whole `InboundProtocol` →
    /// `IngestContext` → `PublishSession` path against a real engine.
    struct DemoProtocol {
        key: StreamKey,
    }

    #[async_trait]
    impl InboundProtocol for DemoProtocol {
        fn name(&self) -> &'static str {
            "demo"
        }

        async fn serve(&self, ctx: IngestContext, shutdown: CancellationToken) -> Result<()> {
            let session = ctx.open_publish(self.key.clone()).await?;

            // Config frame first (non-keyframe, flagged as CONFIG).
            let mut config =
                MediaFrame::new_video(0, 0, Bytes::from_static(b"sps"), CodecId::H264, false);
            config.flags |= FrameFlags::CONFIG;
            session.publish_frame(config)?;

            // Then the keyframe the test waits to see in the replay buffer.
            let keyframe =
                MediaFrame::new_video(0, 0, Bytes::from_static(b"idr"), CodecId::H264, true);
            session.publish_frame(keyframe)?;

            shutdown.cancelled().await;
            session.finish().await
        }
    }

    #[tokio::test]
    async fn custom_protocol_publishes_through_ingest_context() {
        let engine = Engine::builder()
            .application(AppSpec::new("live").gop_cache(8))
            .build();
        let key = StreamKey::new("live", "cam");
        let ctx = IngestContext::new(engine.clone());
        let shutdown = CancellationToken::new();

        // Start the worker on its own task; it is cancelled further down once
        // its frames have been observed.
        let worker_token = shutdown.clone();
        let proto = DemoProtocol { key: key.clone() };
        let worker = tokio::spawn(async move { proto.serve(ctx, worker_token).await });

        // Poll until the stream exists and its replay buffer contains a keyframe,
        // yielding between attempts so the worker task can make progress.
        let handle = loop {
            let ready = engine
                .get_stream(&key)
                .ok()
                .filter(|h| h.replay_buffer().iter().any(|f| f.is_keyframe()));
            if let Some(h) = ready {
                break h;
            }
            tokio::task::yield_now().await;
        };
        let (vcfg, _) = handle.cached_configs();
        assert!(vcfg.is_some(), "config frame cached via PublishSession");

        shutdown.cancel();
        worker.await.unwrap().unwrap();
        // The worker ended with finish(), so the stream must be gone.
        assert!(
            engine.get_stream(&key).is_err(),
            "session released on finish"
        );
    }

    #[tokio::test]
    async fn dropping_session_releases_the_slot() {
        let engine = Engine::builder().application(AppSpec::new("live")).build();
        let key = StreamKey::new("live", "drop-test");
        let ctx = IngestContext::new(engine.clone());

        let session = ctx.open_publish(key.clone()).await.unwrap();
        assert!(engine.get_stream(&key).is_ok(), "stream live while held");
        // No finish(): dropping spawns the best-effort async end_publish.
        drop(session);

        // Give the spawned teardown a bounded number of chances to run.
        let mut attempts = 0;
        while attempts < 16 && engine.get_stream(&key).is_ok() {
            tokio::task::yield_now().await;
            attempts += 1;
        }
        assert!(engine.get_stream(&key).is_err(), "slot released on drop");
    }
}