arcly_stream/bus/handle.rs
1//! The live stream handle — a lock-free, zero-copy broadcast fan-out bus.
2
3use crate::observe::{NoopObserver, Observer};
4use crate::{frame::FrameFlags, AppName, MediaFrame, Result, StreamId, StreamKey};
5use arc_swap::{ArcSwap, ArcSwapOption};
6use std::net::SocketAddr;
7use std::ops::ControlFlow;
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::{Arc, Mutex as StdMutex, OnceLock};
10use std::time::{Instant, SystemTime, UNIX_EPOCH};
11use tokio::sync::broadcast::error::RecvError;
12use tokio::sync::{broadcast, RwLock};
13use tokio_util::sync::CancellationToken;
14
15/// A per-frame egress sink driven by [`StreamHandle::drive_to`].
16///
17/// Implementors send one frame to a single downstream peer (an RTSP player, an
18/// upstream RTMP server, …). Returning [`ControlFlow::Break`] stops the drive
19/// cleanly — typically because the peer disconnected — while `Err` aborts it.
20#[async_trait::async_trait]
21pub trait FrameSink: Send {
22 /// Forward one frame; `Break` ends the session gracefully.
23 async fn send(&mut self, frame: Arc<MediaFrame>) -> Result<ControlFlow<()>>;
24}
25
/// Wall-clock "now" expressed as milliseconds since the Unix epoch.
///
/// A system clock that reads earlier than the epoch yields 0 instead of an
/// error. This value is only fit for timestamps that are *displayed* (e.g.
/// `started_at_ms`): NTP steps and operator clock changes move it around, so it
/// must never feed an elapsed-time decision. Use [`mono_ms`] for those.
pub(crate) fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        // Clock is set before 1970: saturate rather than fail.
        Err(_) => 0,
    }
}
37
/// Milliseconds on a process-local **monotonic** clock, counted from the first
/// time this function is called.
///
/// In contrast to [`now_ms`], the reading is unaffected by wall-clock
/// adjustments, which makes it the right basis for QoS windows and the idle
/// reaper: a leap second or NTP correction cannot spuriously reap a live stream
/// or distort a measured bitrate.
pub(crate) fn mono_ms() -> u64 {
    // The first caller pins the origin; every later call measures against it.
    static EPOCH: OnceLock<Instant> = OnceLock::new();
    let origin = *EPOCH.get_or_init(Instant::now);
    origin.elapsed().as_millis() as u64
}
47
/// Where a stream currently is in its lifecycle.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum StreamState {
    /// Nobody is publishing yet.
    Idle,
    /// A publisher is connected and sending data.
    Publishing,
    /// The stream is being transcoded into one or more renditions.
    Transcoding,
    /// The stream is being recorded.
    Recording,
    /// The publisher has disconnected; the stream is over.
    Ended,
}
62
63impl std::fmt::Display for StreamState {
64 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65 f.write_str(match self {
66 StreamState::Idle => "idle",
67 StreamState::Publishing => "publishing",
68 StreamState::Transcoding => "transcoding",
69 StreamState::Recording => "recording",
70 StreamState::Ended => "ended",
71 })
72 }
73}
74
75/// Runtime metadata about a stream, updated continuously while publishing.
76///
77/// Resolution and the ingest protocol are set by the protocol handler (e.g. via
78/// [`StreamHandle::update_metadata`] after parsing the codec config), while the
79/// `fps` and `*_bitrate_bps` fields are overlaid live from measured throughput
80/// by [`StreamHandle::metadata_snapshot`].
81#[derive(Debug, Clone)]
82pub struct StreamMetadata {
83 /// The `(app, stream_id)` this metadata describes.
84 pub key: StreamKey,
85 /// Publisher remote address.
86 pub publisher_addr: Option<SocketAddr>,
87 /// Video width in pixels (0 = unknown).
88 pub width: u32,
89 /// Video height in pixels (0 = unknown).
90 pub height: u32,
91 /// Video frames per second (0 = unknown). Overlaid from measured throughput.
92 pub fps: f64,
93 /// Measured video ingest bitrate in bits-per-second.
94 pub video_bitrate_bps: u64,
95 /// Measured audio ingest bitrate in bits-per-second.
96 pub audio_bitrate_bps: u64,
97 /// Timestamp of the first frame received (Unix ms).
98 pub started_at_ms: u64,
99 /// Protocol used for ingest (e.g. `"rtmp"`).
100 pub ingest_protocol: String,
101}
102
103impl StreamMetadata {
104 /// Create zeroed metadata for `(app, stream_id)`.
105 pub fn new(app: AppName, stream_id: StreamId) -> Self {
106 Self {
107 key: StreamKey::new(app, stream_id),
108 publisher_addr: None,
109 width: 0,
110 height: 0,
111 fps: 0.0,
112 video_bitrate_bps: 0,
113 audio_bitrate_bps: 0,
114 started_at_ms: 0,
115 ingest_protocol: String::new(),
116 }
117 }
118}
119
/// Measured quality of service for a stream, captured at one instant.
#[derive(Clone, Copy, Debug, Default)]
pub struct Qos {
    /// Video bitrate over the most recent ~1s window (bits/sec).
    pub video_bitrate_bps: u64,
    /// Audio bitrate over the most recent ~1s window (bits/sec).
    pub audio_bitrate_bps: u64,
    /// Video frames per second over the most recent ~1s window.
    pub fps: f64,
    /// Frames published on this stream since it was created.
    pub total_frames: u64,
    /// Payload bytes published on this stream since it was created.
    pub total_bytes: u64,
}
134
/// Atomic throughput counters that back [`Qos`] and the measured fields of
/// [`StreamMetadata`].
///
/// A stream has a single publisher (enforced by `start_publish`), so the
/// read-modify-write sequences on these counters are effectively single-writer
/// and `Relaxed` ordering is sound.
#[derive(Default)]
struct QosCounters {
    // Lifetime totals.
    total_frames: AtomicU64,
    total_bytes: AtomicU64,
    // Accumulators for the window currently being measured; a start of 0 means
    // no window has been opened yet.
    window_start_ms: AtomicU64,
    window_video_bytes: AtomicU64,
    window_audio_bytes: AtomicU64,
    window_video_frames: AtomicU64,
    // Rates published when the last window closed.
    cur_video_bitrate: AtomicU64,
    cur_audio_bitrate: AtomicU64,
    cur_fps_milli: AtomicU64, // fps × 1000, integer-encoded
    // Monotonic timestamp of the latest activity.
    last_frame_ms: AtomicU64,
}
152
153/// Keyframe-anchored replay buffer for instant playback start.
154struct GopBuffer {
155 /// Frames since (and including) the most recent keyframe.
156 frames: Vec<Arc<MediaFrame>>,
157 /// Hard cap on buffered frames (memory bound between keyframes).
158 capacity: usize,
159 /// Whether the current GOP already overflowed `capacity` (so the truncation
160 /// signal fires once per GOP, not once per dropped frame).
161 overflowed: bool,
162}
163
164/// A live handle to a single active stream.
165///
166/// Multiple subscribers (HLS packager, DASH packager, WebRTC SFU, recorders …)
167/// call [`StreamHandle::subscribe_resilient`] to receive every [`MediaFrame`]
168/// cheaply via a `broadcast` channel (zero-copy `Bytes` cloning).
169///
170/// Each broadcast slot holds one `Arc<MediaFrame>` pointer (8 bytes), so e.g.
171/// 4096 slots ≈ 32 KB per stream.
172///
173/// # Backpressure model
174///
175/// Fan-out uses a **single, fixed-capacity ring buffer per stream** (a
176/// `tokio::broadcast` channel sized by `AppSpec::broadcast_capacity`). This is a
177/// deliberate design choice, with consequences worth understanding:
178///
179/// - **The publisher never blocks on a slow subscriber.** Publishing is a
180/// non-awaiting pointer write; one subscriber falling behind can never apply
181/// backpressure to the publisher or to its peers. This is what keeps the hot
182/// path lock-free and the fast publisher isolated from the slow viewer.
183/// - **Backpressure is resolved by dropping, not stalling.** A subscriber that
184/// can't keep up overruns the ring and observes lag.
185/// [`Subscription::recv`] resynchronizes to the oldest still-buffered frame
186/// and reports the gap via [`Observer::on_subscriber_lagged`]; with
187/// [`Subscription::max_lag`] a chronically slow consumer is evicted
188/// ([`Observer::on_subscriber_evicted`]) rather than churning forever.
189/// - **Capacity is the tuning knob**, traded per stream: larger capacity
190/// tolerates burstier consumers at higher per-stream memory, smaller capacity
191/// sheds laggards sooner. There is intentionally no per-subscriber queue —
192/// that would reintroduce unbounded memory growth and per-consumer locking,
193/// the very things this design avoids.
194///
195/// In short: a slow subscriber degrades only *its own* view (lag, then
196/// eviction), never the publisher's or another subscriber's. Wire an
197/// [`Observer`] to see lag and eviction as they happen.
198#[derive(Clone)]
199pub struct StreamHandle {
200 metadata: Arc<RwLock<StreamMetadata>>,
201 state: Arc<RwLock<StreamState>>,
202 key: StreamKey,
203 /// The frame-bus sender, held *indirectly* so its lifetime is owned by the
204 /// stream's lifecycle — not by how many `StreamHandle` clones happen to be
205 /// alive. Cloning a handle shares this cell; it does not mint a new sender.
206 ///
207 /// This is the structural fix for a sharp edge: previously the handle stored
208 /// the `broadcast::Sender` by value, so any consumer that merely retained a
209 /// handle (to subscribe, read metadata, request a keyframe) silently pinned
210 /// the channel open and defeated the `Closed` shutdown signal. Now
211 /// [`close`](Self::close) — called by the registry when a publish ends — empties
212 /// this cell, dropping the sole sender and closing the channel regardless of
213 /// any lingering handle clones.
214 tx: Arc<ArcSwapOption<broadcast::Sender<Arc<MediaFrame>>>>,
215 /// Latest video CONFIG (AVCDecoderConfigurationRecord) frame, if seen.
216 /// Uses `ArcSwap` for lock-free reads from multiple subscriber tasks.
217 video_config: Arc<ArcSwap<Option<Arc<MediaFrame>>>>,
218 /// Latest audio CONFIG (AudioSpecificConfig) frame, if seen.
219 audio_config: Arc<ArcSwap<Option<Arc<MediaFrame>>>>,
220 /// Rolling GOP buffer for instant-start (empty when `gop_capacity == 0`).
221 gop: Arc<StdMutex<GopBuffer>>,
222 gop_capacity: usize,
223 /// Live throughput counters.
224 qos: Arc<QosCounters>,
225 /// Injected telemetry hook (no-op by default).
226 observer: Arc<dyn Observer>,
227}
228
229impl StreamHandle {
230 /// Create a handle with the no-op observer and no GOP cache.
231 pub fn new(app: AppName, stream_id: StreamId, capacity: usize) -> Self {
232 Self::with_observer(app, stream_id, capacity, 0, Arc::new(NoopObserver))
233 }
234
235 /// Create a handle wired to a host-supplied observer.
236 ///
237 /// `gop_capacity` bounds the keyframe-anchored replay buffer (0 disables it).
238 pub fn with_observer(
239 app: AppName,
240 stream_id: StreamId,
241 capacity: usize,
242 gop_capacity: usize,
243 observer: Arc<dyn Observer>,
244 ) -> Self {
245 let (tx, _) = broadcast::channel(capacity);
246 let qos = QosCounters::default();
247 // Treat creation as the last activity so a just-claimed stream is not
248 // instantly considered idle before its first frame arrives. Monotonic, to
249 // match the idle reaper (see `mono_ms`).
250 qos.last_frame_ms.store(mono_ms(), Ordering::Relaxed);
251 Self {
252 metadata: Arc::new(RwLock::new(StreamMetadata::new(
253 app.clone(),
254 stream_id.clone(),
255 ))),
256 state: Arc::new(RwLock::new(StreamState::Idle)),
257 key: StreamKey::new(app, stream_id),
258 tx: Arc::new(ArcSwapOption::new(Some(Arc::new(tx)))),
259 video_config: Arc::new(ArcSwap::new(Arc::new(None))),
260 audio_config: Arc::new(ArcSwap::new(Arc::new(None))),
261 gop: Arc::new(StdMutex::new(GopBuffer {
262 frames: Vec::new(),
263 capacity: gop_capacity,
264 overflowed: false,
265 })),
266 gop_capacity,
267 qos: Arc::new(qos),
268 observer,
269 }
270 }
271
272 /// The `(app, stream_id)` this handle belongs to.
273 pub fn key(&self) -> &StreamKey {
274 &self.key
275 }
276
277 /// Publish a frame to all current subscribers. Returns the number of
278 /// active receivers; returns `Ok(0)` when there are no subscribers.
279 pub fn publish_frame(&self, frame: MediaFrame) -> crate::Result<usize> {
280 self.observer.on_frame(&self.key, &frame);
281 let len = frame.data.len() as u64;
282 let is_audio = frame.is_audio();
283 let is_key = frame.is_keyframe();
284 let is_config = frame.flags.contains(FrameFlags::CONFIG);
285 let arc = Arc::new(frame);
286
287 // Cache the latest CONFIG frame for late-joining subscribers.
288 if is_config {
289 if is_audio {
290 self.audio_config.store(Arc::new(Some(Arc::clone(&arc))));
291 } else {
292 self.video_config.store(Arc::new(Some(Arc::clone(&arc))));
293 }
294 }
295
296 // Maintain the keyframe-anchored GOP replay buffer.
297 let mut gop_truncated = false;
298 if self.gop_capacity > 0 {
299 if let Ok(mut g) = self.gop.lock() {
300 if is_key {
301 g.frames.clear();
302 g.overflowed = false;
303 g.frames.push(Arc::clone(&arc));
304 } else if !is_config && !g.frames.is_empty() {
305 // Only buffer once a keyframe anchors the GOP; CONFIG frames
306 // are replayed separately via `cached_configs`.
307 if g.frames.len() < g.capacity {
308 g.frames.push(Arc::clone(&arc));
309 } else if !g.overflowed {
310 // First overflow of this GOP: the replay will be truncated.
311 // Latch it so the signal fires once, not per dropped frame.
312 g.overflowed = true;
313 gop_truncated = true;
314 }
315 }
316 }
317 }
318 if gop_truncated {
319 self.observer.on_gop_truncated(&self.key, self.gop_capacity);
320 }
321
322 self.record_qos(len, is_audio, is_key);
323
324 // The sender is gone once the stream is closed; publishing then is a
325 // no-op (Ok(0)) rather than an error, so a publisher racing teardown
326 // winds down cleanly.
327 let count = match self.tx.load_full() {
328 Some(tx) => tx.send(arc).unwrap_or(0),
329 None => 0,
330 };
331 Ok(count)
332 }
333
334 /// Fold one frame into the rolling throughput window.
335 fn record_qos(&self, len: u64, is_audio: bool, _is_key: bool) {
336 let q = &self.qos;
337 let now = mono_ms();
338 q.total_frames.fetch_add(1, Ordering::Relaxed);
339 q.total_bytes.fetch_add(len, Ordering::Relaxed);
340 q.last_frame_ms.store(now, Ordering::Relaxed);
341 if is_audio {
342 q.window_audio_bytes.fetch_add(len, Ordering::Relaxed);
343 } else {
344 q.window_video_bytes.fetch_add(len, Ordering::Relaxed);
345 q.window_video_frames.fetch_add(1, Ordering::Relaxed);
346 }
347
348 let ws = q.window_start_ms.load(Ordering::Relaxed);
349 if ws == 0 {
350 q.window_start_ms.store(now, Ordering::Relaxed);
351 } else if now.saturating_sub(ws) >= 1000 {
352 let elapsed = (now - ws) as f64 / 1000.0;
353 let vbytes = q.window_video_bytes.swap(0, Ordering::Relaxed);
354 let abytes = q.window_audio_bytes.swap(0, Ordering::Relaxed);
355 let vframes = q.window_video_frames.swap(0, Ordering::Relaxed);
356 q.cur_video_bitrate
357 .store((vbytes as f64 * 8.0 / elapsed) as u64, Ordering::Relaxed);
358 q.cur_audio_bitrate
359 .store((abytes as f64 * 8.0 / elapsed) as u64, Ordering::Relaxed);
360 q.cur_fps_milli.store(
361 (vframes as f64 / elapsed * 1000.0) as u64,
362 Ordering::Relaxed,
363 );
364 q.window_start_ms.store(now, Ordering::Relaxed);
365 }
366 }
367
368 /// A snapshot of measured throughput (bitrate, fps, totals).
369 pub fn qos(&self) -> Qos {
370 let q = &self.qos;
371 Qos {
372 video_bitrate_bps: q.cur_video_bitrate.load(Ordering::Relaxed),
373 audio_bitrate_bps: q.cur_audio_bitrate.load(Ordering::Relaxed),
374 fps: q.cur_fps_milli.load(Ordering::Relaxed) as f64 / 1000.0,
375 total_frames: q.total_frames.load(Ordering::Relaxed),
376 total_bytes: q.total_bytes.load(Ordering::Relaxed),
377 }
378 }
379
380 /// Monotonic-clock timestamp (process-local milliseconds) of the most
381 /// recently published frame (or stream creation if none yet). Used by the
382 /// engine's idle reaper; this is elapsed monotonic time, not wall-clock time,
383 /// so compare it only against other readings of the same monotonic clock.
384 pub fn last_frame_ms(&self) -> u64 {
385 self.qos.last_frame_ms.load(Ordering::Relaxed)
386 }
387
388 /// Returns the most recently seen video and audio CONFIG frames,
389 /// for replaying to late-joining subscribers.
390 pub fn cached_configs(&self) -> (Option<Arc<MediaFrame>>, Option<Arc<MediaFrame>>) {
391 let video = (**self.video_config.load()).clone();
392 let audio = (**self.audio_config.load()).clone();
393 (video, audio)
394 }
395
396 /// The frames a late joiner should be handed before going live: cached
397 /// decoder configs followed by the current GOP (keyframe + trailing deltas).
398 ///
399 /// Replaying these lets a new subscriber start decoding immediately rather
400 /// than waiting for the next keyframe — sub-second join times at scale.
401 /// Requires the app to have enabled a GOP cache; otherwise only the cached
402 /// configs are returned.
403 pub fn replay_buffer(&self) -> Vec<Arc<MediaFrame>> {
404 let (vcfg, acfg) = self.cached_configs();
405 let mut out = Vec::new();
406 out.extend(vcfg.clone());
407 out.extend(acfg.clone());
408 if self.gop_capacity > 0 {
409 if let Ok(g) = self.gop.lock() {
410 out.reserve(g.frames.len());
411 for f in &g.frames {
412 // Avoid duplicating a config frame already pushed above. Only
413 // the (≤2) cached configs can collide with a GOP frame — GOP
414 // frames are themselves distinct — so compare against just
415 // those, keeping this O(n) rather than O(n²) over the GOP.
416 let dup = vcfg.as_ref().is_some_and(|c| Arc::ptr_eq(c, f))
417 || acfg.as_ref().is_some_and(|c| Arc::ptr_eq(c, f));
418 if !dup {
419 out.push(Arc::clone(f));
420 }
421 }
422 }
423 }
424 out
425 }
426
427 /// Subscribe to this stream's frame bus.
428 ///
429 /// The returned raw [`broadcast::Receiver`] surfaces [`RecvError::Lagged`]
430 /// when a slow consumer falls behind the channel capacity — callers that
431 /// `while let Ok(_) = rx.recv().await` will silently terminate on the first
432 /// lag. Prefer [`subscribe_resilient`](Self::subscribe_resilient) unless you
433 /// are deliberately handling lag yourself.
434 pub fn subscribe(&self) -> broadcast::Receiver<Arc<MediaFrame>> {
435 match self.tx.load_full() {
436 Some(tx) => tx.subscribe(),
437 // The stream has already closed: hand back a receiver on a spent
438 // channel so the caller's `recv` loop terminates immediately with
439 // `Closed` rather than blocking forever.
440 None => {
441 let (_, rx) = broadcast::channel(1);
442 rx
443 }
444 }
445 }
446
447 /// Subscribe with a [`Subscription`] that resynchronizes after lag instead
448 /// of terminating, reporting each gap to the installed [`Observer`] via
449 /// [`Observer::on_subscriber_lagged`].
450 pub fn subscribe_resilient(&self) -> Subscription {
451 Subscription {
452 rx: self.subscribe(),
453 key: self.key.clone(),
454 observer: Arc::clone(&self.observer),
455 max_lag: None,
456 skipped: 0,
457 }
458 }
459
460 /// Drive this stream to a [`FrameSink`]: replay the instant-start buffer
461 /// (cached configs + cached GOP) so the consumer can decode immediately,
462 /// then forward live frames until the stream ends or `shutdown` fires.
463 ///
464 /// This is the playback loop every egress that pulls from a single stream
465 /// shares (RTSP server, RTMP relay, …); each supplies only its per-frame
466 /// [`FrameSink::send`].
467 pub async fn drive_to(
468 &self,
469 shutdown: &CancellationToken,
470 sink: &mut dyn FrameSink,
471 ) -> Result<()> {
472 for frame in self.replay_buffer() {
473 if sink.send(frame).await?.is_break() {
474 return Ok(());
475 }
476 }
477 let mut sub = self.subscribe_resilient();
478 loop {
479 tokio::select! {
480 _ = shutdown.cancelled() => break,
481 next = sub.recv() => match next {
482 Some(frame) => {
483 if sink.send(frame).await?.is_break() {
484 break;
485 }
486 }
487 None => break, // stream ended
488 }
489 }
490 }
491 Ok(())
492 }
493
494 /// Drive this stream into a [`Packager`](crate::packager::Packager) (HLS
495 /// segmenter, DASH packager, …): replay the instant-start buffer — **which
496 /// carries the cached CONFIG access unit (SPS/PPS)** — then forward live
497 /// frames until the stream ends or `shutdown` fires, and `finish`.
498 ///
499 /// Priming with [`replay_buffer`](Self::replay_buffer) is essential, not just
500 /// an optimization: the CONFIG frame is published once at stream start, so a
501 /// packager that only `subscribe_resilient`s (subscribing *after* the event
502 /// that spawned it) never sees it — and then a fragmented-MP4 muxer never
503 /// builds its `init.m4s` and the master playlist never learns its codec
504 /// string. Replaying configs first makes both deterministic.
505 #[cfg(feature = "hls")]
506 pub async fn package_to(
507 &self,
508 shutdown: &CancellationToken,
509 packager: &mut dyn crate::packager::Packager,
510 ) -> Result<()> {
511 for frame in self.replay_buffer() {
512 packager.push(&frame).await?;
513 }
514 let mut sub = self.subscribe_resilient();
515 loop {
516 tokio::select! {
517 _ = shutdown.cancelled() => break,
518 next = sub.recv() => match next {
519 Some(frame) => packager.push(&frame).await?,
520 None => break, // stream ended
521 }
522 }
523 }
524 packager.finish().await
525 }
526
527 /// Number of active subscribers (0 once the stream is closed).
528 pub fn subscriber_count(&self) -> usize {
529 self.tx
530 .load_full()
531 .map(|tx| tx.receiver_count())
532 .unwrap_or(0)
533 }
534
535 /// Close the frame bus: drop the sole sender so every subscriber's `recv`
536 /// observes `Closed` and terminates, *regardless* of how many `StreamHandle`
537 /// clones are still alive.
538 ///
539 /// Called by the registry when a publish ends (see
540 /// `Application::end_publish`). Idempotent. This is what makes the channel's
541 /// lifetime track the stream's lifecycle rather than handle reachability.
542 pub fn close(&self) {
543 self.tx.store(None);
544 }
545
546 /// Transition to a new state.
547 pub async fn set_state(&self, state: StreamState) {
548 let mut guard = self.state.write().await;
549 *guard = state;
550 }
551
552 /// The current lifecycle state.
553 pub async fn current_state(&self) -> StreamState {
554 self.state.read().await.clone()
555 }
556
557 /// A consistent point-in-time copy of this stream's [`StreamMetadata`], with
558 /// the live measured `fps`/bitrate overlaid from [`qos`](Self::qos).
559 ///
560 /// Cloning the snapshot releases the lock immediately, so callers never hold
561 /// the metadata `RwLock` across an `.await`.
562 pub async fn metadata_snapshot(&self) -> StreamMetadata {
563 let mut m = self.metadata.read().await.clone();
564 let q = self.qos();
565 m.video_bitrate_bps = q.video_bitrate_bps;
566 m.audio_bitrate_bps = q.audio_bitrate_bps;
567 if q.fps > 0.0 {
568 m.fps = q.fps;
569 }
570 m
571 }
572
573 /// Mutate this stream's [`StreamMetadata`] under the write lock.
574 ///
575 /// Ingest handlers call this as they parse the stream — e.g. on the first
576 /// keyframe to record resolution from the codec config, or to set the
577 /// publisher address — so the metadata exposed to operators and the control
578 /// plane stays live rather than frozen at its zeroed defaults.
579 ///
580 /// ```no_run
581 /// # use arcly_stream::StreamHandle;
582 /// # async fn demo(handle: &StreamHandle, addr: std::net::SocketAddr) {
583 /// handle
584 /// .update_metadata(|m| {
585 /// m.publisher_addr = Some(addr);
586 /// m.width = 1920;
587 /// m.height = 1080;
588 /// m.ingest_protocol = "rtmp".to_string();
589 /// })
590 /// .await;
591 /// # }
592 /// ```
593 pub async fn update_metadata(&self, f: impl FnOnce(&mut StreamMetadata)) {
594 let mut guard = self.metadata.write().await;
595 f(&mut guard);
596 }
597}
598
599impl std::fmt::Debug for StreamHandle {
600 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
601 f.debug_struct("StreamHandle")
602 .field("key", &self.key)
603 .field("subscribers", &self.subscriber_count())
604 .finish()
605 }
606}
607
608/// A lag-tolerant subscription to a stream's frame bus.
609///
610/// Returned by [`StreamHandle::subscribe_resilient`]. Unlike a raw
611/// [`broadcast::Receiver`], [`recv`](Self::recv) does not terminate when the
612/// consumer falls behind: the dropped span is reported to the [`Observer`] as
613/// [`on_subscriber_lagged`](Observer::on_subscriber_lagged) and reception
614/// continues from the oldest still-buffered frame. This is the recommended
615/// consumer loop for packagers, recorders, and SFUs.
616///
617/// An optional [`max_lag`](Self::max_lag) bound turns chronic lag into
618/// eviction: once cumulative dropped frames exceed the bound, `recv` returns
619/// `None` (after an [`on_subscriber_evicted`](Observer::on_subscriber_evicted)
620/// notification) so a hopelessly slow consumer is shed rather than wasting
621/// buffer churn forever.
622pub struct Subscription {
623 rx: broadcast::Receiver<Arc<MediaFrame>>,
624 key: StreamKey,
625 observer: Arc<dyn Observer>,
626 max_lag: Option<u64>,
627 skipped: u64,
628}
629
630impl Subscription {
631 /// Evict this subscriber once cumulative dropped frames exceed `max`.
632 ///
633 /// ```no_run
634 /// # use arcly_stream::StreamHandle;
635 /// # fn demo(handle: &StreamHandle) {
636 /// let sub = handle.subscribe_resilient().max_lag(10_000);
637 /// # let _ = sub;
638 /// # }
639 /// ```
640 pub fn max_lag(mut self, max: u64) -> Self {
641 self.max_lag = Some(max);
642 self
643 }
644
645 /// Total frames dropped from this subscriber's view so far.
646 pub fn dropped(&self) -> u64 {
647 self.skipped
648 }
649
650 /// Receive the next frame, resynchronizing past any lag.
651 ///
652 /// Returns `None` when the stream's sender is dropped (the publisher ended)
653 /// or when the `max_lag` eviction threshold is crossed:
654 ///
655 /// ```no_run
656 /// # async fn run(sub: &mut arcly_stream::bus::Subscription) {
657 /// while let Some(frame) = sub.recv().await {
658 /// // packetize `frame` …
659 /// }
660 /// # }
661 /// ```
662 pub async fn recv(&mut self) -> Option<Arc<MediaFrame>> {
663 loop {
664 match self.rx.recv().await {
665 Ok(frame) => return Some(frame),
666 Err(RecvError::Lagged(skipped)) => {
667 self.skipped = self.skipped.saturating_add(skipped);
668 self.observer.on_subscriber_lagged(&self.key, skipped);
669 if let Some(max) = self.max_lag {
670 if self.skipped > max {
671 self.observer.on_subscriber_evicted(&self.key);
672 return None;
673 }
674 }
675 continue;
676 }
677 Err(RecvError::Closed) => return None,
678 }
679 }
680 }
681
682 /// Borrow the underlying raw receiver, for callers that need
683 /// [`broadcast::Receiver`] APIs directly.
684 pub fn raw(&mut self) -> &mut broadcast::Receiver<Arc<MediaFrame>> {
685 &mut self.rx
686 }
687}
688
689#[cfg(test)]
690mod tests {
691 use super::*;
692 use crate::CodecId;
693
694 fn video(pts: i64, key: bool) -> MediaFrame {
695 MediaFrame::new_video(
696 pts,
697 pts,
698 bytes::Bytes::from_static(&[0, 0, 0, 1, 0x65]),
699 CodecId::H264,
700 key,
701 )
702 }
703
704 /// Regression guard for the sender-lifetime fix: `close()` must terminate a
705 /// subscriber's `recv` even while a `StreamHandle` clone is still held — the
706 /// exact shape that previously hung WHEP egress forever.
707 #[tokio::test]
708 async fn close_terminates_recv_while_a_handle_clone_is_held() {
709 let handle = StreamHandle::new("live".into(), "show".into(), 16);
710 let mut sub = handle.subscribe_resilient();
711
712 // A retained clone (e.g. an egress pump) must NOT pin the channel open.
713 let retained = handle.clone();
714 handle.publish_frame(video(0, true)).unwrap();
715 assert!(sub.recv().await.is_some(), "frame delivered before close");
716
717 retained.close();
718
719 // Without the registry-owned sender this would block forever; bound it so
720 // a regression fails loudly instead of hanging the suite.
721 let got = tokio::time::timeout(std::time::Duration::from_secs(5), sub.recv())
722 .await
723 .expect("recv resolved after close (no hang)");
724 assert!(got.is_none(), "recv returns None once the stream is closed");
725
726 // Publishing post-close is a clean no-op, not a panic.
727 assert_eq!(retained.publish_frame(video(1, false)).unwrap(), 0);
728 assert_eq!(retained.subscriber_count(), 0);
729 }
730
731 /// `package_to` must replay the cached CONFIG access unit even though it was
732 /// published *before* the packager subscribed — the regression behind DASH's
733 /// missing `init.m4s` and the absent master playlist.
734 #[cfg(feature = "hls")]
735 #[tokio::test]
736 async fn package_to_replays_config_published_before_subscribe() {
737 use crate::packager::Packager;
738 use tokio_util::sync::CancellationToken;
739
740 // A trivial packager that records the flags of every frame it is pushed.
741 struct RecordingPackager(std::sync::Arc<std::sync::Mutex<Vec<FrameFlags>>>);
742 #[async_trait::async_trait]
743 impl Packager for RecordingPackager {
744 async fn push(&mut self, frame: &MediaFrame) -> crate::Result<()> {
745 self.0.lock().unwrap().push(frame.flags);
746 Ok(())
747 }
748 async fn finish(&mut self) -> crate::Result<()> {
749 Ok(())
750 }
751 }
752
753 let handle = StreamHandle::new("live".into(), "cam".into(), 16);
754
755 // The one-shot CONFIG frame arrives BEFORE any packager subscribes.
756 let mut cfg = video(0, true);
757 cfg.flags |= FrameFlags::CONFIG;
758 handle.publish_frame(cfg).unwrap();
759
760 let seen = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
761 let mut pkg = RecordingPackager(seen.clone());
762 let h = handle.clone();
763 let shutdown = CancellationToken::new();
764 let task = tokio::spawn(async move { h.package_to(&shutdown, &mut pkg).await });
765
766 // Give the replay + subscribe a moment, then end the stream.
767 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
768 handle.publish_frame(video(1, false)).unwrap();
769 handle.close();
770 task.await.unwrap().unwrap();
771
772 assert!(
773 seen.lock()
774 .unwrap()
775 .iter()
776 .any(|f| f.contains(FrameFlags::CONFIG)),
777 "package_to must replay the cached CONFIG frame to the packager"
778 );
779 }
780
781 /// The GOP-overflow signal fires once per GOP (not per dropped frame) and
782 /// resets on the next keyframe so a fresh GOP can report again.
783 #[test]
784 fn gop_truncation_signals_once_per_gop() {
785 use crate::StreamKey;
786 use std::sync::atomic::{AtomicUsize, Ordering};
787
788 #[derive(Default)]
789 struct CountingObserver {
790 truncations: AtomicUsize,
791 }
792 impl Observer for CountingObserver {
793 fn on_gop_truncated(&self, _key: &StreamKey, _capacity: usize) {
794 self.truncations.fetch_add(1, Ordering::Relaxed);
795 }
796 }
797
798 let obs = Arc::new(CountingObserver::default());
799 // Capacity 2: keyframe + 1 delta fit; the 2nd delta onward overflows.
800 let handle = StreamHandle::with_observer(
801 "live".into(),
802 "g".into(),
803 64,
804 2,
805 Arc::clone(&obs) as Arc<dyn Observer>,
806 );
807
808 handle.publish_frame(video(0, true)).unwrap(); // keyframe anchors GOP
809 handle.publish_frame(video(1, false)).unwrap(); // fills to capacity
810 handle.publish_frame(video(2, false)).unwrap(); // first overflow → signal
811 handle.publish_frame(video(3, false)).unwrap(); // still overflowing → silent
812 assert_eq!(
813 obs.truncations.load(Ordering::Relaxed),
814 1,
815 "one signal per GOP"
816 );
817
818 // A new keyframe resets the latch; overflowing again signals once more.
819 handle.publish_frame(video(4, true)).unwrap();
820 handle.publish_frame(video(5, false)).unwrap();
821 handle.publish_frame(video(6, false)).unwrap();
822 handle.publish_frame(video(7, false)).unwrap();
823 assert_eq!(
824 obs.truncations.load(Ordering::Relaxed),
825 2,
826 "next GOP signals again"
827 );
828 }
829
830 /// `mono_ms` is monotonic and drives `last_frame_ms`, so the idle reaper's
831 /// elapsed-time math is independent of the wall clock.
832 #[test]
833 fn mono_clock_is_monotonic_and_drives_last_frame() {
834 let a = mono_ms();
835 let b = mono_ms();
836 assert!(b >= a, "monotonic clock never goes backward");
837
838 let handle = StreamHandle::new("live".into(), "m".into(), 4);
839 let before = mono_ms();
840 handle.publish_frame(video(0, true)).unwrap();
841 assert!(
842 handle.last_frame_ms() >= before,
843 "last_frame_ms advances on the monotonic clock"
844 );
845 }
846}