Skip to main content

arcly_stream/bus/
handle.rs

1//! The live stream handle — a lock-free, zero-copy broadcast fan-out bus.
2
3use crate::observe::{NoopObserver, Observer};
4use crate::{frame::FrameFlags, AppName, MediaFrame, StreamId, StreamKey};
5use arc_swap::ArcSwap;
6use std::net::SocketAddr;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::{Arc, Mutex as StdMutex};
9use std::time::{SystemTime, UNIX_EPOCH};
10use tokio::sync::broadcast::error::RecvError;
11use tokio::sync::{broadcast, RwLock};
12
13/// Current wall-clock time in Unix milliseconds (saturating to 0 pre-epoch).
14pub(crate) fn now_ms() -> u64 {
15    SystemTime::now()
16        .duration_since(UNIX_EPOCH)
17        .map(|d| d.as_millis() as u64)
18        .unwrap_or(0)
19}
20
21/// Current lifecycle state of a stream.
22#[derive(Debug, Clone, PartialEq, Eq)]
23pub enum StreamState {
24    /// No publisher yet.
25    Idle,
26    /// A publisher is connected and sending data.
27    Publishing,
28    /// The stream is being transcoded into one or more renditions.
29    Transcoding,
30    /// The stream is being recorded.
31    Recording,
32    /// The publisher has disconnected; the stream has ended.
33    Ended,
34}
35
36impl std::fmt::Display for StreamState {
37    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38        f.write_str(match self {
39            StreamState::Idle => "idle",
40            StreamState::Publishing => "publishing",
41            StreamState::Transcoding => "transcoding",
42            StreamState::Recording => "recording",
43            StreamState::Ended => "ended",
44        })
45    }
46}
47
48/// Runtime metadata about a stream, updated continuously while publishing.
49///
50/// Resolution and the ingest protocol are set by the protocol handler (e.g. via
51/// [`StreamHandle::update_metadata`] after parsing the codec config), while the
52/// `fps` and `*_bitrate_bps` fields are overlaid live from measured throughput
53/// by [`StreamHandle::metadata_snapshot`].
54#[derive(Debug, Clone)]
55pub struct StreamMetadata {
56    /// The `(app, stream_id)` this metadata describes.
57    pub key: StreamKey,
58    /// Publisher remote address.
59    pub publisher_addr: Option<SocketAddr>,
60    /// Video width in pixels (0 = unknown).
61    pub width: u32,
62    /// Video height in pixels (0 = unknown).
63    pub height: u32,
64    /// Video frames per second (0 = unknown). Overlaid from measured throughput.
65    pub fps: f64,
66    /// Measured video ingest bitrate in bits-per-second.
67    pub video_bitrate_bps: u64,
68    /// Measured audio ingest bitrate in bits-per-second.
69    pub audio_bitrate_bps: u64,
70    /// Timestamp of the first frame received (Unix ms).
71    pub started_at_ms: u64,
72    /// Protocol used for ingest (e.g. `"rtmp"`).
73    pub ingest_protocol: String,
74}
75
76impl StreamMetadata {
77    /// Create zeroed metadata for `(app, stream_id)`.
78    pub fn new(app: AppName, stream_id: StreamId) -> Self {
79        Self {
80            key: StreamKey::new(app, stream_id),
81            publisher_addr: None,
82            width: 0,
83            height: 0,
84            fps: 0.0,
85            video_bitrate_bps: 0,
86            audio_bitrate_bps: 0,
87            started_at_ms: 0,
88            ingest_protocol: String::new(),
89        }
90    }
91}
92
93/// A point-in-time snapshot of a stream's measured quality of service.
94#[derive(Debug, Clone, Copy, Default)]
95pub struct Qos {
96    /// Video bitrate over the last ~1s window (bits/sec).
97    pub video_bitrate_bps: u64,
98    /// Audio bitrate over the last ~1s window (bits/sec).
99    pub audio_bitrate_bps: u64,
100    /// Video frames per second over the last ~1s window.
101    pub fps: f64,
102    /// Cumulative frames published on this stream.
103    pub total_frames: u64,
104    /// Cumulative payload bytes published on this stream.
105    pub total_bytes: u64,
106}
107
108/// Lock-free throughput counters folded into [`Qos`] / [`StreamMetadata`].
109///
110/// A stream has a single publisher (enforced by `start_publish`), so the
111/// read-modify-write here is effectively single-writer and `Relaxed` is sound.
112#[derive(Default)]
113struct QosCounters {
114    total_frames: AtomicU64,
115    total_bytes: AtomicU64,
116    window_start_ms: AtomicU64,
117    window_video_bytes: AtomicU64,
118    window_audio_bytes: AtomicU64,
119    window_video_frames: AtomicU64,
120    cur_video_bitrate: AtomicU64,
121    cur_audio_bitrate: AtomicU64,
122    cur_fps_milli: AtomicU64, // fps × 1000, integer-encoded
123    last_frame_ms: AtomicU64,
124}
125
126/// Keyframe-anchored replay buffer for instant playback start.
127struct GopBuffer {
128    /// Frames since (and including) the most recent keyframe.
129    frames: Vec<Arc<MediaFrame>>,
130    /// Hard cap on buffered frames (memory bound between keyframes).
131    capacity: usize,
132}
133
134/// A live handle to a single active stream.
135///
136/// Multiple subscribers (HLS packager, DASH packager, WebRTC SFU, recorders …)
137/// call [`StreamHandle::subscribe_resilient`] to receive every [`MediaFrame`]
138/// cheaply via a `broadcast` channel (zero-copy `Bytes` cloning).
139///
140/// Each broadcast slot holds one `Arc<MediaFrame>` pointer (8 bytes), so e.g.
141/// 4096 slots ≈ 32 KB per stream.
142#[derive(Clone)]
143pub struct StreamHandle {
144    metadata: Arc<RwLock<StreamMetadata>>,
145    state: Arc<RwLock<StreamState>>,
146    key: StreamKey,
147    tx: broadcast::Sender<Arc<MediaFrame>>,
148    /// Latest video CONFIG (AVCDecoderConfigurationRecord) frame, if seen.
149    /// Uses `ArcSwap` for lock-free reads from multiple subscriber tasks.
150    video_config: Arc<ArcSwap<Option<Arc<MediaFrame>>>>,
151    /// Latest audio CONFIG (AudioSpecificConfig) frame, if seen.
152    audio_config: Arc<ArcSwap<Option<Arc<MediaFrame>>>>,
153    /// Rolling GOP buffer for instant-start (empty when `gop_capacity == 0`).
154    gop: Arc<StdMutex<GopBuffer>>,
155    gop_capacity: usize,
156    /// Live throughput counters.
157    qos: Arc<QosCounters>,
158    /// Injected telemetry hook (no-op by default).
159    observer: Arc<dyn Observer>,
160}
161
162impl StreamHandle {
163    /// Create a handle with the no-op observer and no GOP cache.
164    pub fn new(app: AppName, stream_id: StreamId, capacity: usize) -> Self {
165        Self::with_observer(app, stream_id, capacity, 0, Arc::new(NoopObserver))
166    }
167
168    /// Create a handle wired to a host-supplied observer.
169    ///
170    /// `gop_capacity` bounds the keyframe-anchored replay buffer (0 disables it).
171    pub fn with_observer(
172        app: AppName,
173        stream_id: StreamId,
174        capacity: usize,
175        gop_capacity: usize,
176        observer: Arc<dyn Observer>,
177    ) -> Self {
178        let (tx, _) = broadcast::channel(capacity);
179        let qos = QosCounters::default();
180        // Treat creation as the last activity so a just-claimed stream is not
181        // instantly considered idle before its first frame arrives.
182        qos.last_frame_ms.store(now_ms(), Ordering::Relaxed);
183        Self {
184            metadata: Arc::new(RwLock::new(StreamMetadata::new(
185                app.clone(),
186                stream_id.clone(),
187            ))),
188            state: Arc::new(RwLock::new(StreamState::Idle)),
189            key: StreamKey::new(app, stream_id),
190            tx,
191            video_config: Arc::new(ArcSwap::new(Arc::new(None))),
192            audio_config: Arc::new(ArcSwap::new(Arc::new(None))),
193            gop: Arc::new(StdMutex::new(GopBuffer {
194                frames: Vec::new(),
195                capacity: gop_capacity,
196            })),
197            gop_capacity,
198            qos: Arc::new(qos),
199            observer,
200        }
201    }
202
203    /// The `(app, stream_id)` this handle belongs to.
204    pub fn key(&self) -> &StreamKey {
205        &self.key
206    }
207
208    /// Publish a frame to all current subscribers.  Returns the number of
209    /// active receivers; returns `Ok(0)` when there are no subscribers.
210    pub fn publish_frame(&self, frame: MediaFrame) -> crate::Result<usize> {
211        self.observer.on_frame(&self.key, &frame);
212        let len = frame.data.len() as u64;
213        let is_audio = frame.is_audio();
214        let is_key = frame.is_keyframe();
215        let is_config = frame.flags.contains(FrameFlags::CONFIG);
216        let arc = Arc::new(frame);
217
218        // Cache the latest CONFIG frame for late-joining subscribers.
219        if is_config {
220            if is_audio {
221                self.audio_config.store(Arc::new(Some(Arc::clone(&arc))));
222            } else {
223                self.video_config.store(Arc::new(Some(Arc::clone(&arc))));
224            }
225        }
226
227        // Maintain the keyframe-anchored GOP replay buffer.
228        if self.gop_capacity > 0 {
229            if let Ok(mut g) = self.gop.lock() {
230                if is_key {
231                    g.frames.clear();
232                    g.frames.push(Arc::clone(&arc));
233                } else if !is_config && !g.frames.is_empty() && g.frames.len() < g.capacity {
234                    // Only buffer once a keyframe anchors the GOP; CONFIG frames
235                    // are replayed separately via `cached_configs`.
236                    g.frames.push(Arc::clone(&arc));
237                }
238            }
239        }
240
241        self.record_qos(len, is_audio, is_key);
242
243        let count = self.tx.send(arc).unwrap_or(0);
244        Ok(count)
245    }
246
247    /// Fold one frame into the rolling throughput window.
248    fn record_qos(&self, len: u64, is_audio: bool, _is_key: bool) {
249        let q = &self.qos;
250        let now = now_ms();
251        q.total_frames.fetch_add(1, Ordering::Relaxed);
252        q.total_bytes.fetch_add(len, Ordering::Relaxed);
253        q.last_frame_ms.store(now, Ordering::Relaxed);
254        if is_audio {
255            q.window_audio_bytes.fetch_add(len, Ordering::Relaxed);
256        } else {
257            q.window_video_bytes.fetch_add(len, Ordering::Relaxed);
258            q.window_video_frames.fetch_add(1, Ordering::Relaxed);
259        }
260
261        let ws = q.window_start_ms.load(Ordering::Relaxed);
262        if ws == 0 {
263            q.window_start_ms.store(now, Ordering::Relaxed);
264        } else if now.saturating_sub(ws) >= 1000 {
265            let elapsed = (now - ws) as f64 / 1000.0;
266            let vbytes = q.window_video_bytes.swap(0, Ordering::Relaxed);
267            let abytes = q.window_audio_bytes.swap(0, Ordering::Relaxed);
268            let vframes = q.window_video_frames.swap(0, Ordering::Relaxed);
269            q.cur_video_bitrate
270                .store((vbytes as f64 * 8.0 / elapsed) as u64, Ordering::Relaxed);
271            q.cur_audio_bitrate
272                .store((abytes as f64 * 8.0 / elapsed) as u64, Ordering::Relaxed);
273            q.cur_fps_milli.store(
274                (vframes as f64 / elapsed * 1000.0) as u64,
275                Ordering::Relaxed,
276            );
277            q.window_start_ms.store(now, Ordering::Relaxed);
278        }
279    }
280
281    /// A snapshot of measured throughput (bitrate, fps, totals).
282    pub fn qos(&self) -> Qos {
283        let q = &self.qos;
284        Qos {
285            video_bitrate_bps: q.cur_video_bitrate.load(Ordering::Relaxed),
286            audio_bitrate_bps: q.cur_audio_bitrate.load(Ordering::Relaxed),
287            fps: q.cur_fps_milli.load(Ordering::Relaxed) as f64 / 1000.0,
288            total_frames: q.total_frames.load(Ordering::Relaxed),
289            total_bytes: q.total_bytes.load(Ordering::Relaxed),
290        }
291    }
292
293    /// Unix-ms timestamp of the most recently published frame (or stream
294    /// creation if none yet). Used by the engine's idle reaper.
295    pub fn last_frame_ms(&self) -> u64 {
296        self.qos.last_frame_ms.load(Ordering::Relaxed)
297    }
298
299    /// Returns the most recently seen video and audio CONFIG frames,
300    /// for replaying to late-joining subscribers.
301    pub fn cached_configs(&self) -> (Option<Arc<MediaFrame>>, Option<Arc<MediaFrame>>) {
302        let video = (**self.video_config.load()).clone();
303        let audio = (**self.audio_config.load()).clone();
304        (video, audio)
305    }
306
307    /// The frames a late joiner should be handed before going live: cached
308    /// decoder configs followed by the current GOP (keyframe + trailing deltas).
309    ///
310    /// Replaying these lets a new subscriber start decoding immediately rather
311    /// than waiting for the next keyframe — sub-second join times at scale.
312    /// Requires the app to have enabled a GOP cache; otherwise only the cached
313    /// configs are returned.
314    pub fn replay_buffer(&self) -> Vec<Arc<MediaFrame>> {
315        let (vcfg, acfg) = self.cached_configs();
316        let mut out = Vec::new();
317        out.extend(vcfg);
318        out.extend(acfg);
319        if self.gop_capacity > 0 {
320            if let Ok(g) = self.gop.lock() {
321                for f in &g.frames {
322                    // Avoid duplicating a config frame already pushed above.
323                    if !out.iter().any(|c| Arc::ptr_eq(c, f)) {
324                        out.push(Arc::clone(f));
325                    }
326                }
327            }
328        }
329        out
330    }
331
332    /// Subscribe to this stream's frame bus.
333    ///
334    /// The returned raw [`broadcast::Receiver`] surfaces [`RecvError::Lagged`]
335    /// when a slow consumer falls behind the channel capacity — callers that
336    /// `while let Ok(_) = rx.recv().await` will silently terminate on the first
337    /// lag. Prefer [`subscribe_resilient`](Self::subscribe_resilient) unless you
338    /// are deliberately handling lag yourself.
339    pub fn subscribe(&self) -> broadcast::Receiver<Arc<MediaFrame>> {
340        self.tx.subscribe()
341    }
342
343    /// Subscribe with a [`Subscription`] that resynchronizes after lag instead
344    /// of terminating, reporting each gap to the installed [`Observer`] via
345    /// [`Observer::on_subscriber_lagged`].
346    pub fn subscribe_resilient(&self) -> Subscription {
347        Subscription {
348            rx: self.tx.subscribe(),
349            key: self.key.clone(),
350            observer: Arc::clone(&self.observer),
351            max_lag: None,
352            skipped: 0,
353        }
354    }
355
356    /// Number of active subscribers.
357    pub fn subscriber_count(&self) -> usize {
358        self.tx.receiver_count()
359    }
360
361    /// Transition to a new state.
362    pub async fn set_state(&self, state: StreamState) {
363        let mut guard = self.state.write().await;
364        *guard = state;
365    }
366
367    /// The current lifecycle state.
368    pub async fn current_state(&self) -> StreamState {
369        self.state.read().await.clone()
370    }
371
372    /// A consistent point-in-time copy of this stream's [`StreamMetadata`], with
373    /// the live measured `fps`/bitrate overlaid from [`qos`](Self::qos).
374    ///
375    /// Cloning the snapshot releases the lock immediately, so callers never hold
376    /// the metadata `RwLock` across an `.await`.
377    pub async fn metadata_snapshot(&self) -> StreamMetadata {
378        let mut m = self.metadata.read().await.clone();
379        let q = self.qos();
380        m.video_bitrate_bps = q.video_bitrate_bps;
381        m.audio_bitrate_bps = q.audio_bitrate_bps;
382        if q.fps > 0.0 {
383            m.fps = q.fps;
384        }
385        m
386    }
387
388    /// Mutate this stream's [`StreamMetadata`] under the write lock.
389    ///
390    /// Ingest handlers call this as they parse the stream — e.g. on the first
391    /// keyframe to record resolution from the codec config, or to set the
392    /// publisher address — so the metadata exposed to operators and the control
393    /// plane stays live rather than frozen at its zeroed defaults.
394    ///
395    /// ```no_run
396    /// # use arcly_stream::StreamHandle;
397    /// # async fn demo(handle: &StreamHandle, addr: std::net::SocketAddr) {
398    /// handle
399    ///     .update_metadata(|m| {
400    ///         m.publisher_addr = Some(addr);
401    ///         m.width = 1920;
402    ///         m.height = 1080;
403    ///         m.ingest_protocol = "rtmp".to_string();
404    ///     })
405    ///     .await;
406    /// # }
407    /// ```
408    pub async fn update_metadata(&self, f: impl FnOnce(&mut StreamMetadata)) {
409        let mut guard = self.metadata.write().await;
410        f(&mut guard);
411    }
412}
413
414impl std::fmt::Debug for StreamHandle {
415    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
416        f.debug_struct("StreamHandle")
417            .field("key", &self.key)
418            .field("subscribers", &self.subscriber_count())
419            .finish()
420    }
421}
422
423/// A lag-tolerant subscription to a stream's frame bus.
424///
425/// Returned by [`StreamHandle::subscribe_resilient`]. Unlike a raw
426/// [`broadcast::Receiver`], [`recv`](Self::recv) does not terminate when the
427/// consumer falls behind: the dropped span is reported to the [`Observer`] as
428/// [`on_subscriber_lagged`](Observer::on_subscriber_lagged) and reception
429/// continues from the oldest still-buffered frame. This is the recommended
430/// consumer loop for packagers, recorders, and SFUs.
431///
432/// An optional [`max_lag`](Self::max_lag) bound turns chronic lag into
433/// eviction: once cumulative dropped frames exceed the bound, `recv` returns
434/// `None` (after an [`on_subscriber_evicted`](Observer::on_subscriber_evicted)
435/// notification) so a hopelessly slow consumer is shed rather than wasting
436/// buffer churn forever.
437pub struct Subscription {
438    rx: broadcast::Receiver<Arc<MediaFrame>>,
439    key: StreamKey,
440    observer: Arc<dyn Observer>,
441    max_lag: Option<u64>,
442    skipped: u64,
443}
444
445impl Subscription {
446    /// Evict this subscriber once cumulative dropped frames exceed `max`.
447    ///
448    /// ```no_run
449    /// # use arcly_stream::StreamHandle;
450    /// # fn demo(handle: &StreamHandle) {
451    /// let sub = handle.subscribe_resilient().max_lag(10_000);
452    /// # let _ = sub;
453    /// # }
454    /// ```
455    pub fn max_lag(mut self, max: u64) -> Self {
456        self.max_lag = Some(max);
457        self
458    }
459
460    /// Total frames dropped from this subscriber's view so far.
461    pub fn dropped(&self) -> u64 {
462        self.skipped
463    }
464
465    /// Receive the next frame, resynchronizing past any lag.
466    ///
467    /// Returns `None` when the stream's sender is dropped (the publisher ended)
468    /// or when the `max_lag` eviction threshold is crossed:
469    ///
470    /// ```no_run
471    /// # async fn run(sub: &mut arcly_stream::bus::Subscription) {
472    /// while let Some(frame) = sub.recv().await {
473    ///     // packetize `frame` …
474    /// }
475    /// # }
476    /// ```
477    pub async fn recv(&mut self) -> Option<Arc<MediaFrame>> {
478        loop {
479            match self.rx.recv().await {
480                Ok(frame) => return Some(frame),
481                Err(RecvError::Lagged(skipped)) => {
482                    self.skipped = self.skipped.saturating_add(skipped);
483                    self.observer.on_subscriber_lagged(&self.key, skipped);
484                    if let Some(max) = self.max_lag {
485                        if self.skipped > max {
486                            self.observer.on_subscriber_evicted(&self.key);
487                            return None;
488                        }
489                    }
490                    continue;
491                }
492                Err(RecvError::Closed) => return None,
493            }
494        }
495    }
496
497    /// Borrow the underlying raw receiver, for callers that need
498    /// [`broadcast::Receiver`] APIs directly.
499    pub fn raw(&mut self) -> &mut broadcast::Receiver<Arc<MediaFrame>> {
500        &mut self.rx
501    }
502}