arcly_stream/bus/handle.rs
1//! The live stream handle — a lock-free, zero-copy broadcast fan-out bus.
2
3use crate::observe::{NoopObserver, Observer};
4use crate::{frame::FrameFlags, AppName, MediaFrame, StreamId, StreamKey};
5use arc_swap::{ArcSwap, ArcSwapOption};
6use std::net::SocketAddr;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::{Arc, Mutex as StdMutex, OnceLock};
9use std::time::{Instant, SystemTime, UNIX_EPOCH};
10use tokio::sync::broadcast::error::RecvError;
11use tokio::sync::{broadcast, RwLock};
12
13/// Current wall-clock time in Unix milliseconds (saturating to 0 pre-epoch).
14///
15/// Use this only for *displayed* timestamps (e.g. `started_at_ms`); it is
16/// subject to NTP steps and operator clock changes, so it must never drive
17/// elapsed-time decisions. For those, use [`mono_ms`].
18pub(crate) fn now_ms() -> u64 {
19 SystemTime::now()
20 .duration_since(UNIX_EPOCH)
21 .map(|d| d.as_millis() as u64)
22 .unwrap_or(0)
23}
24
25/// Milliseconds elapsed on a process-local **monotonic** clock since the first
26/// call. Unlike [`now_ms`], this never jumps backward (or forward) when the wall
27/// clock is adjusted, so it is the correct basis for QoS windows and the idle
28/// reaper: a leap-second or NTP correction can't spuriously reap a live stream
29/// or distort a measured bitrate.
30pub(crate) fn mono_ms() -> u64 {
31 static EPOCH: OnceLock<Instant> = OnceLock::new();
32 EPOCH.get_or_init(Instant::now).elapsed().as_millis() as u64
33}
34
35/// Current lifecycle state of a stream.
36#[derive(Debug, Clone, PartialEq, Eq)]
37pub enum StreamState {
38 /// No publisher yet.
39 Idle,
40 /// A publisher is connected and sending data.
41 Publishing,
42 /// The stream is being transcoded into one or more renditions.
43 Transcoding,
44 /// The stream is being recorded.
45 Recording,
46 /// The publisher has disconnected; the stream has ended.
47 Ended,
48}
49
50impl std::fmt::Display for StreamState {
51 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52 f.write_str(match self {
53 StreamState::Idle => "idle",
54 StreamState::Publishing => "publishing",
55 StreamState::Transcoding => "transcoding",
56 StreamState::Recording => "recording",
57 StreamState::Ended => "ended",
58 })
59 }
60}
61
62/// Runtime metadata about a stream, updated continuously while publishing.
63///
64/// Resolution and the ingest protocol are set by the protocol handler (e.g. via
65/// [`StreamHandle::update_metadata`] after parsing the codec config), while the
66/// `fps` and `*_bitrate_bps` fields are overlaid live from measured throughput
67/// by [`StreamHandle::metadata_snapshot`].
68#[derive(Debug, Clone)]
69pub struct StreamMetadata {
70 /// The `(app, stream_id)` this metadata describes.
71 pub key: StreamKey,
72 /// Publisher remote address.
73 pub publisher_addr: Option<SocketAddr>,
74 /// Video width in pixels (0 = unknown).
75 pub width: u32,
76 /// Video height in pixels (0 = unknown).
77 pub height: u32,
78 /// Video frames per second (0 = unknown). Overlaid from measured throughput.
79 pub fps: f64,
80 /// Measured video ingest bitrate in bits-per-second.
81 pub video_bitrate_bps: u64,
82 /// Measured audio ingest bitrate in bits-per-second.
83 pub audio_bitrate_bps: u64,
84 /// Timestamp of the first frame received (Unix ms).
85 pub started_at_ms: u64,
86 /// Protocol used for ingest (e.g. `"rtmp"`).
87 pub ingest_protocol: String,
88}
89
90impl StreamMetadata {
91 /// Create zeroed metadata for `(app, stream_id)`.
92 pub fn new(app: AppName, stream_id: StreamId) -> Self {
93 Self {
94 key: StreamKey::new(app, stream_id),
95 publisher_addr: None,
96 width: 0,
97 height: 0,
98 fps: 0.0,
99 video_bitrate_bps: 0,
100 audio_bitrate_bps: 0,
101 started_at_ms: 0,
102 ingest_protocol: String::new(),
103 }
104 }
105}
106
107/// A point-in-time snapshot of a stream's measured quality of service.
108#[derive(Debug, Clone, Copy, Default)]
109pub struct Qos {
110 /// Video bitrate over the last ~1s window (bits/sec).
111 pub video_bitrate_bps: u64,
112 /// Audio bitrate over the last ~1s window (bits/sec).
113 pub audio_bitrate_bps: u64,
114 /// Video frames per second over the last ~1s window.
115 pub fps: f64,
116 /// Cumulative frames published on this stream.
117 pub total_frames: u64,
118 /// Cumulative payload bytes published on this stream.
119 pub total_bytes: u64,
120}
121
122/// Lock-free throughput counters folded into [`Qos`] / [`StreamMetadata`].
123///
124/// A stream has a single publisher (enforced by `start_publish`), so the
125/// read-modify-write here is effectively single-writer and `Relaxed` is sound.
126#[derive(Default)]
127struct QosCounters {
128 total_frames: AtomicU64,
129 total_bytes: AtomicU64,
130 window_start_ms: AtomicU64,
131 window_video_bytes: AtomicU64,
132 window_audio_bytes: AtomicU64,
133 window_video_frames: AtomicU64,
134 cur_video_bitrate: AtomicU64,
135 cur_audio_bitrate: AtomicU64,
136 cur_fps_milli: AtomicU64, // fps × 1000, integer-encoded
137 last_frame_ms: AtomicU64,
138}
139
140/// Keyframe-anchored replay buffer for instant playback start.
141struct GopBuffer {
142 /// Frames since (and including) the most recent keyframe.
143 frames: Vec<Arc<MediaFrame>>,
144 /// Hard cap on buffered frames (memory bound between keyframes).
145 capacity: usize,
146}
147
148/// A live handle to a single active stream.
149///
150/// Multiple subscribers (HLS packager, DASH packager, WebRTC SFU, recorders …)
151/// call [`StreamHandle::subscribe_resilient`] to receive every [`MediaFrame`]
152/// cheaply via a `broadcast` channel (zero-copy `Bytes` cloning).
153///
154/// Each broadcast slot holds one `Arc<MediaFrame>` pointer (8 bytes), so e.g.
155/// 4096 slots ≈ 32 KB per stream.
156///
157/// # Backpressure model
158///
159/// Fan-out uses a **single, fixed-capacity ring buffer per stream** (a
160/// `tokio::broadcast` channel sized by `AppSpec::broadcast_capacity`). This is a
161/// deliberate design choice, with consequences worth understanding:
162///
163/// - **The publisher never blocks on a slow subscriber.** Publishing is a
164/// non-awaiting pointer write; one subscriber falling behind can never apply
165/// backpressure to the publisher or to its peers. This is what keeps the hot
166/// path lock-free and the fast publisher isolated from the slow viewer.
167/// - **Backpressure is resolved by dropping, not stalling.** A subscriber that
168/// can't keep up overruns the ring and observes lag.
169/// [`Subscription::recv`] resynchronizes to the oldest still-buffered frame
170/// and reports the gap via [`Observer::on_subscriber_lagged`]; with
171/// [`Subscription::max_lag`] a chronically slow consumer is evicted
172/// ([`Observer::on_subscriber_evicted`]) rather than churning forever.
173/// - **Capacity is the tuning knob**, traded per stream: larger capacity
174/// tolerates burstier consumers at higher per-stream memory, smaller capacity
175/// sheds laggards sooner. There is intentionally no per-subscriber queue —
176/// that would reintroduce unbounded memory growth and per-consumer locking,
177/// the very things this design avoids.
178///
179/// In short: a slow subscriber degrades only *its own* view (lag, then
180/// eviction), never the publisher's or another subscriber's. Wire an
181/// [`Observer`] to see lag and eviction as they happen.
182#[derive(Clone)]
183pub struct StreamHandle {
184 metadata: Arc<RwLock<StreamMetadata>>,
185 state: Arc<RwLock<StreamState>>,
186 key: StreamKey,
187 /// The frame-bus sender, held *indirectly* so its lifetime is owned by the
188 /// stream's lifecycle — not by how many `StreamHandle` clones happen to be
189 /// alive. Cloning a handle shares this cell; it does not mint a new sender.
190 ///
191 /// This is the structural fix for a sharp edge: previously the handle stored
192 /// the `broadcast::Sender` by value, so any consumer that merely retained a
193 /// handle (to subscribe, read metadata, request a keyframe) silently pinned
194 /// the channel open and defeated the `Closed` shutdown signal. Now
195 /// [`close`](Self::close) — called by the registry when a publish ends — empties
196 /// this cell, dropping the sole sender and closing the channel regardless of
197 /// any lingering handle clones.
198 tx: Arc<ArcSwapOption<broadcast::Sender<Arc<MediaFrame>>>>,
199 /// Latest video CONFIG (AVCDecoderConfigurationRecord) frame, if seen.
200 /// Uses `ArcSwap` for lock-free reads from multiple subscriber tasks.
201 video_config: Arc<ArcSwap<Option<Arc<MediaFrame>>>>,
202 /// Latest audio CONFIG (AudioSpecificConfig) frame, if seen.
203 audio_config: Arc<ArcSwap<Option<Arc<MediaFrame>>>>,
204 /// Rolling GOP buffer for instant-start (empty when `gop_capacity == 0`).
205 gop: Arc<StdMutex<GopBuffer>>,
206 gop_capacity: usize,
207 /// Live throughput counters.
208 qos: Arc<QosCounters>,
209 /// Injected telemetry hook (no-op by default).
210 observer: Arc<dyn Observer>,
211}
212
213impl StreamHandle {
214 /// Create a handle with the no-op observer and no GOP cache.
215 pub fn new(app: AppName, stream_id: StreamId, capacity: usize) -> Self {
216 Self::with_observer(app, stream_id, capacity, 0, Arc::new(NoopObserver))
217 }
218
219 /// Create a handle wired to a host-supplied observer.
220 ///
221 /// `gop_capacity` bounds the keyframe-anchored replay buffer (0 disables it).
222 pub fn with_observer(
223 app: AppName,
224 stream_id: StreamId,
225 capacity: usize,
226 gop_capacity: usize,
227 observer: Arc<dyn Observer>,
228 ) -> Self {
229 let (tx, _) = broadcast::channel(capacity);
230 let qos = QosCounters::default();
231 // Treat creation as the last activity so a just-claimed stream is not
232 // instantly considered idle before its first frame arrives. Monotonic, to
233 // match the idle reaper (see `mono_ms`).
234 qos.last_frame_ms.store(mono_ms(), Ordering::Relaxed);
235 Self {
236 metadata: Arc::new(RwLock::new(StreamMetadata::new(
237 app.clone(),
238 stream_id.clone(),
239 ))),
240 state: Arc::new(RwLock::new(StreamState::Idle)),
241 key: StreamKey::new(app, stream_id),
242 tx: Arc::new(ArcSwapOption::new(Some(Arc::new(tx)))),
243 video_config: Arc::new(ArcSwap::new(Arc::new(None))),
244 audio_config: Arc::new(ArcSwap::new(Arc::new(None))),
245 gop: Arc::new(StdMutex::new(GopBuffer {
246 frames: Vec::new(),
247 capacity: gop_capacity,
248 })),
249 gop_capacity,
250 qos: Arc::new(qos),
251 observer,
252 }
253 }
254
255 /// The `(app, stream_id)` this handle belongs to.
256 pub fn key(&self) -> &StreamKey {
257 &self.key
258 }
259
260 /// Publish a frame to all current subscribers. Returns the number of
261 /// active receivers; returns `Ok(0)` when there are no subscribers.
262 pub fn publish_frame(&self, frame: MediaFrame) -> crate::Result<usize> {
263 self.observer.on_frame(&self.key, &frame);
264 let len = frame.data.len() as u64;
265 let is_audio = frame.is_audio();
266 let is_key = frame.is_keyframe();
267 let is_config = frame.flags.contains(FrameFlags::CONFIG);
268 let arc = Arc::new(frame);
269
270 // Cache the latest CONFIG frame for late-joining subscribers.
271 if is_config {
272 if is_audio {
273 self.audio_config.store(Arc::new(Some(Arc::clone(&arc))));
274 } else {
275 self.video_config.store(Arc::new(Some(Arc::clone(&arc))));
276 }
277 }
278
279 // Maintain the keyframe-anchored GOP replay buffer.
280 if self.gop_capacity > 0 {
281 if let Ok(mut g) = self.gop.lock() {
282 if is_key {
283 g.frames.clear();
284 g.frames.push(Arc::clone(&arc));
285 } else if !is_config && !g.frames.is_empty() && g.frames.len() < g.capacity {
286 // Only buffer once a keyframe anchors the GOP; CONFIG frames
287 // are replayed separately via `cached_configs`.
288 g.frames.push(Arc::clone(&arc));
289 }
290 }
291 }
292
293 self.record_qos(len, is_audio, is_key);
294
295 // The sender is gone once the stream is closed; publishing then is a
296 // no-op (Ok(0)) rather than an error, so a publisher racing teardown
297 // winds down cleanly.
298 let count = match self.tx.load_full() {
299 Some(tx) => tx.send(arc).unwrap_or(0),
300 None => 0,
301 };
302 Ok(count)
303 }
304
305 /// Fold one frame into the rolling throughput window.
306 fn record_qos(&self, len: u64, is_audio: bool, _is_key: bool) {
307 let q = &self.qos;
308 let now = mono_ms();
309 q.total_frames.fetch_add(1, Ordering::Relaxed);
310 q.total_bytes.fetch_add(len, Ordering::Relaxed);
311 q.last_frame_ms.store(now, Ordering::Relaxed);
312 if is_audio {
313 q.window_audio_bytes.fetch_add(len, Ordering::Relaxed);
314 } else {
315 q.window_video_bytes.fetch_add(len, Ordering::Relaxed);
316 q.window_video_frames.fetch_add(1, Ordering::Relaxed);
317 }
318
319 let ws = q.window_start_ms.load(Ordering::Relaxed);
320 if ws == 0 {
321 q.window_start_ms.store(now, Ordering::Relaxed);
322 } else if now.saturating_sub(ws) >= 1000 {
323 let elapsed = (now - ws) as f64 / 1000.0;
324 let vbytes = q.window_video_bytes.swap(0, Ordering::Relaxed);
325 let abytes = q.window_audio_bytes.swap(0, Ordering::Relaxed);
326 let vframes = q.window_video_frames.swap(0, Ordering::Relaxed);
327 q.cur_video_bitrate
328 .store((vbytes as f64 * 8.0 / elapsed) as u64, Ordering::Relaxed);
329 q.cur_audio_bitrate
330 .store((abytes as f64 * 8.0 / elapsed) as u64, Ordering::Relaxed);
331 q.cur_fps_milli.store(
332 (vframes as f64 / elapsed * 1000.0) as u64,
333 Ordering::Relaxed,
334 );
335 q.window_start_ms.store(now, Ordering::Relaxed);
336 }
337 }
338
339 /// A snapshot of measured throughput (bitrate, fps, totals).
340 pub fn qos(&self) -> Qos {
341 let q = &self.qos;
342 Qos {
343 video_bitrate_bps: q.cur_video_bitrate.load(Ordering::Relaxed),
344 audio_bitrate_bps: q.cur_audio_bitrate.load(Ordering::Relaxed),
345 fps: q.cur_fps_milli.load(Ordering::Relaxed) as f64 / 1000.0,
346 total_frames: q.total_frames.load(Ordering::Relaxed),
347 total_bytes: q.total_bytes.load(Ordering::Relaxed),
348 }
349 }
350
351 /// Monotonic-clock timestamp (process-local milliseconds) of the most
352 /// recently published frame (or stream creation if none yet). Used by the
353 /// engine's idle reaper; this is elapsed monotonic time, not wall-clock time,
354 /// so compare it only against other readings of the same monotonic clock.
355 pub fn last_frame_ms(&self) -> u64 {
356 self.qos.last_frame_ms.load(Ordering::Relaxed)
357 }
358
359 /// Returns the most recently seen video and audio CONFIG frames,
360 /// for replaying to late-joining subscribers.
361 pub fn cached_configs(&self) -> (Option<Arc<MediaFrame>>, Option<Arc<MediaFrame>>) {
362 let video = (**self.video_config.load()).clone();
363 let audio = (**self.audio_config.load()).clone();
364 (video, audio)
365 }
366
367 /// The frames a late joiner should be handed before going live: cached
368 /// decoder configs followed by the current GOP (keyframe + trailing deltas).
369 ///
370 /// Replaying these lets a new subscriber start decoding immediately rather
371 /// than waiting for the next keyframe — sub-second join times at scale.
372 /// Requires the app to have enabled a GOP cache; otherwise only the cached
373 /// configs are returned.
374 pub fn replay_buffer(&self) -> Vec<Arc<MediaFrame>> {
375 let (vcfg, acfg) = self.cached_configs();
376 let mut out = Vec::new();
377 out.extend(vcfg);
378 out.extend(acfg);
379 if self.gop_capacity > 0 {
380 if let Ok(g) = self.gop.lock() {
381 for f in &g.frames {
382 // Avoid duplicating a config frame already pushed above.
383 if !out.iter().any(|c| Arc::ptr_eq(c, f)) {
384 out.push(Arc::clone(f));
385 }
386 }
387 }
388 }
389 out
390 }
391
392 /// Subscribe to this stream's frame bus.
393 ///
394 /// The returned raw [`broadcast::Receiver`] surfaces [`RecvError::Lagged`]
395 /// when a slow consumer falls behind the channel capacity — callers that
396 /// `while let Ok(_) = rx.recv().await` will silently terminate on the first
397 /// lag. Prefer [`subscribe_resilient`](Self::subscribe_resilient) unless you
398 /// are deliberately handling lag yourself.
399 pub fn subscribe(&self) -> broadcast::Receiver<Arc<MediaFrame>> {
400 match self.tx.load_full() {
401 Some(tx) => tx.subscribe(),
402 // The stream has already closed: hand back a receiver on a spent
403 // channel so the caller's `recv` loop terminates immediately with
404 // `Closed` rather than blocking forever.
405 None => {
406 let (_, rx) = broadcast::channel(1);
407 rx
408 }
409 }
410 }
411
412 /// Subscribe with a [`Subscription`] that resynchronizes after lag instead
413 /// of terminating, reporting each gap to the installed [`Observer`] via
414 /// [`Observer::on_subscriber_lagged`].
415 pub fn subscribe_resilient(&self) -> Subscription {
416 Subscription {
417 rx: self.subscribe(),
418 key: self.key.clone(),
419 observer: Arc::clone(&self.observer),
420 max_lag: None,
421 skipped: 0,
422 }
423 }
424
425 /// Number of active subscribers (0 once the stream is closed).
426 pub fn subscriber_count(&self) -> usize {
427 self.tx
428 .load_full()
429 .map(|tx| tx.receiver_count())
430 .unwrap_or(0)
431 }
432
433 /// Close the frame bus: drop the sole sender so every subscriber's `recv`
434 /// observes `Closed` and terminates, *regardless* of how many `StreamHandle`
435 /// clones are still alive.
436 ///
437 /// Called by the registry when a publish ends (see
438 /// `Application::end_publish`). Idempotent. This is what makes the channel's
439 /// lifetime track the stream's lifecycle rather than handle reachability.
440 pub fn close(&self) {
441 self.tx.store(None);
442 }
443
444 /// Transition to a new state.
445 pub async fn set_state(&self, state: StreamState) {
446 let mut guard = self.state.write().await;
447 *guard = state;
448 }
449
450 /// The current lifecycle state.
451 pub async fn current_state(&self) -> StreamState {
452 self.state.read().await.clone()
453 }
454
455 /// A consistent point-in-time copy of this stream's [`StreamMetadata`], with
456 /// the live measured `fps`/bitrate overlaid from [`qos`](Self::qos).
457 ///
458 /// Cloning the snapshot releases the lock immediately, so callers never hold
459 /// the metadata `RwLock` across an `.await`.
460 pub async fn metadata_snapshot(&self) -> StreamMetadata {
461 let mut m = self.metadata.read().await.clone();
462 let q = self.qos();
463 m.video_bitrate_bps = q.video_bitrate_bps;
464 m.audio_bitrate_bps = q.audio_bitrate_bps;
465 if q.fps > 0.0 {
466 m.fps = q.fps;
467 }
468 m
469 }
470
471 /// Mutate this stream's [`StreamMetadata`] under the write lock.
472 ///
473 /// Ingest handlers call this as they parse the stream — e.g. on the first
474 /// keyframe to record resolution from the codec config, or to set the
475 /// publisher address — so the metadata exposed to operators and the control
476 /// plane stays live rather than frozen at its zeroed defaults.
477 ///
478 /// ```no_run
479 /// # use arcly_stream::StreamHandle;
480 /// # async fn demo(handle: &StreamHandle, addr: std::net::SocketAddr) {
481 /// handle
482 /// .update_metadata(|m| {
483 /// m.publisher_addr = Some(addr);
484 /// m.width = 1920;
485 /// m.height = 1080;
486 /// m.ingest_protocol = "rtmp".to_string();
487 /// })
488 /// .await;
489 /// # }
490 /// ```
491 pub async fn update_metadata(&self, f: impl FnOnce(&mut StreamMetadata)) {
492 let mut guard = self.metadata.write().await;
493 f(&mut guard);
494 }
495}
496
497impl std::fmt::Debug for StreamHandle {
498 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
499 f.debug_struct("StreamHandle")
500 .field("key", &self.key)
501 .field("subscribers", &self.subscriber_count())
502 .finish()
503 }
504}
505
506/// A lag-tolerant subscription to a stream's frame bus.
507///
508/// Returned by [`StreamHandle::subscribe_resilient`]. Unlike a raw
509/// [`broadcast::Receiver`], [`recv`](Self::recv) does not terminate when the
510/// consumer falls behind: the dropped span is reported to the [`Observer`] as
511/// [`on_subscriber_lagged`](Observer::on_subscriber_lagged) and reception
512/// continues from the oldest still-buffered frame. This is the recommended
513/// consumer loop for packagers, recorders, and SFUs.
514///
515/// An optional [`max_lag`](Self::max_lag) bound turns chronic lag into
516/// eviction: once cumulative dropped frames exceed the bound, `recv` returns
517/// `None` (after an [`on_subscriber_evicted`](Observer::on_subscriber_evicted)
518/// notification) so a hopelessly slow consumer is shed rather than wasting
519/// buffer churn forever.
520pub struct Subscription {
521 rx: broadcast::Receiver<Arc<MediaFrame>>,
522 key: StreamKey,
523 observer: Arc<dyn Observer>,
524 max_lag: Option<u64>,
525 skipped: u64,
526}
527
528impl Subscription {
529 /// Evict this subscriber once cumulative dropped frames exceed `max`.
530 ///
531 /// ```no_run
532 /// # use arcly_stream::StreamHandle;
533 /// # fn demo(handle: &StreamHandle) {
534 /// let sub = handle.subscribe_resilient().max_lag(10_000);
535 /// # let _ = sub;
536 /// # }
537 /// ```
538 pub fn max_lag(mut self, max: u64) -> Self {
539 self.max_lag = Some(max);
540 self
541 }
542
543 /// Total frames dropped from this subscriber's view so far.
544 pub fn dropped(&self) -> u64 {
545 self.skipped
546 }
547
548 /// Receive the next frame, resynchronizing past any lag.
549 ///
550 /// Returns `None` when the stream's sender is dropped (the publisher ended)
551 /// or when the `max_lag` eviction threshold is crossed:
552 ///
553 /// ```no_run
554 /// # async fn run(sub: &mut arcly_stream::bus::Subscription) {
555 /// while let Some(frame) = sub.recv().await {
556 /// // packetize `frame` …
557 /// }
558 /// # }
559 /// ```
560 pub async fn recv(&mut self) -> Option<Arc<MediaFrame>> {
561 loop {
562 match self.rx.recv().await {
563 Ok(frame) => return Some(frame),
564 Err(RecvError::Lagged(skipped)) => {
565 self.skipped = self.skipped.saturating_add(skipped);
566 self.observer.on_subscriber_lagged(&self.key, skipped);
567 if let Some(max) = self.max_lag {
568 if self.skipped > max {
569 self.observer.on_subscriber_evicted(&self.key);
570 return None;
571 }
572 }
573 continue;
574 }
575 Err(RecvError::Closed) => return None,
576 }
577 }
578 }
579
580 /// Borrow the underlying raw receiver, for callers that need
581 /// [`broadcast::Receiver`] APIs directly.
582 pub fn raw(&mut self) -> &mut broadcast::Receiver<Arc<MediaFrame>> {
583 &mut self.rx
584 }
585}
586
587#[cfg(test)]
588mod tests {
589 use super::*;
590 use crate::CodecId;
591
592 fn video(pts: i64, key: bool) -> MediaFrame {
593 MediaFrame::new_video(
594 pts,
595 pts,
596 bytes::Bytes::from_static(&[0, 0, 0, 1, 0x65]),
597 CodecId::H264,
598 key,
599 )
600 }
601
602 /// Regression guard for the sender-lifetime fix: `close()` must terminate a
603 /// subscriber's `recv` even while a `StreamHandle` clone is still held — the
604 /// exact shape that previously hung WHEP egress forever.
605 #[tokio::test]
606 async fn close_terminates_recv_while_a_handle_clone_is_held() {
607 let handle = StreamHandle::new("live".into(), "show".into(), 16);
608 let mut sub = handle.subscribe_resilient();
609
610 // A retained clone (e.g. an egress pump) must NOT pin the channel open.
611 let retained = handle.clone();
612 handle.publish_frame(video(0, true)).unwrap();
613 assert!(sub.recv().await.is_some(), "frame delivered before close");
614
615 retained.close();
616
617 // Without the registry-owned sender this would block forever; bound it so
618 // a regression fails loudly instead of hanging the suite.
619 let got = tokio::time::timeout(std::time::Duration::from_secs(5), sub.recv())
620 .await
621 .expect("recv resolved after close (no hang)");
622 assert!(got.is_none(), "recv returns None once the stream is closed");
623
624 // Publishing post-close is a clean no-op, not a panic.
625 assert_eq!(retained.publish_frame(video(1, false)).unwrap(), 0);
626 assert_eq!(retained.subscriber_count(), 0);
627 }
628
629 /// `mono_ms` is monotonic and drives `last_frame_ms`, so the idle reaper's
630 /// elapsed-time math is independent of the wall clock.
631 #[test]
632 fn mono_clock_is_monotonic_and_drives_last_frame() {
633 let a = mono_ms();
634 let b = mono_ms();
635 assert!(b >= a, "monotonic clock never goes backward");
636
637 let handle = StreamHandle::new("live".into(), "m".into(), 4);
638 let before = mono_ms();
639 handle.publish_frame(video(0, true)).unwrap();
640 assert!(
641 handle.last_frame_ms() >= before,
642 "last_frame_ms advances on the monotonic clock"
643 );
644 }
645}