arcly_stream/bus/handle.rs
1//! The live stream handle — a lock-free, zero-copy broadcast fan-out bus.
2
3use crate::observe::{NoopObserver, Observer};
4use crate::{frame::FrameFlags, AppName, MediaFrame, StreamId, StreamKey};
5use arc_swap::ArcSwap;
6use std::net::SocketAddr;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::{Arc, Mutex as StdMutex};
9use std::time::{SystemTime, UNIX_EPOCH};
10use tokio::sync::broadcast::error::RecvError;
11use tokio::sync::{broadcast, RwLock};
12
13/// Current wall-clock time in Unix milliseconds (saturating to 0 pre-epoch).
14pub(crate) fn now_ms() -> u64 {
15 SystemTime::now()
16 .duration_since(UNIX_EPOCH)
17 .map(|d| d.as_millis() as u64)
18 .unwrap_or(0)
19}
20
21/// Current lifecycle state of a stream.
22#[derive(Debug, Clone, PartialEq, Eq)]
23pub enum StreamState {
24 /// No publisher yet.
25 Idle,
26 /// A publisher is connected and sending data.
27 Publishing,
28 /// The stream is being transcoded into one or more renditions.
29 Transcoding,
30 /// The stream is being recorded.
31 Recording,
32 /// The publisher has disconnected; the stream has ended.
33 Ended,
34}
35
36impl std::fmt::Display for StreamState {
37 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38 f.write_str(match self {
39 StreamState::Idle => "idle",
40 StreamState::Publishing => "publishing",
41 StreamState::Transcoding => "transcoding",
42 StreamState::Recording => "recording",
43 StreamState::Ended => "ended",
44 })
45 }
46}
47
48/// Runtime metadata about a stream, updated continuously while publishing.
49///
50/// Resolution and the ingest protocol are set by the protocol handler (e.g. via
51/// [`StreamHandle::update_metadata`] after parsing the codec config), while the
52/// `fps` and `*_bitrate_bps` fields are overlaid live from measured throughput
53/// by [`StreamHandle::metadata_snapshot`].
54#[derive(Debug, Clone)]
55pub struct StreamMetadata {
56 /// The `(app, stream_id)` this metadata describes.
57 pub key: StreamKey,
58 /// Publisher remote address.
59 pub publisher_addr: Option<SocketAddr>,
60 /// Video width in pixels (0 = unknown).
61 pub width: u32,
62 /// Video height in pixels (0 = unknown).
63 pub height: u32,
64 /// Video frames per second (0 = unknown). Overlaid from measured throughput.
65 pub fps: f64,
66 /// Measured video ingest bitrate in bits-per-second.
67 pub video_bitrate_bps: u64,
68 /// Measured audio ingest bitrate in bits-per-second.
69 pub audio_bitrate_bps: u64,
70 /// Timestamp of the first frame received (Unix ms).
71 pub started_at_ms: u64,
72 /// Protocol used for ingest (e.g. `"rtmp"`).
73 pub ingest_protocol: String,
74}
75
76impl StreamMetadata {
77 /// Create zeroed metadata for `(app, stream_id)`.
78 pub fn new(app: AppName, stream_id: StreamId) -> Self {
79 Self {
80 key: StreamKey::new(app, stream_id),
81 publisher_addr: None,
82 width: 0,
83 height: 0,
84 fps: 0.0,
85 video_bitrate_bps: 0,
86 audio_bitrate_bps: 0,
87 started_at_ms: 0,
88 ingest_protocol: String::new(),
89 }
90 }
91}
92
93/// A point-in-time snapshot of a stream's measured quality of service.
94#[derive(Debug, Clone, Copy, Default)]
95pub struct Qos {
96 /// Video bitrate over the last ~1s window (bits/sec).
97 pub video_bitrate_bps: u64,
98 /// Audio bitrate over the last ~1s window (bits/sec).
99 pub audio_bitrate_bps: u64,
100 /// Video frames per second over the last ~1s window.
101 pub fps: f64,
102 /// Cumulative frames published on this stream.
103 pub total_frames: u64,
104 /// Cumulative payload bytes published on this stream.
105 pub total_bytes: u64,
106}
107
108/// Lock-free throughput counters folded into [`Qos`] / [`StreamMetadata`].
109///
110/// A stream has a single publisher (enforced by `start_publish`), so the
111/// read-modify-write here is effectively single-writer and `Relaxed` is sound.
112#[derive(Default)]
113struct QosCounters {
114 total_frames: AtomicU64,
115 total_bytes: AtomicU64,
116 window_start_ms: AtomicU64,
117 window_video_bytes: AtomicU64,
118 window_audio_bytes: AtomicU64,
119 window_video_frames: AtomicU64,
120 cur_video_bitrate: AtomicU64,
121 cur_audio_bitrate: AtomicU64,
122 cur_fps_milli: AtomicU64, // fps × 1000, integer-encoded
123 last_frame_ms: AtomicU64,
124}
125
126/// Keyframe-anchored replay buffer for instant playback start.
127struct GopBuffer {
128 /// Frames since (and including) the most recent keyframe.
129 frames: Vec<Arc<MediaFrame>>,
130 /// Hard cap on buffered frames (memory bound between keyframes).
131 capacity: usize,
132}
133
134/// A live handle to a single active stream.
135///
136/// Multiple subscribers (HLS packager, DASH packager, WebRTC SFU, recorders …)
137/// call [`StreamHandle::subscribe_resilient`] to receive every [`MediaFrame`]
138/// cheaply via a `broadcast` channel (zero-copy `Bytes` cloning).
139///
140/// Each broadcast slot holds one `Arc<MediaFrame>` pointer (8 bytes), so e.g.
141/// 4096 slots ≈ 32 KB per stream.
142#[derive(Clone)]
143pub struct StreamHandle {
144 metadata: Arc<RwLock<StreamMetadata>>,
145 state: Arc<RwLock<StreamState>>,
146 key: StreamKey,
147 tx: broadcast::Sender<Arc<MediaFrame>>,
148 /// Latest video CONFIG (AVCDecoderConfigurationRecord) frame, if seen.
149 /// Uses `ArcSwap` for lock-free reads from multiple subscriber tasks.
150 video_config: Arc<ArcSwap<Option<Arc<MediaFrame>>>>,
151 /// Latest audio CONFIG (AudioSpecificConfig) frame, if seen.
152 audio_config: Arc<ArcSwap<Option<Arc<MediaFrame>>>>,
153 /// Rolling GOP buffer for instant-start (empty when `gop_capacity == 0`).
154 gop: Arc<StdMutex<GopBuffer>>,
155 gop_capacity: usize,
156 /// Live throughput counters.
157 qos: Arc<QosCounters>,
158 /// Injected telemetry hook (no-op by default).
159 observer: Arc<dyn Observer>,
160}
161
162impl StreamHandle {
163 /// Create a handle with the no-op observer and no GOP cache.
164 pub fn new(app: AppName, stream_id: StreamId, capacity: usize) -> Self {
165 Self::with_observer(app, stream_id, capacity, 0, Arc::new(NoopObserver))
166 }
167
168 /// Create a handle wired to a host-supplied observer.
169 ///
170 /// `gop_capacity` bounds the keyframe-anchored replay buffer (0 disables it).
171 pub fn with_observer(
172 app: AppName,
173 stream_id: StreamId,
174 capacity: usize,
175 gop_capacity: usize,
176 observer: Arc<dyn Observer>,
177 ) -> Self {
178 let (tx, _) = broadcast::channel(capacity);
179 let qos = QosCounters::default();
180 // Treat creation as the last activity so a just-claimed stream is not
181 // instantly considered idle before its first frame arrives.
182 qos.last_frame_ms.store(now_ms(), Ordering::Relaxed);
183 Self {
184 metadata: Arc::new(RwLock::new(StreamMetadata::new(
185 app.clone(),
186 stream_id.clone(),
187 ))),
188 state: Arc::new(RwLock::new(StreamState::Idle)),
189 key: StreamKey::new(app, stream_id),
190 tx,
191 video_config: Arc::new(ArcSwap::new(Arc::new(None))),
192 audio_config: Arc::new(ArcSwap::new(Arc::new(None))),
193 gop: Arc::new(StdMutex::new(GopBuffer {
194 frames: Vec::new(),
195 capacity: gop_capacity,
196 })),
197 gop_capacity,
198 qos: Arc::new(qos),
199 observer,
200 }
201 }
202
203 /// The `(app, stream_id)` this handle belongs to.
204 pub fn key(&self) -> &StreamKey {
205 &self.key
206 }
207
208 /// Publish a frame to all current subscribers. Returns the number of
209 /// active receivers; returns `Ok(0)` when there are no subscribers.
210 pub fn publish_frame(&self, frame: MediaFrame) -> crate::Result<usize> {
211 self.observer.on_frame(&self.key, &frame);
212 let len = frame.data.len() as u64;
213 let is_audio = frame.is_audio();
214 let is_key = frame.is_keyframe();
215 let is_config = frame.flags.contains(FrameFlags::CONFIG);
216 let arc = Arc::new(frame);
217
218 // Cache the latest CONFIG frame for late-joining subscribers.
219 if is_config {
220 if is_audio {
221 self.audio_config.store(Arc::new(Some(Arc::clone(&arc))));
222 } else {
223 self.video_config.store(Arc::new(Some(Arc::clone(&arc))));
224 }
225 }
226
227 // Maintain the keyframe-anchored GOP replay buffer.
228 if self.gop_capacity > 0 {
229 if let Ok(mut g) = self.gop.lock() {
230 if is_key {
231 g.frames.clear();
232 g.frames.push(Arc::clone(&arc));
233 } else if !is_config && !g.frames.is_empty() && g.frames.len() < g.capacity {
234 // Only buffer once a keyframe anchors the GOP; CONFIG frames
235 // are replayed separately via `cached_configs`.
236 g.frames.push(Arc::clone(&arc));
237 }
238 }
239 }
240
241 self.record_qos(len, is_audio, is_key);
242
243 let count = self.tx.send(arc).unwrap_or(0);
244 Ok(count)
245 }
246
247 /// Fold one frame into the rolling throughput window.
248 fn record_qos(&self, len: u64, is_audio: bool, _is_key: bool) {
249 let q = &self.qos;
250 let now = now_ms();
251 q.total_frames.fetch_add(1, Ordering::Relaxed);
252 q.total_bytes.fetch_add(len, Ordering::Relaxed);
253 q.last_frame_ms.store(now, Ordering::Relaxed);
254 if is_audio {
255 q.window_audio_bytes.fetch_add(len, Ordering::Relaxed);
256 } else {
257 q.window_video_bytes.fetch_add(len, Ordering::Relaxed);
258 q.window_video_frames.fetch_add(1, Ordering::Relaxed);
259 }
260
261 let ws = q.window_start_ms.load(Ordering::Relaxed);
262 if ws == 0 {
263 q.window_start_ms.store(now, Ordering::Relaxed);
264 } else if now.saturating_sub(ws) >= 1000 {
265 let elapsed = (now - ws) as f64 / 1000.0;
266 let vbytes = q.window_video_bytes.swap(0, Ordering::Relaxed);
267 let abytes = q.window_audio_bytes.swap(0, Ordering::Relaxed);
268 let vframes = q.window_video_frames.swap(0, Ordering::Relaxed);
269 q.cur_video_bitrate
270 .store((vbytes as f64 * 8.0 / elapsed) as u64, Ordering::Relaxed);
271 q.cur_audio_bitrate
272 .store((abytes as f64 * 8.0 / elapsed) as u64, Ordering::Relaxed);
273 q.cur_fps_milli.store(
274 (vframes as f64 / elapsed * 1000.0) as u64,
275 Ordering::Relaxed,
276 );
277 q.window_start_ms.store(now, Ordering::Relaxed);
278 }
279 }
280
281 /// A snapshot of measured throughput (bitrate, fps, totals).
282 pub fn qos(&self) -> Qos {
283 let q = &self.qos;
284 Qos {
285 video_bitrate_bps: q.cur_video_bitrate.load(Ordering::Relaxed),
286 audio_bitrate_bps: q.cur_audio_bitrate.load(Ordering::Relaxed),
287 fps: q.cur_fps_milli.load(Ordering::Relaxed) as f64 / 1000.0,
288 total_frames: q.total_frames.load(Ordering::Relaxed),
289 total_bytes: q.total_bytes.load(Ordering::Relaxed),
290 }
291 }
292
293 /// Unix-ms timestamp of the most recently published frame (or stream
294 /// creation if none yet). Used by the engine's idle reaper.
295 pub fn last_frame_ms(&self) -> u64 {
296 self.qos.last_frame_ms.load(Ordering::Relaxed)
297 }
298
299 /// Returns the most recently seen video and audio CONFIG frames,
300 /// for replaying to late-joining subscribers.
301 pub fn cached_configs(&self) -> (Option<Arc<MediaFrame>>, Option<Arc<MediaFrame>>) {
302 let video = (**self.video_config.load()).clone();
303 let audio = (**self.audio_config.load()).clone();
304 (video, audio)
305 }
306
307 /// The frames a late joiner should be handed before going live: cached
308 /// decoder configs followed by the current GOP (keyframe + trailing deltas).
309 ///
310 /// Replaying these lets a new subscriber start decoding immediately rather
311 /// than waiting for the next keyframe — sub-second join times at scale.
312 /// Requires the app to have enabled a GOP cache; otherwise only the cached
313 /// configs are returned.
314 pub fn replay_buffer(&self) -> Vec<Arc<MediaFrame>> {
315 let (vcfg, acfg) = self.cached_configs();
316 let mut out = Vec::new();
317 out.extend(vcfg);
318 out.extend(acfg);
319 if self.gop_capacity > 0 {
320 if let Ok(g) = self.gop.lock() {
321 for f in &g.frames {
322 // Avoid duplicating a config frame already pushed above.
323 if !out.iter().any(|c| Arc::ptr_eq(c, f)) {
324 out.push(Arc::clone(f));
325 }
326 }
327 }
328 }
329 out
330 }
331
332 /// Subscribe to this stream's frame bus.
333 ///
334 /// The returned raw [`broadcast::Receiver`] surfaces [`RecvError::Lagged`]
335 /// when a slow consumer falls behind the channel capacity — callers that
336 /// `while let Ok(_) = rx.recv().await` will silently terminate on the first
337 /// lag. Prefer [`subscribe_resilient`](Self::subscribe_resilient) unless you
338 /// are deliberately handling lag yourself.
339 pub fn subscribe(&self) -> broadcast::Receiver<Arc<MediaFrame>> {
340 self.tx.subscribe()
341 }
342
343 /// Subscribe with a [`Subscription`] that resynchronizes after lag instead
344 /// of terminating, reporting each gap to the installed [`Observer`] via
345 /// [`Observer::on_subscriber_lagged`].
346 pub fn subscribe_resilient(&self) -> Subscription {
347 Subscription {
348 rx: self.tx.subscribe(),
349 key: self.key.clone(),
350 observer: Arc::clone(&self.observer),
351 max_lag: None,
352 skipped: 0,
353 }
354 }
355
356 /// Number of active subscribers.
357 pub fn subscriber_count(&self) -> usize {
358 self.tx.receiver_count()
359 }
360
361 /// Transition to a new state.
362 pub async fn set_state(&self, state: StreamState) {
363 let mut guard = self.state.write().await;
364 *guard = state;
365 }
366
367 /// The current lifecycle state.
368 pub async fn current_state(&self) -> StreamState {
369 self.state.read().await.clone()
370 }
371
372 /// A consistent point-in-time copy of this stream's [`StreamMetadata`], with
373 /// the live measured `fps`/bitrate overlaid from [`qos`](Self::qos).
374 ///
375 /// Cloning the snapshot releases the lock immediately, so callers never hold
376 /// the metadata `RwLock` across an `.await`.
377 pub async fn metadata_snapshot(&self) -> StreamMetadata {
378 let mut m = self.metadata.read().await.clone();
379 let q = self.qos();
380 m.video_bitrate_bps = q.video_bitrate_bps;
381 m.audio_bitrate_bps = q.audio_bitrate_bps;
382 if q.fps > 0.0 {
383 m.fps = q.fps;
384 }
385 m
386 }
387
388 /// Mutate this stream's [`StreamMetadata`] under the write lock.
389 ///
390 /// Ingest handlers call this as they parse the stream — e.g. on the first
391 /// keyframe to record resolution from the codec config, or to set the
392 /// publisher address — so the metadata exposed to operators and the control
393 /// plane stays live rather than frozen at its zeroed defaults.
394 ///
395 /// ```no_run
396 /// # use arcly_stream::StreamHandle;
397 /// # async fn demo(handle: &StreamHandle, addr: std::net::SocketAddr) {
398 /// handle
399 /// .update_metadata(|m| {
400 /// m.publisher_addr = Some(addr);
401 /// m.width = 1920;
402 /// m.height = 1080;
403 /// m.ingest_protocol = "rtmp".to_string();
404 /// })
405 /// .await;
406 /// # }
407 /// ```
408 pub async fn update_metadata(&self, f: impl FnOnce(&mut StreamMetadata)) {
409 let mut guard = self.metadata.write().await;
410 f(&mut guard);
411 }
412}
413
414impl std::fmt::Debug for StreamHandle {
415 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
416 f.debug_struct("StreamHandle")
417 .field("key", &self.key)
418 .field("subscribers", &self.subscriber_count())
419 .finish()
420 }
421}
422
423/// A lag-tolerant subscription to a stream's frame bus.
424///
425/// Returned by [`StreamHandle::subscribe_resilient`]. Unlike a raw
426/// [`broadcast::Receiver`], [`recv`](Self::recv) does not terminate when the
427/// consumer falls behind: the dropped span is reported to the [`Observer`] as
428/// [`on_subscriber_lagged`](Observer::on_subscriber_lagged) and reception
429/// continues from the oldest still-buffered frame. This is the recommended
430/// consumer loop for packagers, recorders, and SFUs.
431///
432/// An optional [`max_lag`](Self::max_lag) bound turns chronic lag into
433/// eviction: once cumulative dropped frames exceed the bound, `recv` returns
434/// `None` (after an [`on_subscriber_evicted`](Observer::on_subscriber_evicted)
435/// notification) so a hopelessly slow consumer is shed rather than wasting
436/// buffer churn forever.
437pub struct Subscription {
438 rx: broadcast::Receiver<Arc<MediaFrame>>,
439 key: StreamKey,
440 observer: Arc<dyn Observer>,
441 max_lag: Option<u64>,
442 skipped: u64,
443}
444
445impl Subscription {
446 /// Evict this subscriber once cumulative dropped frames exceed `max`.
447 ///
448 /// ```no_run
449 /// # use arcly_stream::StreamHandle;
450 /// # fn demo(handle: &StreamHandle) {
451 /// let sub = handle.subscribe_resilient().max_lag(10_000);
452 /// # let _ = sub;
453 /// # }
454 /// ```
455 pub fn max_lag(mut self, max: u64) -> Self {
456 self.max_lag = Some(max);
457 self
458 }
459
460 /// Total frames dropped from this subscriber's view so far.
461 pub fn dropped(&self) -> u64 {
462 self.skipped
463 }
464
465 /// Receive the next frame, resynchronizing past any lag.
466 ///
467 /// Returns `None` when the stream's sender is dropped (the publisher ended)
468 /// or when the `max_lag` eviction threshold is crossed:
469 ///
470 /// ```no_run
471 /// # async fn run(sub: &mut arcly_stream::bus::Subscription) {
472 /// while let Some(frame) = sub.recv().await {
473 /// // packetize `frame` …
474 /// }
475 /// # }
476 /// ```
477 pub async fn recv(&mut self) -> Option<Arc<MediaFrame>> {
478 loop {
479 match self.rx.recv().await {
480 Ok(frame) => return Some(frame),
481 Err(RecvError::Lagged(skipped)) => {
482 self.skipped = self.skipped.saturating_add(skipped);
483 self.observer.on_subscriber_lagged(&self.key, skipped);
484 if let Some(max) = self.max_lag {
485 if self.skipped > max {
486 self.observer.on_subscriber_evicted(&self.key);
487 return None;
488 }
489 }
490 continue;
491 }
492 Err(RecvError::Closed) => return None,
493 }
494 }
495 }
496
497 /// Borrow the underlying raw receiver, for callers that need
498 /// [`broadcast::Receiver`] APIs directly.
499 pub fn raw(&mut self) -> &mut broadcast::Receiver<Arc<MediaFrame>> {
500 &mut self.rx
501 }
502}