Skip to main content

arcly_stream/bus/
handle.rs

1//! The live stream handle — a lock-free, zero-copy broadcast fan-out bus.
2
3use crate::observe::{NoopObserver, Observer};
4use crate::{frame::FrameFlags, AppName, MediaFrame, Result, StreamId, StreamKey};
5use arc_swap::{ArcSwap, ArcSwapOption};
6use std::net::SocketAddr;
7use std::ops::ControlFlow;
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::{Arc, Mutex as StdMutex, OnceLock};
10use std::time::{Instant, SystemTime, UNIX_EPOCH};
11use tokio::sync::broadcast::error::RecvError;
12use tokio::sync::{broadcast, RwLock};
13use tokio_util::sync::CancellationToken;
14
/// A per-frame egress sink driven by [`StreamHandle::drive_to`].
///
/// Implementors send one frame to a single downstream peer (an RTSP player, an
/// upstream RTMP server, …). `drive_to` first feeds the sink the stream's
/// instant-start replay (cached configs, then the cached GOP) and afterwards
/// the live frames.
///
/// Returning `Ok(ControlFlow::Continue(()))` asks for the next frame;
/// [`ControlFlow::Break`] stops the drive cleanly — typically because the peer
/// disconnected — while `Err` aborts it and is propagated out of `drive_to`.
#[async_trait::async_trait]
pub trait FrameSink: Send {
    /// Forward one frame; `Break` ends the session gracefully.
    ///
    /// The frame arrives as a shared `Arc` — the same allocation every other
    /// subscriber of the stream receives.
    async fn send(&mut self, frame: Arc<MediaFrame>) -> Result<ControlFlow<()>>;
}
25
/// Wall-clock "now" expressed as milliseconds since the Unix epoch.
///
/// A system clock set before 1970 yields `0` instead of an error. This reading
/// is meant for *displayed* timestamps only (e.g. `started_at_ms`): NTP steps
/// and operator clock changes move it arbitrarily, so elapsed-time decisions
/// must use [`mono_ms`] instead.
pub(crate) fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
37
/// Process-local **monotonic** milliseconds, counted from the first call.
///
/// The baseline [`Instant`] is captured lazily on first use, so the very first
/// reading is ~0. Being `Instant`-based, the value never steps when the wall
/// clock is corrected (NTP, leap seconds, operator changes), which makes it the
/// right basis for QoS windows and the idle reaper — unlike [`now_ms`], which
/// is for display only.
pub(crate) fn mono_ms() -> u64 {
    static PROCESS_START: OnceLock<Instant> = OnceLock::new();
    let start = PROCESS_START.get_or_init(Instant::now);
    start.elapsed().as_millis() as u64
}
47
/// Current lifecycle state of a stream.
///
/// The `Display` form is the lowercase state name (`"idle"`, `"publishing"`,
/// …), suitable for logs and status APIs.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StreamState {
    /// No publisher yet.
    Idle,
    /// A publisher is connected and sending data.
    Publishing,
    /// The stream is being transcoded into one or more renditions.
    Transcoding,
    /// The stream is being recorded.
    Recording,
    /// The publisher has disconnected; the stream has ended.
    Ended,
}

impl std::fmt::Display for StreamState {
    /// Writes the lowercase state name, honoring width / fill / alignment /
    /// precision flags (e.g. `format!("{:<12}", state)`) so states line up in
    /// tabular output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            StreamState::Idle => "idle",
            StreamState::Publishing => "publishing",
            StreamState::Transcoding => "transcoding",
            StreamState::Recording => "recording",
            StreamState::Ended => "ended",
        };
        // `pad` (unlike `write_str`) applies the caller's formatting flags.
        f.pad(name)
    }
}
74
/// Runtime metadata about a stream, updated continuously while publishing.
///
/// Resolution, publisher address and ingest protocol are set by the protocol
/// handler (e.g. via [`StreamHandle::update_metadata`] after parsing the codec
/// config). The handle never writes the measured fields into the stored copy;
/// instead [`StreamHandle::metadata_snapshot`] overlays them on each snapshot:
/// the two `*_bitrate_bps` fields always, and `fps` only when a non-zero rate
/// has been measured.
#[derive(Debug, Clone)]
pub struct StreamMetadata {
    /// The `(app, stream_id)` this metadata describes.
    pub key: StreamKey,
    /// Publisher remote address, if known.
    pub publisher_addr: Option<SocketAddr>,
    /// Video width in pixels (0 = unknown).
    pub width: u32,
    /// Video height in pixels (0 = unknown).
    pub height: u32,
    /// Video frames per second (0 = unknown). Replaced by the measured rate in
    /// snapshots whenever a measurement exists.
    pub fps: f64,
    /// Video ingest bitrate in bits-per-second (measured value in snapshots).
    pub video_bitrate_bps: u64,
    /// Audio ingest bitrate in bits-per-second (measured value in snapshots).
    pub audio_bitrate_bps: u64,
    /// Timestamp of the first frame received (Unix ms, wall clock); 0 until set.
    /// NOTE(review): nothing in `StreamHandle` sets this — presumably the ingest
    /// handler does via `update_metadata`; confirm.
    pub started_at_ms: u64,
    /// Protocol used for ingest (e.g. `"rtmp"`); empty until set.
    pub ingest_protocol: String,
}
102
103impl StreamMetadata {
104    /// Create zeroed metadata for `(app, stream_id)`.
105    pub fn new(app: AppName, stream_id: StreamId) -> Self {
106        Self {
107            key: StreamKey::new(app, stream_id),
108            publisher_addr: None,
109            width: 0,
110            height: 0,
111            fps: 0.0,
112            video_bitrate_bps: 0,
113            audio_bitrate_bps: 0,
114            started_at_ms: 0,
115            ingest_protocol: String::new(),
116        }
117    }
118}
119
/// A point-in-time snapshot of a stream's measured quality of service, as
/// returned by [`StreamHandle::qos`].
///
/// The rate fields come from a measurement window of at least one second that
/// rolls over as frames are published; the totals are cumulative since the
/// handle was created.
#[derive(Debug, Clone, Copy, Default)]
pub struct Qos {
    /// Video bitrate over the last ~1s window (bits/sec).
    pub video_bitrate_bps: u64,
    /// Audio bitrate over the last ~1s window (bits/sec).
    pub audio_bitrate_bps: u64,
    /// Video frames per second over the last ~1s window (resolution 0.001 fps).
    pub fps: f64,
    /// Cumulative frames published on this stream (all tracks, configs included).
    pub total_frames: u64,
    /// Cumulative payload bytes published on this stream (all tracks).
    pub total_bytes: u64,
}
134
/// Lock-free throughput counters folded into [`Qos`] / [`StreamMetadata`].
///
/// A stream has a single publisher (enforced by `start_publish`), so the
/// read-modify-write sequences in `record_qos` are effectively single-writer
/// and `Relaxed` is sound for them. Readers load each field independently, so a
/// snapshot may mix values from either side of a concurrent window rollover.
///
/// The `*_ms` fields hold readings of the monotonic `mono_ms` clock, never
/// wall-clock time.
#[derive(Default)]
struct QosCounters {
    /// Frames passed to `publish_frame` since creation.
    total_frames: AtomicU64,
    /// Payload bytes (`frame.data.len()`) published since creation.
    total_bytes: AtomicU64,
    /// Start of the current measurement window; 0 means "not started yet".
    window_start_ms: AtomicU64,
    /// Non-audio payload bytes accumulated in the current window.
    window_video_bytes: AtomicU64,
    /// Audio payload bytes accumulated in the current window.
    window_audio_bytes: AtomicU64,
    /// Non-audio frames accumulated in the current window (drives fps).
    window_video_frames: AtomicU64,
    /// Video bitrate (bits/sec) of the last completed window.
    cur_video_bitrate: AtomicU64,
    /// Audio bitrate (bits/sec) of the last completed window.
    cur_audio_bitrate: AtomicU64,
    cur_fps_milli: AtomicU64, // fps × 1000, integer-encoded so it fits an atomic
    /// Time of the most recent publish, or of handle creation before any frame.
    last_frame_ms: AtomicU64,
}
152
/// Keyframe-anchored replay buffer for instant playback start.
///
/// Maintained by `StreamHandle::publish_frame`: a keyframe empties the buffer
/// and becomes its first entry; later non-CONFIG frames are appended until
/// `capacity` is reached, after which they are dropped until the next keyframe.
/// Frames that arrive before the first keyframe are not buffered.
struct GopBuffer {
    /// Frames since (and including) the most recent keyframe. A keyframe that
    /// also carries the CONFIG flag still anchors the buffer; `replay_buffer`
    /// de-duplicates it against the cached configs.
    frames: Vec<Arc<MediaFrame>>,
    /// Hard cap on buffered frames (memory bound between keyframes).
    capacity: usize,
    /// Whether the current GOP already overflowed `capacity` (so the truncation
    /// signal fires once per GOP, not once per dropped frame). Cleared by the
    /// next keyframe.
    overflowed: bool,
}
163
/// A live handle to a single active stream.
///
/// Multiple subscribers (HLS packager, DASH packager, WebRTC SFU, recorders …)
/// call [`StreamHandle::subscribe_resilient`] to receive every [`MediaFrame`]
/// through a `broadcast` channel. Frames are shared, not copied: each published
/// frame is wrapped in one `Arc<MediaFrame>`, and every subscriber receives a
/// clone of that pointer.
///
/// Because the ring stores pointers rather than frames, its memory scales with
/// capacity, not frame size: 4096 slots hold ≈ 32 KB of 8-byte pointers on a
/// 64-bit target, plus the channel's own per-slot bookkeeping
/// (NOTE(review): that overhead is not part of the estimate — measure before
/// sizing capacity from this figure).
///
/// Cloning a handle is cheap. Apart from the key and the GOP cap, every field
/// sits behind an `Arc`, so clones observe and drive the same stream.
///
/// # Backpressure model
///
/// Fan-out uses a **single, fixed-capacity ring buffer per stream** (a
/// `tokio::broadcast` channel sized by `AppSpec::broadcast_capacity`). This is a
/// deliberate design choice, with consequences worth understanding:
///
/// - **The publisher never blocks on a slow subscriber.** `publish_frame` is a
///   synchronous, non-awaiting call. One subscriber falling behind can never
///   apply backpressure to the publisher or to its peers. The only lock this
///   type takes on the publish path is the GOP-buffer mutex, held briefly and
///   only when a GOP cache is configured.
/// - **Backpressure is resolved by dropping, not stalling.** A subscriber that
///   can't keep up overruns the ring and observes lag.
///   [`Subscription::recv`] resynchronizes to the oldest still-buffered frame
///   and reports the gap via [`Observer::on_subscriber_lagged`]. With
///   [`Subscription::max_lag`], a chronically slow consumer is evicted
///   ([`Observer::on_subscriber_evicted`]) rather than churning forever.
/// - **Capacity is the tuning knob**, traded per stream. Larger capacity
///   tolerates burstier consumers at higher per-stream memory; smaller capacity
///   sheds laggards sooner. There is intentionally no per-subscriber queue:
///   that would reintroduce unbounded memory growth and per-consumer locking,
///   the very things this design avoids.
///
/// In short: a slow subscriber degrades only *its own* view (lag, then
/// eviction), never the publisher's or another subscriber's. Wire an
/// [`Observer`] to see lag and eviction as they happen.
#[derive(Clone)]
pub struct StreamHandle {
    /// Handler-maintained metadata; read via `metadata_snapshot`, mutated via
    /// `update_metadata`.
    metadata: Arc<RwLock<StreamMetadata>>,
    /// Current lifecycle state (see `set_state` / `current_state`).
    state: Arc<RwLock<StreamState>>,
    /// The `(app, stream_id)` identity, also passed to observer callbacks.
    key: StreamKey,
    /// The frame-bus sender, held *indirectly* so the channel's lifetime follows
    /// the stream's lifecycle, not the number of live `StreamHandle` clones.
    /// Cloning a handle shares this cell; it does not mint a new sender.
    ///
    /// Storing the `broadcast::Sender` by value would let any consumer that
    /// merely retains a handle (to subscribe, read metadata, request a keyframe)
    /// pin the channel open and suppress the subscribers' `Closed` signal.
    /// Instead, [`close`](Self::close) — called by the registry when a publish
    /// ends — empties this cell. That drops the sole sender and closes the
    /// channel regardless of any lingering handle clones.
    tx: Arc<ArcSwapOption<broadcast::Sender<Arc<MediaFrame>>>>,
    /// Latest video CONFIG frame (e.g. an AVCDecoderConfigurationRecord), if
    /// seen. `ArcSwap` gives lock-free reads from multiple subscriber tasks.
    video_config: Arc<ArcSwap<Option<Arc<MediaFrame>>>>,
    /// Latest audio CONFIG frame (e.g. an AudioSpecificConfig), if seen.
    audio_config: Arc<ArcSwap<Option<Arc<MediaFrame>>>>,
    /// Rolling GOP buffer for instant-start (stays empty when `gop_capacity == 0`).
    gop: Arc<StdMutex<GopBuffer>>,
    /// Copy of the GOP cap kept outside the mutex, so the publish and replay
    /// paths can skip locking entirely when the cache is disabled (0).
    gop_capacity: usize,
    /// Live throughput counters.
    qos: Arc<QosCounters>,
    /// Injected telemetry hook (no-op by default).
    observer: Arc<dyn Observer>,
}
228
229impl StreamHandle {
230    /// Create a handle with the no-op observer and no GOP cache.
231    pub fn new(app: AppName, stream_id: StreamId, capacity: usize) -> Self {
232        Self::with_observer(app, stream_id, capacity, 0, Arc::new(NoopObserver))
233    }
234
235    /// Create a handle wired to a host-supplied observer.
236    ///
237    /// `gop_capacity` bounds the keyframe-anchored replay buffer (0 disables it).
238    pub fn with_observer(
239        app: AppName,
240        stream_id: StreamId,
241        capacity: usize,
242        gop_capacity: usize,
243        observer: Arc<dyn Observer>,
244    ) -> Self {
245        let (tx, _) = broadcast::channel(capacity);
246        let qos = QosCounters::default();
247        // Treat creation as the last activity so a just-claimed stream is not
248        // instantly considered idle before its first frame arrives. Monotonic, to
249        // match the idle reaper (see `mono_ms`).
250        qos.last_frame_ms.store(mono_ms(), Ordering::Relaxed);
251        Self {
252            metadata: Arc::new(RwLock::new(StreamMetadata::new(
253                app.clone(),
254                stream_id.clone(),
255            ))),
256            state: Arc::new(RwLock::new(StreamState::Idle)),
257            key: StreamKey::new(app, stream_id),
258            tx: Arc::new(ArcSwapOption::new(Some(Arc::new(tx)))),
259            video_config: Arc::new(ArcSwap::new(Arc::new(None))),
260            audio_config: Arc::new(ArcSwap::new(Arc::new(None))),
261            gop: Arc::new(StdMutex::new(GopBuffer {
262                frames: Vec::new(),
263                capacity: gop_capacity,
264                overflowed: false,
265            })),
266            gop_capacity,
267            qos: Arc::new(qos),
268            observer,
269        }
270    }
271
272    /// The `(app, stream_id)` this handle belongs to.
273    pub fn key(&self) -> &StreamKey {
274        &self.key
275    }
276
277    /// Publish a frame to all current subscribers.  Returns the number of
278    /// active receivers; returns `Ok(0)` when there are no subscribers.
279    pub fn publish_frame(&self, frame: MediaFrame) -> crate::Result<usize> {
280        self.observer.on_frame(&self.key, &frame);
281        let len = frame.data.len() as u64;
282        let is_audio = frame.is_audio();
283        let is_key = frame.is_keyframe();
284        let is_config = frame.flags.contains(FrameFlags::CONFIG);
285        let arc = Arc::new(frame);
286
287        // Cache the latest CONFIG frame for late-joining subscribers.
288        if is_config {
289            if is_audio {
290                self.audio_config.store(Arc::new(Some(Arc::clone(&arc))));
291            } else {
292                self.video_config.store(Arc::new(Some(Arc::clone(&arc))));
293            }
294        }
295
296        // Maintain the keyframe-anchored GOP replay buffer.
297        let mut gop_truncated = false;
298        if self.gop_capacity > 0 {
299            if let Ok(mut g) = self.gop.lock() {
300                if is_key {
301                    g.frames.clear();
302                    g.overflowed = false;
303                    g.frames.push(Arc::clone(&arc));
304                } else if !is_config && !g.frames.is_empty() {
305                    // Only buffer once a keyframe anchors the GOP; CONFIG frames
306                    // are replayed separately via `cached_configs`.
307                    if g.frames.len() < g.capacity {
308                        g.frames.push(Arc::clone(&arc));
309                    } else if !g.overflowed {
310                        // First overflow of this GOP: the replay will be truncated.
311                        // Latch it so the signal fires once, not per dropped frame.
312                        g.overflowed = true;
313                        gop_truncated = true;
314                    }
315                }
316            }
317        }
318        if gop_truncated {
319            self.observer.on_gop_truncated(&self.key, self.gop_capacity);
320        }
321
322        self.record_qos(len, is_audio, is_key);
323
324        // The sender is gone once the stream is closed; publishing then is a
325        // no-op (Ok(0)) rather than an error, so a publisher racing teardown
326        // winds down cleanly.
327        let count = match self.tx.load_full() {
328            Some(tx) => tx.send(arc).unwrap_or(0),
329            None => 0,
330        };
331        Ok(count)
332    }
333
334    /// Fold one frame into the rolling throughput window.
335    fn record_qos(&self, len: u64, is_audio: bool, _is_key: bool) {
336        let q = &self.qos;
337        let now = mono_ms();
338        q.total_frames.fetch_add(1, Ordering::Relaxed);
339        q.total_bytes.fetch_add(len, Ordering::Relaxed);
340        q.last_frame_ms.store(now, Ordering::Relaxed);
341        if is_audio {
342            q.window_audio_bytes.fetch_add(len, Ordering::Relaxed);
343        } else {
344            q.window_video_bytes.fetch_add(len, Ordering::Relaxed);
345            q.window_video_frames.fetch_add(1, Ordering::Relaxed);
346        }
347
348        let ws = q.window_start_ms.load(Ordering::Relaxed);
349        if ws == 0 {
350            q.window_start_ms.store(now, Ordering::Relaxed);
351        } else if now.saturating_sub(ws) >= 1000 {
352            let elapsed = (now - ws) as f64 / 1000.0;
353            let vbytes = q.window_video_bytes.swap(0, Ordering::Relaxed);
354            let abytes = q.window_audio_bytes.swap(0, Ordering::Relaxed);
355            let vframes = q.window_video_frames.swap(0, Ordering::Relaxed);
356            q.cur_video_bitrate
357                .store((vbytes as f64 * 8.0 / elapsed) as u64, Ordering::Relaxed);
358            q.cur_audio_bitrate
359                .store((abytes as f64 * 8.0 / elapsed) as u64, Ordering::Relaxed);
360            q.cur_fps_milli.store(
361                (vframes as f64 / elapsed * 1000.0) as u64,
362                Ordering::Relaxed,
363            );
364            q.window_start_ms.store(now, Ordering::Relaxed);
365        }
366    }
367
368    /// A snapshot of measured throughput (bitrate, fps, totals).
369    pub fn qos(&self) -> Qos {
370        let q = &self.qos;
371        Qos {
372            video_bitrate_bps: q.cur_video_bitrate.load(Ordering::Relaxed),
373            audio_bitrate_bps: q.cur_audio_bitrate.load(Ordering::Relaxed),
374            fps: q.cur_fps_milli.load(Ordering::Relaxed) as f64 / 1000.0,
375            total_frames: q.total_frames.load(Ordering::Relaxed),
376            total_bytes: q.total_bytes.load(Ordering::Relaxed),
377        }
378    }
379
380    /// Monotonic-clock timestamp (process-local milliseconds) of the most
381    /// recently published frame (or stream creation if none yet). Used by the
382    /// engine's idle reaper; this is elapsed monotonic time, not wall-clock time,
383    /// so compare it only against other readings of the same monotonic clock.
384    pub fn last_frame_ms(&self) -> u64 {
385        self.qos.last_frame_ms.load(Ordering::Relaxed)
386    }
387
388    /// Returns the most recently seen video and audio CONFIG frames,
389    /// for replaying to late-joining subscribers.
390    pub fn cached_configs(&self) -> (Option<Arc<MediaFrame>>, Option<Arc<MediaFrame>>) {
391        let video = (**self.video_config.load()).clone();
392        let audio = (**self.audio_config.load()).clone();
393        (video, audio)
394    }
395
396    /// The frames a late joiner should be handed before going live: cached
397    /// decoder configs followed by the current GOP (keyframe + trailing deltas).
398    ///
399    /// Replaying these lets a new subscriber start decoding immediately rather
400    /// than waiting for the next keyframe — sub-second join times at scale.
401    /// Requires the app to have enabled a GOP cache; otherwise only the cached
402    /// configs are returned.
403    pub fn replay_buffer(&self) -> Vec<Arc<MediaFrame>> {
404        let (vcfg, acfg) = self.cached_configs();
405        let mut out = Vec::new();
406        out.extend(vcfg.clone());
407        out.extend(acfg.clone());
408        if self.gop_capacity > 0 {
409            if let Ok(g) = self.gop.lock() {
410                out.reserve(g.frames.len());
411                for f in &g.frames {
412                    // Avoid duplicating a config frame already pushed above. Only
413                    // the (≤2) cached configs can collide with a GOP frame — GOP
414                    // frames are themselves distinct — so compare against just
415                    // those, keeping this O(n) rather than O(n²) over the GOP.
416                    let dup = vcfg.as_ref().is_some_and(|c| Arc::ptr_eq(c, f))
417                        || acfg.as_ref().is_some_and(|c| Arc::ptr_eq(c, f));
418                    if !dup {
419                        out.push(Arc::clone(f));
420                    }
421                }
422            }
423        }
424        out
425    }
426
427    /// Subscribe to this stream's frame bus.
428    ///
429    /// The returned raw [`broadcast::Receiver`] surfaces [`RecvError::Lagged`]
430    /// when a slow consumer falls behind the channel capacity — callers that
431    /// `while let Ok(_) = rx.recv().await` will silently terminate on the first
432    /// lag. Prefer [`subscribe_resilient`](Self::subscribe_resilient) unless you
433    /// are deliberately handling lag yourself.
434    pub fn subscribe(&self) -> broadcast::Receiver<Arc<MediaFrame>> {
435        match self.tx.load_full() {
436            Some(tx) => tx.subscribe(),
437            // The stream has already closed: hand back a receiver on a spent
438            // channel so the caller's `recv` loop terminates immediately with
439            // `Closed` rather than blocking forever.
440            None => {
441                let (_, rx) = broadcast::channel(1);
442                rx
443            }
444        }
445    }
446
447    /// Subscribe with a [`Subscription`] that resynchronizes after lag instead
448    /// of terminating, reporting each gap to the installed [`Observer`] via
449    /// [`Observer::on_subscriber_lagged`].
450    pub fn subscribe_resilient(&self) -> Subscription {
451        Subscription {
452            rx: self.subscribe(),
453            key: self.key.clone(),
454            observer: Arc::clone(&self.observer),
455            max_lag: None,
456            skipped: 0,
457        }
458    }
459
460    /// Drive this stream to a [`FrameSink`]: replay the instant-start buffer
461    /// (cached configs + cached GOP) so the consumer can decode immediately,
462    /// then forward live frames until the stream ends or `shutdown` fires.
463    ///
464    /// This is the playback loop every egress that pulls from a single stream
465    /// shares (RTSP server, RTMP relay, …); each supplies only its per-frame
466    /// [`FrameSink::send`].
467    pub async fn drive_to(
468        &self,
469        shutdown: &CancellationToken,
470        sink: &mut dyn FrameSink,
471    ) -> Result<()> {
472        for frame in self.replay_buffer() {
473            if sink.send(frame).await?.is_break() {
474                return Ok(());
475            }
476        }
477        let mut sub = self.subscribe_resilient();
478        loop {
479            tokio::select! {
480                _ = shutdown.cancelled() => break,
481                next = sub.recv() => match next {
482                    Some(frame) => {
483                        if sink.send(frame).await?.is_break() {
484                            break;
485                        }
486                    }
487                    None => break, // stream ended
488                }
489            }
490        }
491        Ok(())
492    }
493
494    /// Drive this stream into a [`Packager`](crate::packager::Packager) (HLS
495    /// segmenter, DASH packager, …): replay the instant-start buffer — **which
496    /// carries the cached CONFIG access unit (SPS/PPS)** — then forward live
497    /// frames until the stream ends or `shutdown` fires, and `finish`.
498    ///
499    /// Priming with [`replay_buffer`](Self::replay_buffer) is essential, not just
500    /// an optimization: the CONFIG frame is published once at stream start, so a
501    /// packager that only `subscribe_resilient`s (subscribing *after* the event
502    /// that spawned it) never sees it — and then a fragmented-MP4 muxer never
503    /// builds its `init.m4s` and the master playlist never learns its codec
504    /// string. Replaying configs first makes both deterministic.
505    #[cfg(feature = "hls")]
506    pub async fn package_to(
507        &self,
508        shutdown: &CancellationToken,
509        packager: &mut dyn crate::packager::Packager,
510    ) -> Result<()> {
511        for frame in self.replay_buffer() {
512            packager.push(&frame).await?;
513        }
514        let mut sub = self.subscribe_resilient();
515        loop {
516            tokio::select! {
517                _ = shutdown.cancelled() => break,
518                next = sub.recv() => match next {
519                    Some(frame) => packager.push(&frame).await?,
520                    None => break, // stream ended
521                }
522            }
523        }
524        packager.finish().await
525    }
526
527    /// Number of active subscribers (0 once the stream is closed).
528    pub fn subscriber_count(&self) -> usize {
529        self.tx
530            .load_full()
531            .map(|tx| tx.receiver_count())
532            .unwrap_or(0)
533    }
534
535    /// Close the frame bus: drop the sole sender so every subscriber's `recv`
536    /// observes `Closed` and terminates, *regardless* of how many `StreamHandle`
537    /// clones are still alive.
538    ///
539    /// Called by the registry when a publish ends (see
540    /// `Application::end_publish`). Idempotent. This is what makes the channel's
541    /// lifetime track the stream's lifecycle rather than handle reachability.
542    pub fn close(&self) {
543        self.tx.store(None);
544    }
545
546    /// Transition to a new state.
547    pub async fn set_state(&self, state: StreamState) {
548        let mut guard = self.state.write().await;
549        *guard = state;
550    }
551
552    /// The current lifecycle state.
553    pub async fn current_state(&self) -> StreamState {
554        self.state.read().await.clone()
555    }
556
557    /// A consistent point-in-time copy of this stream's [`StreamMetadata`], with
558    /// the live measured `fps`/bitrate overlaid from [`qos`](Self::qos).
559    ///
560    /// Cloning the snapshot releases the lock immediately, so callers never hold
561    /// the metadata `RwLock` across an `.await`.
562    pub async fn metadata_snapshot(&self) -> StreamMetadata {
563        let mut m = self.metadata.read().await.clone();
564        let q = self.qos();
565        m.video_bitrate_bps = q.video_bitrate_bps;
566        m.audio_bitrate_bps = q.audio_bitrate_bps;
567        if q.fps > 0.0 {
568            m.fps = q.fps;
569        }
570        m
571    }
572
573    /// Mutate this stream's [`StreamMetadata`] under the write lock.
574    ///
575    /// Ingest handlers call this as they parse the stream — e.g. on the first
576    /// keyframe to record resolution from the codec config, or to set the
577    /// publisher address — so the metadata exposed to operators and the control
578    /// plane stays live rather than frozen at its zeroed defaults.
579    ///
580    /// ```no_run
581    /// # use arcly_stream::StreamHandle;
582    /// # async fn demo(handle: &StreamHandle, addr: std::net::SocketAddr) {
583    /// handle
584    ///     .update_metadata(|m| {
585    ///         m.publisher_addr = Some(addr);
586    ///         m.width = 1920;
587    ///         m.height = 1080;
588    ///         m.ingest_protocol = "rtmp".to_string();
589    ///     })
590    ///     .await;
591    /// # }
592    /// ```
593    pub async fn update_metadata(&self, f: impl FnOnce(&mut StreamMetadata)) {
594        let mut guard = self.metadata.write().await;
595        f(&mut guard);
596    }
597}
598
599impl std::fmt::Debug for StreamHandle {
600    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
601        f.debug_struct("StreamHandle")
602            .field("key", &self.key)
603            .field("subscribers", &self.subscriber_count())
604            .finish()
605    }
606}
607
/// A lag-tolerant subscription to a stream's frame bus.
///
/// Returned by [`StreamHandle::subscribe_resilient`]. Unlike a raw
/// [`broadcast::Receiver`], [`recv`](Self::recv) does not terminate when the
/// consumer falls behind. Instead, the dropped span is reported to the
/// [`Observer`] as [`on_subscriber_lagged`](Observer::on_subscriber_lagged)
/// and reception continues from the oldest still-buffered frame. This is the
/// recommended consumer loop for packagers, recorders, and SFUs.
///
/// An optional [`max_lag`](Self::max_lag) bound turns chronic lag into
/// eviction. Once cumulative dropped frames exceed the bound, `recv` returns
/// `None`, after an [`on_subscriber_evicted`](Observer::on_subscriber_evicted)
/// notification. A hopelessly slow consumer is thus shed rather than churning
/// through the ring forever.
pub struct Subscription {
    /// Receiver on the stream's broadcast ring.
    rx: broadcast::Receiver<Arc<MediaFrame>>,
    /// Stream identity passed to observer callbacks.
    key: StreamKey,
    /// Telemetry hook notified of lag and eviction.
    observer: Arc<dyn Observer>,
    /// Eviction threshold on cumulative dropped frames; `None` never evicts.
    max_lag: Option<u64>,
    /// Cumulative (saturating) count of frames dropped so far.
    skipped: u64,
}
629
630impl Subscription {
631    /// Evict this subscriber once cumulative dropped frames exceed `max`.
632    ///
633    /// ```no_run
634    /// # use arcly_stream::StreamHandle;
635    /// # fn demo(handle: &StreamHandle) {
636    /// let sub = handle.subscribe_resilient().max_lag(10_000);
637    /// # let _ = sub;
638    /// # }
639    /// ```
640    pub fn max_lag(mut self, max: u64) -> Self {
641        self.max_lag = Some(max);
642        self
643    }
644
645    /// Total frames dropped from this subscriber's view so far.
646    pub fn dropped(&self) -> u64 {
647        self.skipped
648    }
649
650    /// Receive the next frame, resynchronizing past any lag.
651    ///
652    /// Returns `None` when the stream's sender is dropped (the publisher ended)
653    /// or when the `max_lag` eviction threshold is crossed:
654    ///
655    /// ```no_run
656    /// # async fn run(sub: &mut arcly_stream::bus::Subscription) {
657    /// while let Some(frame) = sub.recv().await {
658    ///     // packetize `frame` …
659    /// }
660    /// # }
661    /// ```
662    pub async fn recv(&mut self) -> Option<Arc<MediaFrame>> {
663        loop {
664            match self.rx.recv().await {
665                Ok(frame) => return Some(frame),
666                Err(RecvError::Lagged(skipped)) => {
667                    self.skipped = self.skipped.saturating_add(skipped);
668                    self.observer.on_subscriber_lagged(&self.key, skipped);
669                    if let Some(max) = self.max_lag {
670                        if self.skipped > max {
671                            self.observer.on_subscriber_evicted(&self.key);
672                            return None;
673                        }
674                    }
675                    continue;
676                }
677                Err(RecvError::Closed) => return None,
678            }
679        }
680    }
681
682    /// Borrow the underlying raw receiver, for callers that need
683    /// [`broadcast::Receiver`] APIs directly.
684    pub fn raw(&mut self) -> &mut broadcast::Receiver<Arc<MediaFrame>> {
685        &mut self.rx
686    }
687}
688
#[cfg(test)]
mod tests {
    //! Unit tests for the stream bus: close semantics with retained handle
    //! clones, CONFIG replay to late packagers, GOP-overflow signalling, and
    //! the monotonic clock behind `last_frame_ms`.

    use super::*;
    use crate::CodecId;
    use std::time::Duration;

    /// Upper bound on any wait in these tests, so a regression that would
    /// otherwise block forever fails loudly instead of hanging the suite.
    const HANG_GUARD: Duration = Duration::from_secs(5);

    /// Builds a video frame at `pts` (passed for both timestamp arguments of
    /// `MediaFrame::new_video`) carrying a fixed 5-byte H.264 payload
    /// (start code followed by a `0x65` NAL header byte).
    fn video(pts: i64, key: bool) -> MediaFrame {
        MediaFrame::new_video(
            pts,
            pts,
            bytes::Bytes::from_static(&[0, 0, 0, 1, 0x65]),
            CodecId::H264,
            key,
        )
    }

    /// Regression guard for the sender-lifetime fix: `close()` must terminate a
    /// subscriber's `recv` even while a `StreamHandle` clone is still held — the
    /// exact shape that previously hung WHEP egress forever.
    #[tokio::test]
    async fn close_terminates_recv_while_a_handle_clone_is_held() {
        let handle = StreamHandle::new("live".into(), "show".into(), 16);
        let mut sub = handle.subscribe_resilient();

        // A retained clone (e.g. an egress pump) must NOT pin the channel open.
        let retained = handle.clone();
        handle.publish_frame(video(0, true)).unwrap();
        assert!(sub.recv().await.is_some(), "frame delivered before close");

        retained.close();

        // Without the registry-owned sender this would block forever; bound it so
        // a regression fails loudly instead of hanging the suite.
        let got = tokio::time::timeout(HANG_GUARD, sub.recv())
            .await
            .expect("recv resolved after close (no hang)");
        assert!(got.is_none(), "recv returns None once the stream is closed");

        // Publishing post-close is a clean no-op, not a panic.
        assert_eq!(retained.publish_frame(video(1, false)).unwrap(), 0);
        assert_eq!(retained.subscriber_count(), 0);
    }

    /// `package_to` must replay the cached CONFIG access unit even though it was
    /// published *before* the packager subscribed — the regression behind DASH's
    /// missing `init.m4s` and the absent master playlist.
    #[cfg(feature = "hls")]
    #[tokio::test]
    async fn package_to_replays_config_published_before_subscribe() {
        use crate::packager::Packager;

        // A trivial packager that records the flags of every frame it is pushed.
        struct RecordingPackager(Arc<std::sync::Mutex<Vec<FrameFlags>>>);
        #[async_trait::async_trait]
        impl Packager for RecordingPackager {
            async fn push(&mut self, frame: &MediaFrame) -> crate::Result<()> {
                self.0.lock().unwrap().push(frame.flags);
                Ok(())
            }
            async fn finish(&mut self) -> crate::Result<()> {
                Ok(())
            }
        }

        let handle = StreamHandle::new("live".into(), "cam".into(), 16);

        // The one-shot CONFIG frame arrives BEFORE any packager subscribes.
        let mut cfg = video(0, true);
        cfg.flags |= FrameFlags::CONFIG;
        handle.publish_frame(cfg).unwrap();

        let seen = Arc::new(std::sync::Mutex::new(Vec::new()));
        let mut pkg = RecordingPackager(Arc::clone(&seen));
        let h = handle.clone();
        let shutdown = CancellationToken::new();
        // Snapshot the count first so any pre-existing subscriber can't
        // satisfy the wait below before the packager has attached.
        let baseline = handle.subscriber_count();
        let task = tokio::spawn(async move { h.package_to(&shutdown, &mut pkg).await });

        // Wait for the packager to attach instead of guessing with a fixed
        // sleep, which races under a loaded scheduler.
        // NOTE(review): assumes `subscriber_count` reflects the packager's
        // subscription; if it never does, this degrades to a bounded wait.
        let deadline = std::time::Instant::now() + HANG_GUARD;
        while handle.subscriber_count() <= baseline && std::time::Instant::now() < deadline {
            tokio::time::sleep(Duration::from_millis(5)).await;
        }

        handle.publish_frame(video(1, false)).unwrap();
        handle.close();
        // Bounded like the close test: a packager that ignores close must fail
        // the test, not hang the suite.
        tokio::time::timeout(HANG_GUARD, task)
            .await
            .expect("package_to returned after close (no hang)")
            .unwrap()
            .unwrap();

        assert!(
            seen.lock()
                .unwrap()
                .iter()
                .any(|f| f.contains(FrameFlags::CONFIG)),
            "package_to must replay the cached CONFIG frame to the packager"
        );
    }

    /// The GOP-overflow signal fires once per GOP (not per dropped frame) and
    /// resets on the next keyframe so a fresh GOP can report again.
    #[test]
    fn gop_truncation_signals_once_per_gop() {
        use std::sync::atomic::{AtomicUsize, Ordering};

        #[derive(Default)]
        struct CountingObserver {
            truncations: AtomicUsize,
        }
        impl Observer for CountingObserver {
            fn on_gop_truncated(&self, _key: &StreamKey, _capacity: usize) {
                self.truncations.fetch_add(1, Ordering::Relaxed);
            }
        }

        let obs = Arc::new(CountingObserver::default());
        // Capacity 2: keyframe + 1 delta fit; the 2nd delta onward overflows.
        let handle = StreamHandle::with_observer(
            "live".into(),
            "g".into(),
            64,
            2,
            Arc::clone(&obs) as Arc<dyn Observer>,
        );

        handle.publish_frame(video(0, true)).unwrap(); // keyframe anchors GOP
        handle.publish_frame(video(1, false)).unwrap(); // fills to capacity
        handle.publish_frame(video(2, false)).unwrap(); // first overflow → signal
        handle.publish_frame(video(3, false)).unwrap(); // still overflowing → silent
        assert_eq!(
            obs.truncations.load(Ordering::Relaxed),
            1,
            "one signal per GOP"
        );

        // A new keyframe resets the latch; overflowing again signals once more.
        handle.publish_frame(video(4, true)).unwrap();
        handle.publish_frame(video(5, false)).unwrap();
        handle.publish_frame(video(6, false)).unwrap();
        handle.publish_frame(video(7, false)).unwrap();
        assert_eq!(
            obs.truncations.load(Ordering::Relaxed),
            2,
            "next GOP signals again"
        );
    }

    /// `mono_ms` is monotonic and drives `last_frame_ms`, so the idle reaper's
    /// elapsed-time math is independent of the wall clock.
    #[test]
    fn mono_clock_is_monotonic_and_drives_last_frame() {
        let a = mono_ms();
        let b = mono_ms();
        assert!(b >= a, "monotonic clock never goes backward");

        let handle = StreamHandle::new("live".into(), "m".into(), 4);
        let before = mono_ms();
        handle.publish_frame(video(0, true)).unwrap();
        assert!(
            handle.last_frame_ms() >= before,
            "last_frame_ms advances on the monotonic clock"
        );
    }
}