Skip to main content

oxpulse_sfu_kit/
propagate.rs

1//! Cross-client propagated events.
2//!
3//! Only events that fan out between clients live here. Outbound UDP
4//! `Transmit`s are held on the [`Client`][crate::Client] and drained by the
5//! registry — they never propagate.
6//!
7//! Ported from [`str0m/examples/chat.rs`](https://github.com/algesten/str0m/blob/0.18.0/examples/chat.rs).
8
9use std::ops::Deref;
10use std::sync::Weak;
11use std::time::Instant;
12
13use crate::bandwidth::BandwidthEstimate;
14use crate::client::TrackIn;
15use crate::ids::{SfuMid, SfuRid};
16use crate::keyframe::SfuKeyframeRequest;
17use crate::media::SfuMediaPayload;
18use crate::rtcp_stats::PeerRtcpStats;
19
/// Process-local identifier of a connected peer, monotonically increasing.
///
/// A thin newtype over the `u64` counter value assigned when a `Client` is
/// constructed. It implements [`Deref`] to `u64` so that it can be compared
/// directly against the bare `u64` peer IDs used by the speaker-detection API.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ClientId(pub u64);

impl Deref for ClientId {
    type Target = u64;

    /// Borrows the wrapped counter value.
    fn deref(&self) -> &u64 {
        let ClientId(raw) = self;
        raw
    }
}
34
/// Events the registry propagates between clients.
///
/// `Noop` and `Timeout` are consumed inside the registry's poll loop and never
/// reach individual clients. The remaining variants fan out to every
/// non-origin peer unless the variant's own documentation says otherwise (for
/// example, [`KeyframeRequest`][Self::KeyframeRequest] is routed only to its
/// source client).
// NOTE(review): the lint is silenced instead of boxing the larger variants.
// The reason is not recorded here — presumably to avoid a heap allocation per
// event; confirm.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
#[non_exhaustive]
pub enum Propagated {
    /// Nothing to do — returned by [`Client::poll_output`][crate::Client::poll_output]
    /// when str0m produced only outbound datagrams (queued on the client).
    Noop,

    /// The client's poll returned this as its next wake-up deadline.
    Timeout(Instant),

    /// A new incoming track is open on the originating client and should be
    /// advertised to every other client.
    ///
    /// Fields: `(origin, track)`; the track is held as a [`Weak`] handle.
    TrackOpen(ClientId, Weak<TrackIn>),

    /// Media payload received by the originating client, to be forwarded to
    /// every other client (subject to the per-subscriber simulcast layer filter).
    ///
    /// Fields: `(origin, payload)`.
    MediaData(ClientId, SfuMediaPayload),

    /// A keyframe request that must reach the source of the outgoing track.
    ///
    /// Fields: `(origin_of_request, request, source_client, source_mid)`.
    /// The fanout dispatcher routes this only to the `source_client`.
    KeyframeRequest(ClientId, SfuKeyframeRequest, ClientId, SfuMid),

    /// A keyframe request that must be forwarded upstream to the origin SFU.
    ///
    /// Emitted instead of [`KeyframeRequest`][Self::KeyframeRequest] when a
    /// subscriber requests a keyframe for a track whose publisher is a relay
    /// client (`ClientOrigin::RelayFromSfu`). The application must relay this
    /// request to the upstream SFU via its signalling channel — the kit cannot
    /// send PLI/FIR to a relay peer that has no inbound WebRTC negotiation for
    /// that direction.
    UpstreamKeyframeRequest {
        /// The relay client whose upstream track needs a keyframe.
        source_relay_id: ClientId,
        /// The keyframe request (PLI or FIR).
        req: SfuKeyframeRequest,
        /// The track MID on the relay client.
        source_mid: SfuMid,
    },

    /// Dominant-speaker election changed.
    ///
    /// Emitted by [`Registry::tick_active_speaker`][crate::Registry::tick_active_speaker]
    /// when the `active-speaker` feature is enabled. The `peer_id` is the newly
    /// dominant peer. Fanout skips the speaker themselves (skip-self rule).
    #[cfg(feature = "active-speaker")]
    #[cfg_attr(docsrs, doc(cfg(feature = "active-speaker")))]
    ActiveSpeakerChanged {
        /// The peer that became the dominant speaker.
        ///
        /// Unlike the other variants this is a bare `u64`, not a [`ClientId`].
        peer_id: u64,
        /// Medium-window log-ratio confidence margin (C2).
        ///
        /// `0.0` means bootstrap election (first speaker in an empty room).
        /// Values above `2.0` indicate a confident, contested win.
        /// Consumers may use this to delay UI updates for low-confidence switches.
        confidence: f64,
    },

    /// Egress bandwidth estimate updated for this peer.
    ///
    /// Emitted from str0m's internal GoogCC each time the estimator produces a new
    /// value (typically every 100–500 ms depending on TWCC traffic). Downstream
    /// should consume this to drive layer selection or pacing decisions.
    BandwidthEstimate {
        /// The peer whose egress estimate changed.
        peer_id: ClientId,
        /// The new estimate.
        estimate: BandwidthEstimate,
    },

    /// Browser-reported bandwidth budget hint, taken from the DataChannel
    /// budget message.
    ///
    /// Carries the peer's self-reported link capacity ceiling. The registry feeds
    /// this into `BandwidthEstimator::record_client_hint` under the `kalman-bwe`
    /// feature. The variant itself is always compiled (same visibility as
    /// [`BandwidthEstimate`][Self::BandwidthEstimate]).
    ClientBudgetHint(
        /// The subscriber reporting their budget.
        ClientId,
        /// Budget ceiling in bits per second.
        u64,
    ),

    /// RTCP-derived stats updated for this peer.
    ///
    /// Derived from str0m's `Event::PeerStats` (emitted ~1 Hz). Contains
    /// loss fraction and RTT; jitter is not available from the per-peer aggregate
    /// event in str0m 0.18 (it requires per-mid `MediaEgressStats`) and is
    /// always `Duration::ZERO` in this release.
    RtcpStats {
        /// The peer whose stats were updated.
        peer_id: ClientId,
        /// The updated stats snapshot.
        stats: PeerRtcpStats,
    },

    /// Subscriber's egress BWE crossed the audio-only threshold.
    ///
    /// When `audio_only = true`, stop forwarding video to this peer.
    /// When `audio_only = false`, resume. Only emitted with the `pacer` feature.
    #[cfg(feature = "pacer")]
    #[cfg_attr(docsrs, doc(cfg(feature = "pacer")))]
    AudioOnlyMode {
        /// The subscriber peer.
        peer_id: ClientId,
        /// `true` = entered audio-only; `false` = video restored.
        audio_only: bool,
    },
    /// Subscriber's egress BWE crossed the suspend-video threshold.
    ///
    /// When `suspended = true`, stop forwarding video to this peer; audio
    /// continues to flow (audio-only is a parent state of suspended). Phase 7
    /// wires the per-client fanout filter that performs the actual frame drop.
    /// When `suspended = false`, lift back to the audio-only continuation;
    /// a subsequent `RestoreVideo` will then lift to LOW.
    /// Only emitted with the `pacer` feature.
    // NOTE(review): no `RestoreVideo` variant exists in this enum — presumably
    // this refers to `AudioOnlyMode { audio_only: false }`; confirm.
    #[cfg(feature = "pacer")]
    #[cfg_attr(docsrs, doc(cfg(feature = "pacer")))]
    SuspendVideo {
        /// The subscriber peer.
        peer_id: ClientId,
        /// `true` = entered suspended sub-state; `false` = audio-only restored.
        suspended: bool,
    },
    /// Hint to the publisher that they may stop encoding layers above `max_rid`.
    ///
    /// Emitted when the maximum desired layer across all subscribers changes.
    /// The application should relay this to the publisher via RTCP or
    /// signalling.
    PublisherLayerHint {
        /// The publisher whose encoding may be reduced.
        publisher_id: ClientId,
        /// Highest simulcast layer any subscriber currently wants.
        max_rid: SfuRid,
    },

    /// Hint to the application that the upstream SFU should stop encoding layers
    /// above `max_rid` for this relay publisher.
    ///
    /// Emitted when the maximum desired layer across all subscribers of a
    /// relay-originated publisher changes. The application must forward this
    /// via its inter-SFU signalling channel.
    PublisherLayerHintForUpstream {
        /// The relay client whose upstream publisher should be signalled.
        publisher_relay_id: ClientId,
        /// Highest simulcast layer any subscriber of this relay currently wants.
        max_rid: SfuRid,
    },

    /// Subscriber capability hint for Opus audio codec redundancy.
    ///
    /// Emit to the application signalling layer to negotiate `red/48000/2` in
    /// the publisher's SDP offer, or relay via a custom data-channel protocol.
    /// The SFU does not inject RED — it is a sender-side concern.
    AudioCodecHint {
        /// The subscriber expressing the preference.
        peer_id: ClientId,
        /// Subscriber can decode Opus RFC 2198 RED (`red/48000/2` in SDP).
        opus_red: bool,
        /// Subscriber can decode Opus DRED (Deep REDundancy — libopus 0.9+).
        // NOTE(review): the "0.9+" version looks off — DRED appears to have
        // shipped with libopus 1.5; confirm and correct the doc line above.
        opus_dred: bool,
    },
}
205
206impl Propagated {
207    /// Which client produced this event, if any.
208    ///
209    /// Used by the registry to skip the originator during fanout. Returns `None`
210    /// for `Noop`, `Timeout`, and `ActiveSpeakerChanged` (the latter uses its
211    /// own `peer_id == *client.id` skip rule).
212    pub fn client_id(&self) -> Option<ClientId> {
213        match self {
214            Propagated::TrackOpen(c, _)
215            | Propagated::MediaData(c, _)
216            | Propagated::KeyframeRequest(c, _, _, _) => Some(*c),
217            Propagated::Noop | Propagated::Timeout(_) => None,
218            #[cfg(feature = "active-speaker")]
219            Propagated::ActiveSpeakerChanged { .. } => None,
220            Propagated::ClientBudgetHint(c, _) => Some(*c),
221            Propagated::BandwidthEstimate { peer_id, .. }
222            | Propagated::RtcpStats { peer_id, .. } => Some(*peer_id),
223            #[cfg(feature = "pacer")]
224            Propagated::AudioOnlyMode { peer_id, .. } => Some(*peer_id),
225            #[cfg(feature = "pacer")]
226            Propagated::SuspendVideo { peer_id, .. } => Some(*peer_id),
227            Propagated::PublisherLayerHint { publisher_id, .. } => Some(*publisher_id),
228            Propagated::PublisherLayerHintForUpstream {
229                publisher_relay_id, ..
230            } => Some(*publisher_relay_id),
231            Propagated::AudioCodecHint { peer_id, .. } => Some(*peer_id),
232            Propagated::UpstreamKeyframeRequest {
233                source_relay_id, ..
234            } => Some(*source_relay_id),
235        }
236    }
237}