pub struct StreamHandle { /* private fields */ }Expand description
A live handle to a single active stream.
Multiple subscribers (HLS packager, DASH packager, WebRTC SFU, recorders …)
call StreamHandle::subscribe_resilient to receive every MediaFrame
cheaply via a broadcast channel (zero-copy Bytes cloning).
Each broadcast slot holds one Arc<MediaFrame> pointer (8 bytes), so e.g.
4096 slots ≈ 32 KB per stream.
§Backpressure model
Fan-out uses a single, fixed-capacity ring buffer per stream (a
tokio::broadcast channel sized by AppSpec::broadcast_capacity). This is a
deliberate design choice, with consequences worth understanding:
- The publisher never blocks on a slow subscriber. Publishing is a non-awaiting pointer write; one subscriber falling behind can never apply backpressure to the publisher or to its peers. This is what keeps the hot path lock-free and the fast publisher isolated from the slow viewer.
- Backpressure is resolved by dropping, not stalling. A subscriber that
can’t keep up overruns the ring and observes lag.
Subscription::recvresynchronizes to the oldest still-buffered frame and reports the gap viaObserver::on_subscriber_lagged; withSubscription::max_laga chronically slow consumer is evicted (Observer::on_subscriber_evicted) rather than churning forever. - Capacity is the tuning knob, traded per stream: larger capacity tolerates burstier consumers at higher per-stream memory, smaller capacity sheds laggards sooner. There is intentionally no per-subscriber queue — that would reintroduce unbounded memory growth and per-consumer locking, the very things this design avoids.
In short: a slow subscriber degrades only its own view (lag, then
eviction), never the publisher’s or another subscriber’s. Wire an
Observer to see lag and eviction as they happen.
Implementations§
Source§impl StreamHandle
impl StreamHandle
Sourcepub fn new(app: AppName, stream_id: StreamId, capacity: usize) -> Self
pub fn new(app: AppName, stream_id: StreamId, capacity: usize) -> Self
Create a handle with the no-op observer and no GOP cache.
Sourcepub fn with_observer(
app: AppName,
stream_id: StreamId,
capacity: usize,
gop_capacity: usize,
observer: Arc<dyn Observer>,
) -> Self
pub fn with_observer( app: AppName, stream_id: StreamId, capacity: usize, gop_capacity: usize, observer: Arc<dyn Observer>, ) -> Self
Create a handle wired to a host-supplied observer.
gop_capacity bounds the keyframe-anchored replay buffer (0 disables it).
Sourcepub fn publish_frame(&self, frame: MediaFrame) -> Result<usize>
pub fn publish_frame(&self, frame: MediaFrame) -> Result<usize>
Publish a frame to all current subscribers. Returns the number of
active receivers; returns Ok(0) when there are no subscribers.
Sourcepub fn last_frame_ms(&self) -> u64
pub fn last_frame_ms(&self) -> u64
Monotonic-clock timestamp (process-local milliseconds) of the most recently published frame (or stream creation if none yet). Used by the engine’s idle reaper; this is elapsed monotonic time, not wall-clock time, so compare it only against other readings of the same monotonic clock.
Sourcepub fn cached_configs(
&self,
) -> (Option<Arc<MediaFrame>>, Option<Arc<MediaFrame>>)
pub fn cached_configs( &self, ) -> (Option<Arc<MediaFrame>>, Option<Arc<MediaFrame>>)
Returns the most recently seen video and audio CONFIG frames, for replaying to late-joining subscribers.
Sourcepub fn replay_buffer(&self) -> Vec<Arc<MediaFrame>>
pub fn replay_buffer(&self) -> Vec<Arc<MediaFrame>>
The frames a late joiner should be handed before going live: cached decoder configs followed by the current GOP (keyframe + trailing deltas).
Replaying these lets a new subscriber start decoding immediately rather than waiting for the next keyframe — sub-second join times at scale. Requires the app to have enabled a GOP cache; otherwise only the cached configs are returned.
Sourcepub fn subscribe(&self) -> Receiver<Arc<MediaFrame>>
pub fn subscribe(&self) -> Receiver<Arc<MediaFrame>>
Subscribe to this stream’s frame bus.
The returned raw broadcast::Receiver surfaces RecvError::Lagged
when a slow consumer falls behind the channel capacity — callers that
while let Ok(_) = rx.recv().await will silently terminate on the first
lag. Prefer subscribe_resilient unless you
are deliberately handling lag yourself.
Sourcepub fn subscribe_resilient(&self) -> Subscription
pub fn subscribe_resilient(&self) -> Subscription
Subscribe with a Subscription that resynchronizes after lag instead
of terminating, reporting each gap to the installed Observer via
Observer::on_subscriber_lagged.
Sourcepub fn subscriber_count(&self) -> usize
pub fn subscriber_count(&self) -> usize
Number of active subscribers (0 once the stream is closed).
Sourcepub fn close(&self)
pub fn close(&self)
Close the frame bus: drop the sole sender so every subscriber’s recv
observes Closed and terminates, regardless of how many StreamHandle
clones are still alive.
Called by the registry when a publish ends (see
Application::end_publish). Idempotent. This is what makes the channel’s
lifetime track the stream’s lifecycle rather than handle reachability.
Sourcepub async fn set_state(&self, state: StreamState)
pub async fn set_state(&self, state: StreamState)
Transition to a new state.
Sourcepub async fn current_state(&self) -> StreamState
pub async fn current_state(&self) -> StreamState
The current lifecycle state.
Sourcepub async fn metadata_snapshot(&self) -> StreamMetadata
pub async fn metadata_snapshot(&self) -> StreamMetadata
A consistent point-in-time copy of this stream’s StreamMetadata, with
the live measured fps/bitrate overlaid from qos.
Cloning the snapshot releases the lock immediately, so callers never hold
the metadata RwLock across an .await.
Sourcepub async fn update_metadata(&self, f: impl FnOnce(&mut StreamMetadata))
pub async fn update_metadata(&self, f: impl FnOnce(&mut StreamMetadata))
Mutate this stream’s StreamMetadata under the write lock.
Ingest handlers call this as they parse the stream — e.g. on the first keyframe to record resolution from the codec config, or to set the publisher address — so the metadata exposed to operators and the control plane stays live rather than frozen at its zeroed defaults.
handle
.update_metadata(|m| {
m.publisher_addr = Some(addr);
m.width = 1920;
m.height = 1080;
m.ingest_protocol = "rtmp".to_string();
})
.await;Trait Implementations§
Source§impl Clone for StreamHandle
impl Clone for StreamHandle
Source§fn clone(&self) -> StreamHandle
fn clone(&self) -> StreamHandle
1.0.0 (const: unstable) · Source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
source. Read more