Skip to main content

arcly_stream/bus/
handle.rs

1//! The live stream handle — a lock-free, zero-copy broadcast fan-out bus.
2
3use crate::observe::{NoopObserver, Observer};
4use crate::{frame::FrameFlags, AppName, MediaFrame, StreamId, StreamKey};
5use arc_swap::{ArcSwap, ArcSwapOption};
6use std::net::SocketAddr;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::{Arc, Mutex as StdMutex, OnceLock};
9use std::time::{Instant, SystemTime, UNIX_EPOCH};
10use tokio::sync::broadcast::error::RecvError;
11use tokio::sync::{broadcast, RwLock};
12
/// Current wall-clock time in Unix milliseconds.
///
/// Returns `0` when the system clock reads earlier than the Unix epoch, and
/// saturates at `u64::MAX` instead of silently truncating the `u128`
/// millisecond count produced by `Duration::as_millis`.
///
/// Use this only for *displayed* timestamps (e.g. `started_at_ms`); it is
/// subject to NTP steps and operator clock changes, so it must never drive
/// elapsed-time decisions. For those, use [`mono_ms`].
pub(crate) fn now_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |since_epoch| {
            // Clamp before narrowing so the cast below is lossless.
            since_epoch.as_millis().min(u128::from(u64::MAX)) as u64
        })
}
24
/// Milliseconds elapsed on a process-local **monotonic** clock since the first
/// call to this function.
///
/// The first call pins the epoch, so it returns (approximately) `0`; every
/// later call returns the time elapsed since then. Unlike [`now_ms`], this never
/// jumps backward (or forward) when the wall clock is adjusted, so it is the
/// correct basis for QoS windows and the idle reaper: a leap-second or NTP
/// correction can't spuriously reap a live stream or distort a measured bitrate.
///
/// The `u128` millisecond count is saturated to `u64::MAX` rather than
/// truncated.
pub(crate) fn mono_ms() -> u64 {
    static EPOCH: OnceLock<Instant> = OnceLock::new();
    let elapsed_ms = EPOCH.get_or_init(Instant::now).elapsed().as_millis();
    // Clamp before narrowing so the cast below is lossless.
    elapsed_ms.min(u128::from(u64::MAX)) as u64
}
34
/// Lifecycle phase a stream is currently in.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StreamState {
    /// Waiting for a publisher; none has connected yet.
    Idle,
    /// A publisher is connected and sending data.
    Publishing,
    /// One or more renditions are being transcoded from the stream.
    Transcoding,
    /// The stream is being recorded.
    Recording,
    /// The publisher has disconnected and the stream is over.
    Ended,
}

/// Renders the state as its lowercase name (`"idle"`, `"publishing"`, …).
impl std::fmt::Display for StreamState {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Idle => "idle",
            Self::Publishing => "publishing",
            Self::Transcoding => "transcoding",
            Self::Recording => "recording",
            Self::Ended => "ended",
        };
        f.write_str(label)
    }
}
61
62/// Runtime metadata about a stream, updated continuously while publishing.
63///
64/// Resolution and the ingest protocol are set by the protocol handler (e.g. via
65/// [`StreamHandle::update_metadata`] after parsing the codec config), while the
66/// `fps` and `*_bitrate_bps` fields are overlaid live from measured throughput
67/// by [`StreamHandle::metadata_snapshot`].
68#[derive(Debug, Clone)]
69pub struct StreamMetadata {
70    /// The `(app, stream_id)` this metadata describes.
71    pub key: StreamKey,
72    /// Publisher remote address.
73    pub publisher_addr: Option<SocketAddr>,
74    /// Video width in pixels (0 = unknown).
75    pub width: u32,
76    /// Video height in pixels (0 = unknown).
77    pub height: u32,
78    /// Video frames per second (0 = unknown). Overlaid from measured throughput.
79    pub fps: f64,
80    /// Measured video ingest bitrate in bits-per-second.
81    pub video_bitrate_bps: u64,
82    /// Measured audio ingest bitrate in bits-per-second.
83    pub audio_bitrate_bps: u64,
84    /// Timestamp of the first frame received (Unix ms).
85    pub started_at_ms: u64,
86    /// Protocol used for ingest (e.g. `"rtmp"`).
87    pub ingest_protocol: String,
88}
89
90impl StreamMetadata {
91    /// Create zeroed metadata for `(app, stream_id)`.
92    pub fn new(app: AppName, stream_id: StreamId) -> Self {
93        Self {
94            key: StreamKey::new(app, stream_id),
95            publisher_addr: None,
96            width: 0,
97            height: 0,
98            fps: 0.0,
99            video_bitrate_bps: 0,
100            audio_bitrate_bps: 0,
101            started_at_ms: 0,
102            ingest_protocol: String::new(),
103        }
104    }
105}
106
/// Measured quality of service for a stream, captured at a single instant.
#[derive(Debug, Clone, Copy, Default)]
pub struct Qos {
    /// Video bitrate measured over the most recent ~1s window (bits/sec).
    pub video_bitrate_bps: u64,
    /// Audio bitrate measured over the most recent ~1s window (bits/sec).
    pub audio_bitrate_bps: u64,
    /// Video frame rate measured over the most recent ~1s window.
    pub fps: f64,
    /// Running count of frames published on this stream.
    pub total_frames: u64,
    /// Running count of payload bytes published on this stream.
    pub total_bytes: u64,
}
121
/// Atomic throughput counters that back [`Qos`] / [`StreamMetadata`].
///
/// A stream has a single publisher (enforced by `start_publish`), so the
/// read-modify-write sequences on these counters are effectively single-writer
/// and `Relaxed` ordering is sound.
#[derive(Default)]
struct QosCounters {
    /// Frames published over the stream's lifetime.
    total_frames: AtomicU64,
    /// Payload bytes published over the stream's lifetime.
    total_bytes: AtomicU64,
    /// Monotonic ms at which the current window opened; 0 = no window yet.
    window_start_ms: AtomicU64,
    /// Video payload bytes accumulated in the current window.
    window_video_bytes: AtomicU64,
    /// Audio payload bytes accumulated in the current window.
    window_audio_bytes: AtomicU64,
    /// Video frames accumulated in the current window.
    window_video_frames: AtomicU64,
    /// Video bitrate computed when the last window closed (bits/sec).
    cur_video_bitrate: AtomicU64,
    /// Audio bitrate computed when the last window closed (bits/sec).
    cur_audio_bitrate: AtomicU64,
    /// Frame rate computed when the last window closed: fps × 1000, stored as
    /// an integer so it fits in an atomic.
    cur_fps_milli: AtomicU64,
    /// Monotonic ms of the most recent frame (seeded at handle creation).
    last_frame_ms: AtomicU64,
}
139
140/// Keyframe-anchored replay buffer for instant playback start.
141struct GopBuffer {
142    /// Frames since (and including) the most recent keyframe.
143    frames: Vec<Arc<MediaFrame>>,
144    /// Hard cap on buffered frames (memory bound between keyframes).
145    capacity: usize,
146}
147
148/// A live handle to a single active stream.
149///
150/// Multiple subscribers (HLS packager, DASH packager, WebRTC SFU, recorders …)
151/// call [`StreamHandle::subscribe_resilient`] to receive every [`MediaFrame`]
152/// cheaply via a `broadcast` channel (zero-copy `Bytes` cloning).
153///
154/// Each broadcast slot holds one `Arc<MediaFrame>` pointer (8 bytes), so e.g.
155/// 4096 slots ≈ 32 KB per stream.
156///
157/// # Backpressure model
158///
159/// Fan-out uses a **single, fixed-capacity ring buffer per stream** (a
160/// `tokio::broadcast` channel sized by `AppSpec::broadcast_capacity`). This is a
161/// deliberate design choice, with consequences worth understanding:
162///
163/// - **The publisher never blocks on a slow subscriber.** Publishing is a
164///   non-awaiting pointer write; one subscriber falling behind can never apply
165///   backpressure to the publisher or to its peers. This is what keeps the hot
166///   path lock-free and the fast publisher isolated from the slow viewer.
167/// - **Backpressure is resolved by dropping, not stalling.** A subscriber that
168///   can't keep up overruns the ring and observes lag.
169///   [`Subscription::recv`] resynchronizes to the oldest still-buffered frame
170///   and reports the gap via [`Observer::on_subscriber_lagged`]; with
171///   [`Subscription::max_lag`] a chronically slow consumer is evicted
172///   ([`Observer::on_subscriber_evicted`]) rather than churning forever.
173/// - **Capacity is the tuning knob**, traded per stream: larger capacity
174///   tolerates burstier consumers at higher per-stream memory, smaller capacity
175///   sheds laggards sooner. There is intentionally no per-subscriber queue —
176///   that would reintroduce unbounded memory growth and per-consumer locking,
177///   the very things this design avoids.
178///
179/// In short: a slow subscriber degrades only *its own* view (lag, then
180/// eviction), never the publisher's or another subscriber's. Wire an
181/// [`Observer`] to see lag and eviction as they happen.
182#[derive(Clone)]
183pub struct StreamHandle {
184    metadata: Arc<RwLock<StreamMetadata>>,
185    state: Arc<RwLock<StreamState>>,
186    key: StreamKey,
187    /// The frame-bus sender, held *indirectly* so its lifetime is owned by the
188    /// stream's lifecycle — not by how many `StreamHandle` clones happen to be
189    /// alive. Cloning a handle shares this cell; it does not mint a new sender.
190    ///
191    /// This is the structural fix for a sharp edge: previously the handle stored
192    /// the `broadcast::Sender` by value, so any consumer that merely retained a
193    /// handle (to subscribe, read metadata, request a keyframe) silently pinned
194    /// the channel open and defeated the `Closed` shutdown signal. Now
195    /// [`close`](Self::close) — called by the registry when a publish ends — empties
196    /// this cell, dropping the sole sender and closing the channel regardless of
197    /// any lingering handle clones.
198    tx: Arc<ArcSwapOption<broadcast::Sender<Arc<MediaFrame>>>>,
199    /// Latest video CONFIG (AVCDecoderConfigurationRecord) frame, if seen.
200    /// Uses `ArcSwap` for lock-free reads from multiple subscriber tasks.
201    video_config: Arc<ArcSwap<Option<Arc<MediaFrame>>>>,
202    /// Latest audio CONFIG (AudioSpecificConfig) frame, if seen.
203    audio_config: Arc<ArcSwap<Option<Arc<MediaFrame>>>>,
204    /// Rolling GOP buffer for instant-start (empty when `gop_capacity == 0`).
205    gop: Arc<StdMutex<GopBuffer>>,
206    gop_capacity: usize,
207    /// Live throughput counters.
208    qos: Arc<QosCounters>,
209    /// Injected telemetry hook (no-op by default).
210    observer: Arc<dyn Observer>,
211}
212
213impl StreamHandle {
214    /// Create a handle with the no-op observer and no GOP cache.
215    pub fn new(app: AppName, stream_id: StreamId, capacity: usize) -> Self {
216        Self::with_observer(app, stream_id, capacity, 0, Arc::new(NoopObserver))
217    }
218
219    /// Create a handle wired to a host-supplied observer.
220    ///
221    /// `gop_capacity` bounds the keyframe-anchored replay buffer (0 disables it).
222    pub fn with_observer(
223        app: AppName,
224        stream_id: StreamId,
225        capacity: usize,
226        gop_capacity: usize,
227        observer: Arc<dyn Observer>,
228    ) -> Self {
229        let (tx, _) = broadcast::channel(capacity);
230        let qos = QosCounters::default();
231        // Treat creation as the last activity so a just-claimed stream is not
232        // instantly considered idle before its first frame arrives. Monotonic, to
233        // match the idle reaper (see `mono_ms`).
234        qos.last_frame_ms.store(mono_ms(), Ordering::Relaxed);
235        Self {
236            metadata: Arc::new(RwLock::new(StreamMetadata::new(
237                app.clone(),
238                stream_id.clone(),
239            ))),
240            state: Arc::new(RwLock::new(StreamState::Idle)),
241            key: StreamKey::new(app, stream_id),
242            tx: Arc::new(ArcSwapOption::new(Some(Arc::new(tx)))),
243            video_config: Arc::new(ArcSwap::new(Arc::new(None))),
244            audio_config: Arc::new(ArcSwap::new(Arc::new(None))),
245            gop: Arc::new(StdMutex::new(GopBuffer {
246                frames: Vec::new(),
247                capacity: gop_capacity,
248            })),
249            gop_capacity,
250            qos: Arc::new(qos),
251            observer,
252        }
253    }
254
255    /// The `(app, stream_id)` this handle belongs to.
256    pub fn key(&self) -> &StreamKey {
257        &self.key
258    }
259
260    /// Publish a frame to all current subscribers.  Returns the number of
261    /// active receivers; returns `Ok(0)` when there are no subscribers.
262    pub fn publish_frame(&self, frame: MediaFrame) -> crate::Result<usize> {
263        self.observer.on_frame(&self.key, &frame);
264        let len = frame.data.len() as u64;
265        let is_audio = frame.is_audio();
266        let is_key = frame.is_keyframe();
267        let is_config = frame.flags.contains(FrameFlags::CONFIG);
268        let arc = Arc::new(frame);
269
270        // Cache the latest CONFIG frame for late-joining subscribers.
271        if is_config {
272            if is_audio {
273                self.audio_config.store(Arc::new(Some(Arc::clone(&arc))));
274            } else {
275                self.video_config.store(Arc::new(Some(Arc::clone(&arc))));
276            }
277        }
278
279        // Maintain the keyframe-anchored GOP replay buffer.
280        if self.gop_capacity > 0 {
281            if let Ok(mut g) = self.gop.lock() {
282                if is_key {
283                    g.frames.clear();
284                    g.frames.push(Arc::clone(&arc));
285                } else if !is_config && !g.frames.is_empty() && g.frames.len() < g.capacity {
286                    // Only buffer once a keyframe anchors the GOP; CONFIG frames
287                    // are replayed separately via `cached_configs`.
288                    g.frames.push(Arc::clone(&arc));
289                }
290            }
291        }
292
293        self.record_qos(len, is_audio, is_key);
294
295        // The sender is gone once the stream is closed; publishing then is a
296        // no-op (Ok(0)) rather than an error, so a publisher racing teardown
297        // winds down cleanly.
298        let count = match self.tx.load_full() {
299            Some(tx) => tx.send(arc).unwrap_or(0),
300            None => 0,
301        };
302        Ok(count)
303    }
304
305    /// Fold one frame into the rolling throughput window.
306    fn record_qos(&self, len: u64, is_audio: bool, _is_key: bool) {
307        let q = &self.qos;
308        let now = mono_ms();
309        q.total_frames.fetch_add(1, Ordering::Relaxed);
310        q.total_bytes.fetch_add(len, Ordering::Relaxed);
311        q.last_frame_ms.store(now, Ordering::Relaxed);
312        if is_audio {
313            q.window_audio_bytes.fetch_add(len, Ordering::Relaxed);
314        } else {
315            q.window_video_bytes.fetch_add(len, Ordering::Relaxed);
316            q.window_video_frames.fetch_add(1, Ordering::Relaxed);
317        }
318
319        let ws = q.window_start_ms.load(Ordering::Relaxed);
320        if ws == 0 {
321            q.window_start_ms.store(now, Ordering::Relaxed);
322        } else if now.saturating_sub(ws) >= 1000 {
323            let elapsed = (now - ws) as f64 / 1000.0;
324            let vbytes = q.window_video_bytes.swap(0, Ordering::Relaxed);
325            let abytes = q.window_audio_bytes.swap(0, Ordering::Relaxed);
326            let vframes = q.window_video_frames.swap(0, Ordering::Relaxed);
327            q.cur_video_bitrate
328                .store((vbytes as f64 * 8.0 / elapsed) as u64, Ordering::Relaxed);
329            q.cur_audio_bitrate
330                .store((abytes as f64 * 8.0 / elapsed) as u64, Ordering::Relaxed);
331            q.cur_fps_milli.store(
332                (vframes as f64 / elapsed * 1000.0) as u64,
333                Ordering::Relaxed,
334            );
335            q.window_start_ms.store(now, Ordering::Relaxed);
336        }
337    }
338
339    /// A snapshot of measured throughput (bitrate, fps, totals).
340    pub fn qos(&self) -> Qos {
341        let q = &self.qos;
342        Qos {
343            video_bitrate_bps: q.cur_video_bitrate.load(Ordering::Relaxed),
344            audio_bitrate_bps: q.cur_audio_bitrate.load(Ordering::Relaxed),
345            fps: q.cur_fps_milli.load(Ordering::Relaxed) as f64 / 1000.0,
346            total_frames: q.total_frames.load(Ordering::Relaxed),
347            total_bytes: q.total_bytes.load(Ordering::Relaxed),
348        }
349    }
350
351    /// Monotonic-clock timestamp (process-local milliseconds) of the most
352    /// recently published frame (or stream creation if none yet). Used by the
353    /// engine's idle reaper; this is elapsed monotonic time, not wall-clock time,
354    /// so compare it only against other readings of the same monotonic clock.
355    pub fn last_frame_ms(&self) -> u64 {
356        self.qos.last_frame_ms.load(Ordering::Relaxed)
357    }
358
359    /// Returns the most recently seen video and audio CONFIG frames,
360    /// for replaying to late-joining subscribers.
361    pub fn cached_configs(&self) -> (Option<Arc<MediaFrame>>, Option<Arc<MediaFrame>>) {
362        let video = (**self.video_config.load()).clone();
363        let audio = (**self.audio_config.load()).clone();
364        (video, audio)
365    }
366
367    /// The frames a late joiner should be handed before going live: cached
368    /// decoder configs followed by the current GOP (keyframe + trailing deltas).
369    ///
370    /// Replaying these lets a new subscriber start decoding immediately rather
371    /// than waiting for the next keyframe — sub-second join times at scale.
372    /// Requires the app to have enabled a GOP cache; otherwise only the cached
373    /// configs are returned.
374    pub fn replay_buffer(&self) -> Vec<Arc<MediaFrame>> {
375        let (vcfg, acfg) = self.cached_configs();
376        let mut out = Vec::new();
377        out.extend(vcfg);
378        out.extend(acfg);
379        if self.gop_capacity > 0 {
380            if let Ok(g) = self.gop.lock() {
381                for f in &g.frames {
382                    // Avoid duplicating a config frame already pushed above.
383                    if !out.iter().any(|c| Arc::ptr_eq(c, f)) {
384                        out.push(Arc::clone(f));
385                    }
386                }
387            }
388        }
389        out
390    }
391
392    /// Subscribe to this stream's frame bus.
393    ///
394    /// The returned raw [`broadcast::Receiver`] surfaces [`RecvError::Lagged`]
395    /// when a slow consumer falls behind the channel capacity — callers that
396    /// `while let Ok(_) = rx.recv().await` will silently terminate on the first
397    /// lag. Prefer [`subscribe_resilient`](Self::subscribe_resilient) unless you
398    /// are deliberately handling lag yourself.
399    pub fn subscribe(&self) -> broadcast::Receiver<Arc<MediaFrame>> {
400        match self.tx.load_full() {
401            Some(tx) => tx.subscribe(),
402            // The stream has already closed: hand back a receiver on a spent
403            // channel so the caller's `recv` loop terminates immediately with
404            // `Closed` rather than blocking forever.
405            None => {
406                let (_, rx) = broadcast::channel(1);
407                rx
408            }
409        }
410    }
411
412    /// Subscribe with a [`Subscription`] that resynchronizes after lag instead
413    /// of terminating, reporting each gap to the installed [`Observer`] via
414    /// [`Observer::on_subscriber_lagged`].
415    pub fn subscribe_resilient(&self) -> Subscription {
416        Subscription {
417            rx: self.subscribe(),
418            key: self.key.clone(),
419            observer: Arc::clone(&self.observer),
420            max_lag: None,
421            skipped: 0,
422        }
423    }
424
425    /// Number of active subscribers (0 once the stream is closed).
426    pub fn subscriber_count(&self) -> usize {
427        self.tx
428            .load_full()
429            .map(|tx| tx.receiver_count())
430            .unwrap_or(0)
431    }
432
433    /// Close the frame bus: drop the sole sender so every subscriber's `recv`
434    /// observes `Closed` and terminates, *regardless* of how many `StreamHandle`
435    /// clones are still alive.
436    ///
437    /// Called by the registry when a publish ends (see
438    /// `Application::end_publish`). Idempotent. This is what makes the channel's
439    /// lifetime track the stream's lifecycle rather than handle reachability.
440    pub fn close(&self) {
441        self.tx.store(None);
442    }
443
444    /// Transition to a new state.
445    pub async fn set_state(&self, state: StreamState) {
446        let mut guard = self.state.write().await;
447        *guard = state;
448    }
449
450    /// The current lifecycle state.
451    pub async fn current_state(&self) -> StreamState {
452        self.state.read().await.clone()
453    }
454
455    /// A consistent point-in-time copy of this stream's [`StreamMetadata`], with
456    /// the live measured `fps`/bitrate overlaid from [`qos`](Self::qos).
457    ///
458    /// Cloning the snapshot releases the lock immediately, so callers never hold
459    /// the metadata `RwLock` across an `.await`.
460    pub async fn metadata_snapshot(&self) -> StreamMetadata {
461        let mut m = self.metadata.read().await.clone();
462        let q = self.qos();
463        m.video_bitrate_bps = q.video_bitrate_bps;
464        m.audio_bitrate_bps = q.audio_bitrate_bps;
465        if q.fps > 0.0 {
466            m.fps = q.fps;
467        }
468        m
469    }
470
471    /// Mutate this stream's [`StreamMetadata`] under the write lock.
472    ///
473    /// Ingest handlers call this as they parse the stream — e.g. on the first
474    /// keyframe to record resolution from the codec config, or to set the
475    /// publisher address — so the metadata exposed to operators and the control
476    /// plane stays live rather than frozen at its zeroed defaults.
477    ///
478    /// ```no_run
479    /// # use arcly_stream::StreamHandle;
480    /// # async fn demo(handle: &StreamHandle, addr: std::net::SocketAddr) {
481    /// handle
482    ///     .update_metadata(|m| {
483    ///         m.publisher_addr = Some(addr);
484    ///         m.width = 1920;
485    ///         m.height = 1080;
486    ///         m.ingest_protocol = "rtmp".to_string();
487    ///     })
488    ///     .await;
489    /// # }
490    /// ```
491    pub async fn update_metadata(&self, f: impl FnOnce(&mut StreamMetadata)) {
492        let mut guard = self.metadata.write().await;
493        f(&mut guard);
494    }
495}
496
497impl std::fmt::Debug for StreamHandle {
498    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
499        f.debug_struct("StreamHandle")
500            .field("key", &self.key)
501            .field("subscribers", &self.subscriber_count())
502            .finish()
503    }
504}
505
506/// A lag-tolerant subscription to a stream's frame bus.
507///
508/// Returned by [`StreamHandle::subscribe_resilient`]. Unlike a raw
509/// [`broadcast::Receiver`], [`recv`](Self::recv) does not terminate when the
510/// consumer falls behind: the dropped span is reported to the [`Observer`] as
511/// [`on_subscriber_lagged`](Observer::on_subscriber_lagged) and reception
512/// continues from the oldest still-buffered frame. This is the recommended
513/// consumer loop for packagers, recorders, and SFUs.
514///
515/// An optional [`max_lag`](Self::max_lag) bound turns chronic lag into
516/// eviction: once cumulative dropped frames exceed the bound, `recv` returns
517/// `None` (after an [`on_subscriber_evicted`](Observer::on_subscriber_evicted)
518/// notification) so a hopelessly slow consumer is shed rather than wasting
519/// buffer churn forever.
520pub struct Subscription {
521    rx: broadcast::Receiver<Arc<MediaFrame>>,
522    key: StreamKey,
523    observer: Arc<dyn Observer>,
524    max_lag: Option<u64>,
525    skipped: u64,
526}
527
528impl Subscription {
529    /// Evict this subscriber once cumulative dropped frames exceed `max`.
530    ///
531    /// ```no_run
532    /// # use arcly_stream::StreamHandle;
533    /// # fn demo(handle: &StreamHandle) {
534    /// let sub = handle.subscribe_resilient().max_lag(10_000);
535    /// # let _ = sub;
536    /// # }
537    /// ```
538    pub fn max_lag(mut self, max: u64) -> Self {
539        self.max_lag = Some(max);
540        self
541    }
542
543    /// Total frames dropped from this subscriber's view so far.
544    pub fn dropped(&self) -> u64 {
545        self.skipped
546    }
547
548    /// Receive the next frame, resynchronizing past any lag.
549    ///
550    /// Returns `None` when the stream's sender is dropped (the publisher ended)
551    /// or when the `max_lag` eviction threshold is crossed:
552    ///
553    /// ```no_run
554    /// # async fn run(sub: &mut arcly_stream::bus::Subscription) {
555    /// while let Some(frame) = sub.recv().await {
556    ///     // packetize `frame` …
557    /// }
558    /// # }
559    /// ```
560    pub async fn recv(&mut self) -> Option<Arc<MediaFrame>> {
561        loop {
562            match self.rx.recv().await {
563                Ok(frame) => return Some(frame),
564                Err(RecvError::Lagged(skipped)) => {
565                    self.skipped = self.skipped.saturating_add(skipped);
566                    self.observer.on_subscriber_lagged(&self.key, skipped);
567                    if let Some(max) = self.max_lag {
568                        if self.skipped > max {
569                            self.observer.on_subscriber_evicted(&self.key);
570                            return None;
571                        }
572                    }
573                    continue;
574                }
575                Err(RecvError::Closed) => return None,
576            }
577        }
578    }
579
580    /// Borrow the underlying raw receiver, for callers that need
581    /// [`broadcast::Receiver`] APIs directly.
582    pub fn raw(&mut self) -> &mut broadcast::Receiver<Arc<MediaFrame>> {
583        &mut self.rx
584    }
585}
586
#[cfg(test)]
mod tests {
    use super::*;
    use crate::CodecId;

    /// Build a tiny H.264 video frame, passing `pts` for both timestamp
    /// arguments of `MediaFrame::new_video`.
    fn video(pts: i64, key: bool) -> MediaFrame {
        let payload = bytes::Bytes::from_static(&[0, 0, 0, 1, 0x65]);
        MediaFrame::new_video(pts, pts, payload, CodecId::H264, key)
    }

    /// Regression guard for the sender-lifetime fix: `close()` has to end a
    /// subscriber's `recv` even though a `StreamHandle` clone is still alive —
    /// the exact shape that previously hung WHEP egress forever.
    #[tokio::test]
    async fn close_terminates_recv_while_a_handle_clone_is_held() {
        let handle = StreamHandle::new("live".into(), "show".into(), 16);
        let mut sub = handle.subscribe_resilient();

        // A retained clone (e.g. an egress pump) must NOT keep the channel open.
        let retained = handle.clone();
        handle.publish_frame(video(0, true)).unwrap();
        let first = sub.recv().await;
        assert!(first.is_some(), "frame delivered before close");

        retained.close();

        // Bound the wait so a regression fails loudly instead of hanging the
        // whole suite.
        let deadline = std::time::Duration::from_secs(5);
        let got = tokio::time::timeout(deadline, sub.recv())
            .await
            .expect("recv resolved after close (no hang)");
        assert!(got.is_none(), "recv returns None once the stream is closed");

        // Publishing after close is a clean no-op, not a panic.
        let delivered = retained.publish_frame(video(1, false)).unwrap();
        assert_eq!(delivered, 0);
        assert_eq!(retained.subscriber_count(), 0);
    }

    /// `mono_ms` never goes backward and is what `last_frame_ms` is stamped
    /// with, so the idle reaper's elapsed-time math does not depend on the wall
    /// clock.
    #[test]
    fn mono_clock_is_monotonic_and_drives_last_frame() {
        let earlier = mono_ms();
        let later = mono_ms();
        assert!(later >= earlier, "monotonic clock never goes backward");

        let handle = StreamHandle::new("live".into(), "m".into(), 4);
        let before = mono_ms();
        handle.publish_frame(video(0, true)).unwrap();
        let stamped = handle.last_frame_ms();
        assert!(
            stamped >= before,
            "last_frame_ms advances on the monotonic clock"
        );
    }
}