arcly_stream/inbound.rs
1//! The **multi-protocol ingestion architecture** — the public seam for teaching
2//! the engine new inbound wire protocols (RTSP, SRT, WebRTC WHIP/WHEP, …)
3//! without touching the kernel.
4//!
5//! # The three pieces
6//!
7//! | Type | Role |
8//! |------|------|
9//! | [`InboundProtocol`] | The worker contract you implement: own a listener, accept connections, bridge frames onto the bus. |
10//! | [`IngestContext`] | The ergonomic, cloneable handle a worker uses to reach the engine bus — hands out publish sessions. |
11//! | [`PublishSession`] | An RAII token for one live stream: every frame lands in the GOP cache + live QoS; releasing it frees the publish slot. |
12//!
13//! ```text
14//! your crate arcly-stream kernel
15//! ┌────────────────────────┐ ┌───────────────────────────────┐
16//! │ struct MyRtspHandler │ │ Engine (lock-free bus) │
17//! │ impl InboundProtocol { │ serve │ • broadcast fan-out │
18//! │ async fn serve(ctx) { │◀─────────│ • GOP cache (instant start) │
19//! │ ctx.open_publish()──┼────────▶ │ • live QoS counters │
20//! │ .publish_frame() │ frames │ PublishRegistry │
21//! │ } } │ │ │
22//! └────────────────────────┘ └───────────────────────────────┘
23//! ```
24//!
25//! # Design pattern: a worker owns its transport
26//!
27//! [`InboundProtocol::serve`] is intentionally a *single, long-lived call* that
28//! owns the listener for the protocol's whole lifetime, rather than a set of
29//! per-connection lifecycle callbacks. This keeps the contract minimal and lets
30//! each protocol model its own connection state machine (RTMP chunk streams,
31//! RTSP sessions, SRT handshakes) however it needs — the kernel never assumes a
32//! shape. The reusable [`run_tcp_ingest_server`](crate::protocol::run_tcp_ingest_server)
33//! accept-loop (feature `ingest`) covers the common TCP case so most workers are
34//! a thin per-connection handler over it.
35//!
36//! # Thread-safety & runtime requirements
37//!
38//! - A worker is `Send + Sync + 'static`: the engine shares one instance across
39//! tasks and may run it for the entire process lifetime.
40//! - [`serve`](InboundProtocol::serve) must return promptly once `shutdown` is
41//! cancelled — the engine's coordinated teardown waits on every worker.
42//! - [`PublishSession`] is **not** `Clone`: it models exclusive ownership of one
43//! publish slot. Frames may be published from any task that holds it (or holds
44//! its [`StreamHandle`](PublishSession::handle)).
45//!
46//! # Minimal worker
47//!
48//! ```no_run
49//! use arcly_stream::prelude::*;
50//! use arcly_stream::inbound::{InboundProtocol, IngestContext};
51//! use arcly_stream::bytes::Bytes;
52//!
53//! struct LoopbackProtocol;
54//!
55//! #[async_trait]
56//! impl InboundProtocol for LoopbackProtocol {
57//! fn name(&self) -> &'static str { "loopback" }
58//!
59//! async fn serve(&self, ctx: IngestContext, shutdown: CancellationToken) -> Result<()> {
60//! // Claim a stream; the session releases it on drop.
61//! let session = ctx.open_publish(StreamKey::new("live", "demo")).await?;
62//! let kf = MediaFrame::new_video(0, 0, Bytes::from_static(b"idr"), CodecId::H264, true);
63//! session.publish_frame(kf)?;
64//! shutdown.cancelled().await; // run until told to stop
65//! session.finish().await
66//! }
67//! }
68//! ```
69
70use crate::bus::{PublishRegistry, StreamHandle};
71use crate::{MediaFrame, Result, StreamKey};
72use async_trait::async_trait;
73use std::sync::Arc;
74use tokio_util::sync::CancellationToken;
75
76/// A pluggable inbound wire-protocol worker — the unit the engine runs to ingest
77/// a transport (RTMP, RTSP, SRT, WHIP, …).
78///
79/// Implement this in your own crate and register it with
80/// [`EngineBuilder::protocol`](crate::EngineBuilder::protocol) (or pass it to
81/// [`Engine::serve`](crate::Engine::serve)); the engine never needs to know the
82/// concrete type. A worker:
83///
84/// 1. **binds** its listener(s) inside [`serve`](Self::serve),
85/// 2. **accepts** connections and performs each protocol **handshake**,
86/// 3. resolves a [`StreamKey`] and opens a [`PublishSession`] via the
87/// [`IngestContext`],
88/// 4. **bridges** decoded access units to [`MediaFrame`]s and publishes them,
89/// 5. **tears down** cleanly when a connection closes or `shutdown` fires.
90///
91/// Any type implementing the legacy [`ProtocolHandler`](crate::ProtocolHandler)
92/// is automatically an `InboundProtocol` via a blanket bridge, so existing
93/// handlers keep working unchanged.
94#[async_trait]
95pub trait InboundProtocol: Send + Sync + 'static {
96 /// Stable, human-readable protocol name (`"rtmp"`, `"rtsp"`, …). Used in logs
97 /// and the engine's per-worker lifecycle tracing.
98 fn name(&self) -> &'static str;
99
100 /// Run the protocol's listener until `shutdown` is cancelled.
101 ///
102 /// Return `Ok(())` on a clean shutdown. Returning `Err` signals a fatal fault
103 /// (e.g. the listener could not bind) and trips the engine's coordinated
104 /// teardown, winding down sibling workers too.
105 ///
106 /// Implementations **must** observe `shutdown` and return promptly once it is
107 /// cancelled; the engine awaits every worker during drain.
108 async fn serve(&self, ctx: IngestContext, shutdown: CancellationToken) -> Result<()>;
109}
110
111/// Blanket bridge: every legacy [`ProtocolHandler`](crate::ProtocolHandler) is an
112/// [`InboundProtocol`]. New protocols should implement `InboundProtocol` directly
113/// for the ergonomic [`IngestContext`]; this keeps pre-existing handlers working.
114#[async_trait]
115impl<T: crate::traits::ProtocolHandler + 'static> InboundProtocol for T {
116 fn name(&self) -> &'static str {
117 crate::traits::ProtocolHandler::name(self)
118 }
119
120 async fn serve(&self, ctx: IngestContext, shutdown: CancellationToken) -> Result<()> {
121 crate::traits::ProtocolHandler::run(self, Arc::clone(ctx.registry()), shutdown).await
122 }
123}
124
125/// The ergonomic, cloneable handle a protocol worker uses to reach the engine
126/// bus, handed to every [`InboundProtocol::serve`] call.
127///
128/// It hides the [`PublishRegistry`] trait object behind a small, stable surface:
129/// claim a stream for publishing ([`open_publish`](Self::open_publish)), or reach
130/// the underlying registry for advanced flows ([`registry`](Self::registry)).
131/// Cloning is cheap (an `Arc` bump) — share it freely across per-connection tasks.
132#[derive(Clone)]
133pub struct IngestContext {
134 registry: Arc<dyn PublishRegistry>,
135}
136
137impl IngestContext {
138 /// Wrap a publish registry. The engine constructs this for you; tests and
139 /// embedders can build one directly from any [`PublishRegistry`] (e.g. an
140 /// `Arc<Engine>`).
141 pub fn new(registry: Arc<dyn PublishRegistry>) -> Self {
142 Self { registry }
143 }
144
145 /// Claim `key` for publishing, returning an RAII [`PublishSession`].
146 ///
147 /// Frames published through the returned session flow into the lock-free
148 /// broadcast fan-out, the keyframe-anchored GOP cache (for instant-start
149 /// replay), and the live QoS counters — exactly as a native handler's frames
150 /// do. Fails with [`StreamAlreadyPublishing`] on a live duplicate or
151 /// [`PublisherLimitReached`] at capacity.
152 ///
153 /// [`StreamAlreadyPublishing`]: crate::StreamError::StreamAlreadyPublishing
154 /// [`PublisherLimitReached`]: crate::StreamError::PublisherLimitReached
155 pub async fn open_publish(&self, key: StreamKey) -> Result<PublishSession> {
156 let handle = self.registry.start_publish(&key).await?;
157 Ok(PublishSession {
158 handle,
159 registry: Arc::clone(&self.registry),
160 key,
161 released: false,
162 })
163 }
164
165 /// The underlying publish registry, for flows not covered by
166 /// [`open_publish`](Self::open_publish).
167 pub fn registry(&self) -> &Arc<dyn PublishRegistry> {
168 &self.registry
169 }
170}
171
172/// An RAII publish session — a protocol worker's exclusive token for one live
173/// stream.
174///
175/// Wraps the engine's [`StreamHandle`]: every [`publish_frame`](Self::publish_frame)
176/// lands in the broadcast fan-out, GOP cache, and live QoS counters. Dropping the
177/// session releases the publish slot (best-effort, on the current Tokio runtime),
178/// so a worker that returns early or panics never leaks a stream. Prefer
179/// [`finish`](Self::finish) for deterministic, awaited teardown.
180pub struct PublishSession {
181 handle: StreamHandle,
182 registry: Arc<dyn PublishRegistry>,
183 key: StreamKey,
184 released: bool,
185}
186
187impl PublishSession {
188 /// The stream key this session publishes to.
189 pub fn key(&self) -> &StreamKey {
190 &self.key
191 }
192
193 /// The underlying engine [`StreamHandle`] — GOP cache, QoS, subscriber count,
194 /// metadata, and direct `subscribe`/replay access.
195 pub fn handle(&self) -> &StreamHandle {
196 &self.handle
197 }
198
199 /// Publish one decoded frame to all subscribers; returns the live subscriber
200 /// count (`0` when nobody is watching yet).
201 pub fn publish_frame(&self, frame: MediaFrame) -> Result<usize> {
202 self.handle.publish_frame(frame)
203 }
204
205 /// Release the publish slot deterministically. Prefer this over relying on
206 /// [`Drop`] whenever you can `await` — it surfaces the teardown error and
207 /// completes before the next stream can reuse the key.
208 pub async fn finish(mut self) -> Result<()> {
209 self.released = true;
210 self.registry.end_publish(&self.key).await
211 }
212}
213
214impl Drop for PublishSession {
215 fn drop(&mut self) {
216 if self.released {
217 return;
218 }
219 // Best-effort async release. `end_publish` is async; if a Tokio runtime is
220 // available (the normal case inside a worker), spawn the teardown so the
221 // slot is freed even on an early return or panic.
222 if let Ok(rt) = tokio::runtime::Handle::try_current() {
223 let registry = Arc::clone(&self.registry);
224 let key = self.key.clone();
225 rt.spawn(async move {
226 let _ = registry.end_publish(&key).await;
227 });
228 }
229 }
230}
231
232#[cfg(test)]
233mod tests {
234 use super::*;
235 use crate::bus::PlaybackRegistry;
236 use crate::{AppSpec, CodecId, Engine, FrameFlags};
237 use bytes::Bytes;
238
239 /// A tiny custom protocol that publishes a config + keyframe then idles until
240 /// shutdown — exercising the full `InboundProtocol` → `IngestContext` →
241 /// `PublishSession` path against a real engine.
242 struct DemoProtocol {
243 key: StreamKey,
244 }
245
246 #[async_trait]
247 impl InboundProtocol for DemoProtocol {
248 fn name(&self) -> &'static str {
249 "demo"
250 }
251
252 async fn serve(&self, ctx: IngestContext, shutdown: CancellationToken) -> Result<()> {
253 let session = ctx.open_publish(self.key.clone()).await?;
254 let mut cfg =
255 MediaFrame::new_video(0, 0, Bytes::from_static(b"sps"), CodecId::H264, false);
256 cfg.flags |= FrameFlags::CONFIG;
257 session.publish_frame(cfg)?;
258 session.publish_frame(MediaFrame::new_video(
259 0,
260 0,
261 Bytes::from_static(b"idr"),
262 CodecId::H264,
263 true,
264 ))?;
265 shutdown.cancelled().await;
266 session.finish().await
267 }
268 }
269
270 #[tokio::test]
271 async fn custom_protocol_publishes_through_ingest_context() {
272 let engine = Engine::builder()
273 .application(AppSpec::new("live").gop_cache(8))
274 .build();
275 let key = StreamKey::new("live", "cam");
276 let ctx = IngestContext::new(engine.clone());
277
278 let proto = DemoProtocol { key: key.clone() };
279 let shutdown = CancellationToken::new();
280
281 // Run the worker; cancel once it has had a chance to publish.
282 let worker = {
283 let shutdown = shutdown.clone();
284 tokio::spawn(async move { proto.serve(ctx, shutdown).await })
285 };
286
287 // Wait until the stream is live and its GOP cache holds the keyframe.
288 let handle = loop {
289 if let Ok(h) = engine.get_stream(&key) {
290 if h.replay_buffer().iter().any(|f| f.is_keyframe()) {
291 break h;
292 }
293 }
294 tokio::task::yield_now().await;
295 };
296 let (vcfg, _) = handle.cached_configs();
297 assert!(vcfg.is_some(), "config frame cached via PublishSession");
298
299 shutdown.cancel();
300 worker.await.unwrap().unwrap();
301 // After finish(), the publish slot is released.
302 assert!(
303 engine.get_stream(&key).is_err(),
304 "session released on finish"
305 );
306 }
307
308 #[tokio::test]
309 async fn dropping_session_releases_the_slot() {
310 let engine = Engine::builder().application(AppSpec::new("live")).build();
311 let key = StreamKey::new("live", "drop-test");
312 let ctx = IngestContext::new(engine.clone());
313
314 {
315 let _session = ctx.open_publish(key.clone()).await.unwrap();
316 assert!(engine.get_stream(&key).is_ok(), "stream live while held");
317 } // dropped here → best-effort async end_publish spawned
318
319 // Yield so the spawned teardown runs.
320 for _ in 0..16 {
321 if engine.get_stream(&key).is_err() {
322 break;
323 }
324 tokio::task::yield_now().await;
325 }
326 assert!(engine.get_stream(&key).is_err(), "slot released on drop");
327 }
328}