Skip to main content

StreamHandle

Struct StreamHandle 

Source
pub struct StreamHandle { /* private fields */ }
Expand description

A live handle to a single active stream.

Multiple subscribers (HLS packager, DASH packager, WebRTC SFU, recorders …) call StreamHandle::subscribe_resilient to receive every MediaFrame cheaply via a broadcast channel (zero-copy Bytes cloning).

Each broadcast slot holds one Arc<MediaFrame> pointer (8 bytes), so e.g. 4096 slots ≈ 32 KB per stream.

§Backpressure model

Fan-out uses a single, fixed-capacity ring buffer per stream (a tokio::broadcast channel sized by AppSpec::broadcast_capacity). This is a deliberate design choice, with consequences worth understanding:

  • The publisher never blocks on a slow subscriber. Publishing is a non-awaiting pointer write; one subscriber falling behind can never apply backpressure to the publisher or to its peers. This is what keeps the hot path lock-free and the fast publisher isolated from the slow viewer.
  • Backpressure is resolved by dropping, not stalling. A subscriber that can’t keep up overruns the ring and observes lag. Subscription::recv resynchronizes to the oldest still-buffered frame and reports the gap via Observer::on_subscriber_lagged; with Subscription::max_lag a chronically slow consumer is evicted (Observer::on_subscriber_evicted) rather than churning forever.
  • Capacity is the tuning knob, traded per stream: larger capacity tolerates burstier consumers at higher per-stream memory, smaller capacity sheds laggards sooner. There is intentionally no per-subscriber queue — that would reintroduce unbounded memory growth and per-consumer locking, the very things this design avoids.

In short: a slow subscriber degrades only its own view (lag, then eviction), never the publisher’s or another subscriber’s. Wire an Observer to see lag and eviction as they happen.

Implementations§

Source§

impl StreamHandle

Source

pub fn new(app: AppName, stream_id: StreamId, capacity: usize) -> Self

Create a handle with the no-op observer and no GOP cache.

Source

pub fn with_observer( app: AppName, stream_id: StreamId, capacity: usize, gop_capacity: usize, observer: Arc<dyn Observer>, ) -> Self

Create a handle wired to a host-supplied observer.

gop_capacity bounds the keyframe-anchored replay buffer (0 disables it).

Source

pub fn key(&self) -> &StreamKey

The (app, stream_id) this handle belongs to.

Source

pub fn publish_frame(&self, frame: MediaFrame) -> Result<usize>

Publish a frame to all current subscribers. Returns the number of active receivers; returns Ok(0) when there are no subscribers.

Source

pub fn qos(&self) -> Qos

A snapshot of measured throughput (bitrate, fps, totals).

Source

pub fn last_frame_ms(&self) -> u64

Monotonic-clock timestamp (process-local milliseconds) of the most recently published frame (or stream creation if none yet). Used by the engine’s idle reaper; this is elapsed monotonic time, not wall-clock time, so compare it only against other readings of the same monotonic clock.

Source

pub fn cached_configs( &self, ) -> (Option<Arc<MediaFrame>>, Option<Arc<MediaFrame>>)

Returns the most recently seen video and audio CONFIG frames, for replaying to late-joining subscribers.

Source

pub fn replay_buffer(&self) -> Vec<Arc<MediaFrame>>

The frames a late joiner should be handed before going live: cached decoder configs followed by the current GOP (keyframe + trailing deltas).

Replaying these lets a new subscriber start decoding immediately rather than waiting for the next keyframe — sub-second join times at scale. Requires the app to have enabled a GOP cache; otherwise only the cached configs are returned.

Source

pub fn subscribe(&self) -> Receiver<Arc<MediaFrame>>

Subscribe to this stream’s frame bus.

The returned raw broadcast::Receiver surfaces RecvError::Lagged when a slow consumer falls behind the channel capacity — callers that while let Ok(_) = rx.recv().await will silently terminate on the first lag. Prefer subscribe_resilient unless you are deliberately handling lag yourself.

Source

pub fn subscribe_resilient(&self) -> Subscription

Subscribe with a Subscription that resynchronizes after lag instead of terminating, reporting each gap to the installed Observer via Observer::on_subscriber_lagged.

Source

pub fn subscriber_count(&self) -> usize

Number of active subscribers (0 once the stream is closed).

Source

pub fn close(&self)

Close the frame bus: drop the sole sender so every subscriber’s recv observes Closed and terminates, regardless of how many StreamHandle clones are still alive.

Called by the registry when a publish ends (see Application::end_publish). Idempotent. This is what makes the channel’s lifetime track the stream’s lifecycle rather than handle reachability.

Source

pub async fn set_state(&self, state: StreamState)

Transition to a new state.

Source

pub async fn current_state(&self) -> StreamState

The current lifecycle state.

Source

pub async fn metadata_snapshot(&self) -> StreamMetadata

A consistent point-in-time copy of this stream’s StreamMetadata, with the live measured fps/bitrate overlaid from qos.

Cloning the snapshot releases the lock immediately, so callers never hold the metadata RwLock across an .await.

Source

pub async fn update_metadata(&self, f: impl FnOnce(&mut StreamMetadata))

Mutate this stream’s StreamMetadata under the write lock.

Ingest handlers call this as they parse the stream — e.g. on the first keyframe to record resolution from the codec config, or to set the publisher address — so the metadata exposed to operators and the control plane stays live rather than frozen at its zeroed defaults.

handle
    .update_metadata(|m| {
        m.publisher_addr = Some(addr);
        m.width = 1920;
        m.height = 1080;
        m.ingest_protocol = "rtmp".to_string();
    })
    .await;

Trait Implementations§

Source§

impl Clone for StreamHandle

Source§

fn clone(&self) -> StreamHandle

Returns a duplicate of the value. Read more
1.0.0 (const: unstable) · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
Source§

impl Debug for StreamHandle

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more