oxpulse_sfu_kit/propagate.rs
1//! Cross-client propagated events.
2//!
3//! Only events that fan out between clients live here. Outbound UDP
4//! `Transmit`s are held on the [`Client`][crate::Client] and drained by the
5//! registry — they never propagate.
6//!
7//! Ported from [`str0m/examples/chat.rs`](https://github.com/algesten/str0m/blob/0.18.0/examples/chat.rs).
8
9use std::ops::Deref;
10use std::sync::Weak;
11use std::time::Instant;
12
13use crate::bandwidth::BandwidthEstimate;
14use crate::client::TrackIn;
15use crate::ids::{SfuMid, SfuRid};
16use crate::keyframe::SfuKeyframeRequest;
17use crate::media::SfuMediaPayload;
18use crate::rtcp_stats::PeerRtcpStats;
19
/// Process-wide, monotonically allocated identifier of a connected peer.
///
/// A thin newtype over the `u64` counter value handed out when a `Client` is
/// constructed. It dereferences to the inner `u64` through [`Deref`], which
/// keeps comparisons against the bare `u64` peer IDs used by the
/// speaker-detection API terse (`*id == raw`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct ClientId(pub u64);

impl Deref for ClientId {
    type Target = u64;

    /// Borrows the wrapped counter value.
    fn deref(&self) -> &u64 {
        let Self(raw) = self;
        raw
    }
}
34
/// Events the registry propagates between clients.
///
/// `Noop` and `Timeout` are consumed inside the registry's poll loop and never
/// reach individual clients. The remaining variants are fanned out to the
/// other (non-origin) peers unless their own documentation describes a
/// narrower route (e.g. [`KeyframeRequest`][Self::KeyframeRequest], which is
/// delivered only to the source client).
// NOTE(review): `large_enum_variant` is silenced instead of boxing the big
// variants — presumably to avoid a heap allocation per propagated event;
// confirm before changing.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
#[non_exhaustive]
pub enum Propagated {
    /// Nothing to do — returned by [`Client::poll_output`][crate::Client::poll_output]
    /// when str0m produced only outbound datagrams (queued on the client).
    Noop,

    /// The client's poll returned this as its next wake-up deadline.
    Timeout(Instant),

    /// A new incoming track is open on the originating client and should be
    /// advertised to every other client.
    TrackOpen(ClientId, Weak<TrackIn>),

    /// Media payload received by the originating client, to be forwarded to
    /// every other client (subject to the per-subscriber simulcast layer filter).
    MediaData(ClientId, SfuMediaPayload),

    /// A keyframe request that must reach the source of the outgoing track.
    ///
    /// Fields: `(origin_of_request, request, source_client, source_mid)`.
    /// The fanout dispatcher routes this only to the `source_client`.
    KeyframeRequest(ClientId, SfuKeyframeRequest, ClientId, SfuMid),

    /// A keyframe request that must be forwarded upstream to the origin SFU.
    ///
    /// Emitted instead of [`KeyframeRequest`][Self::KeyframeRequest] when a
    /// subscriber requests a keyframe for a track whose publisher is a relay
    /// client (`ClientOrigin::RelayFromSfu`). The application must relay this
    /// request to the upstream SFU via its signalling channel — the kit cannot
    /// send PLI/FIR to a relay peer that has no inbound WebRTC negotiation for
    /// that direction.
    ///
    /// Fields: `(source_relay_id, req, source_mid)`.
    UpstreamKeyframeRequest {
        /// The relay client whose upstream track needs a keyframe.
        source_relay_id: ClientId,
        /// The keyframe request (PLI or FIR).
        req: SfuKeyframeRequest,
        /// The track MID on the relay client.
        source_mid: SfuMid,
    },

    /// Dominant-speaker election changed.
    ///
    /// Emitted by [`Registry::tick_active_speaker`][crate::Registry::tick_active_speaker]
    /// when the `active-speaker` feature is enabled. The `peer_id` is the newly
    /// dominant peer. Fanout skips the speaker themselves (skip-self rule).
    #[cfg(feature = "active-speaker")]
    #[cfg_attr(docsrs, doc(cfg(feature = "active-speaker")))]
    ActiveSpeakerChanged {
        /// The peer that became the dominant speaker.
        peer_id: u64,
        /// Medium-window log-ratio confidence margin (C2).
        ///
        /// `0.0` means bootstrap election (first speaker in an empty room).
        /// Values above `2.0` indicate a confident, contested win.
        /// Consumers may use this to delay UI updates for low-confidence switches.
        confidence: f64,
    },

    /// Egress bandwidth estimate updated for this peer.
    ///
    /// Emitted from str0m's internal GoogCC each time the estimator produces a new
    /// value (typically every 100–500 ms depending on TWCC traffic). Downstream
    /// should consume this to drive layer selection or pacing decisions.
    BandwidthEstimate {
        /// The peer whose egress estimate changed.
        peer_id: ClientId,
        /// The new estimate.
        estimate: BandwidthEstimate,
    },

    /// Browser-reported bandwidth budget hint from a DataChannel budget message.
    ///
    /// Carries the peer's self-reported link capacity ceiling. The registry feeds
    /// this into `BandwidthEstimator::record_client_hint` under the `kalman-bwe`
    /// feature. The variant itself is always compiled (same visibility as
    /// `BandwidthEstimate`).
    ClientBudgetHint(
        /// The subscriber reporting their budget.
        ClientId,
        /// Budget ceiling in bits per second.
        u64,
    ),

    /// RTCP-derived stats updated for this peer.
    ///
    /// Derived from str0m's `Event::PeerStats` (emitted ~1 Hz). Contains
    /// loss fraction and RTT; jitter is not available from the per-peer aggregate
    /// event in str0m 0.18 (it requires per-mid `MediaEgressStats`) and is
    /// always `Duration::ZERO` in this release.
    RtcpStats {
        /// The peer whose stats were updated.
        peer_id: ClientId,
        /// The updated stats snapshot.
        stats: PeerRtcpStats,
    },

    /// Subscriber's egress BWE crossed the audio-only threshold.
    ///
    /// When `audio_only = true`, stop forwarding video to this peer.
    /// When `audio_only = false`, resume. Only emitted with the `pacer` feature.
    #[cfg(feature = "pacer")]
    #[cfg_attr(docsrs, doc(cfg(feature = "pacer")))]
    AudioOnlyMode {
        /// The subscriber peer.
        peer_id: ClientId,
        /// `true` = entered audio-only; `false` = video restored.
        audio_only: bool,
    },

    /// Subscriber's egress BWE crossed the suspend-video threshold.
    ///
    /// When `suspended = true`, stop forwarding video to this peer; audio
    /// continues to flow (audio-only is a parent state of suspended). Phase 7
    /// wires the per-client fanout filter that performs the actual frame drop.
    /// When `suspended = false`, lift back to the audio-only continuation;
    /// a subsequent `RestoreVideo` will then lift to LOW.
    /// Only emitted with the `pacer` feature.
    #[cfg(feature = "pacer")]
    #[cfg_attr(docsrs, doc(cfg(feature = "pacer")))]
    SuspendVideo {
        /// The subscriber peer.
        peer_id: ClientId,
        /// `true` = entered suspended sub-state; `false` = audio-only restored.
        suspended: bool,
    },

    /// Hint to the publisher that they may stop encoding layers above `max_rid`.
    ///
    /// Emitted when the maximum desired layer across all subscribers changes.
    /// The application should relay this to the publisher via RTCP or
    /// signalling.
    PublisherLayerHint {
        /// The publisher whose encoding may be reduced.
        publisher_id: ClientId,
        /// Highest simulcast layer any subscriber currently wants.
        max_rid: SfuRid,
    },

    /// Hint to the application that the upstream SFU should stop encoding layers
    /// above `max_rid` for this relay publisher.
    ///
    /// Emitted when the maximum desired layer across all subscribers of a
    /// relay-originated publisher changes. The application must forward this
    /// via its inter-SFU signalling channel.
    PublisherLayerHintForUpstream {
        /// The relay client whose upstream publisher should be signalled.
        publisher_relay_id: ClientId,
        /// Highest simulcast layer any subscriber of this relay currently wants.
        max_rid: SfuRid,
    },

    /// Subscriber capability hint for Opus audio codec redundancy.
    ///
    /// Emit to the application signalling layer to negotiate `red/48000/2` in
    /// the publisher's SDP offer, or relay via a custom data-channel protocol.
    /// The SFU does not inject RED — it is a sender-side concern.
    AudioCodecHint {
        /// The subscriber expressing the preference.
        peer_id: ClientId,
        /// Subscriber can decode Opus RFC 2198 RED (`red/48000/2` in SDP).
        opus_red: bool,
        /// Subscriber can decode Opus DRED (Deep REDundancy — libopus 0.9+).
        opus_dred: bool,
    },
}
205
206impl Propagated {
207 /// Which client produced this event, if any.
208 ///
209 /// Used by the registry to skip the originator during fanout. Returns `None`
210 /// for `Noop`, `Timeout`, and `ActiveSpeakerChanged` (the latter uses its
211 /// own `peer_id == *client.id` skip rule).
212 pub fn client_id(&self) -> Option<ClientId> {
213 match self {
214 Propagated::TrackOpen(c, _)
215 | Propagated::MediaData(c, _)
216 | Propagated::KeyframeRequest(c, _, _, _) => Some(*c),
217 Propagated::Noop | Propagated::Timeout(_) => None,
218 #[cfg(feature = "active-speaker")]
219 Propagated::ActiveSpeakerChanged { .. } => None,
220 Propagated::ClientBudgetHint(c, _) => Some(*c),
221 Propagated::BandwidthEstimate { peer_id, .. }
222 | Propagated::RtcpStats { peer_id, .. } => Some(*peer_id),
223 #[cfg(feature = "pacer")]
224 Propagated::AudioOnlyMode { peer_id, .. } => Some(*peer_id),
225 #[cfg(feature = "pacer")]
226 Propagated::SuspendVideo { peer_id, .. } => Some(*peer_id),
227 Propagated::PublisherLayerHint { publisher_id, .. } => Some(*publisher_id),
228 Propagated::PublisherLayerHintForUpstream {
229 publisher_relay_id, ..
230 } => Some(*publisher_relay_id),
231 Propagated::AudioCodecHint { peer_id, .. } => Some(*peer_id),
232 Propagated::UpstreamKeyframeRequest {
233 source_relay_id, ..
234 } => Some(*source_relay_id),
235 }
236 }
237}