1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
//! Public read/write accessors on [`Client`].
//!
//! Separated from the core poll/event loop in `mod.rs` to keep that file
//! focused on str0m I/O driving.
use std::sync::atomic::Ordering;
use super::Client;
use crate::dc::ChannelConfig;
use crate::ids::SfuRid;
use crate::net::IncomingDatagram;
impl Client {
/// Pre-registered DataChannels to open during SDP negotiation.
///
/// The application signalling layer (e.g. `partner-edge`) reads this slice
/// and calls `Rtc::open_stream` for each entry. Populated via
/// [`with_extra_dc`][Client::with_extra_dc], [`with_chat_dcs`][Client::with_chat_dcs],
/// and [`with_voice_dc`][Client::with_voice_dc].
#[must_use]
pub fn extra_dcs(&self) -> &[ChannelConfig] {
&self.extra_dcs
}
/// This subscriber's current desired simulcast layer.
#[must_use]
pub fn desired_layer(&self) -> SfuRid {
self.desired_layer
}
/// Override this subscriber's desired simulcast layer.
///
/// Takes effect on the next forwarded packet; no SDP renegotiation required.
pub fn set_desired_layer(&mut self, rid: SfuRid) {
self.desired_layer = rid;
// Invalidate the cached chosen layer so keyframe requests target the
// correct RID on the next forwarded packet.
self.chosen_rid = None;
}
/// Simulcast RIDs the peer has been observed publishing.
///
/// Built up incrementally on each received `MediaData`. Empty until the
/// first video packet arrives. Callers that use this as the "available
/// layers" input should fall back to the full ladder (`[LOW, MEDIUM, HIGH]`)
/// when empty — before the first packet the full ladder is the correct assumption.
#[must_use]
pub fn active_rids(&self) -> Vec<SfuRid> {
self.active_rids.iter().copied().collect()
}
/// Number of `MediaData` events forwarded to this client after layer filtering.
#[must_use]
pub fn delivered_media_count(&self) -> u64 {
self.delivered_media.load(Ordering::Relaxed)
}
/// Number of `ActiveSpeakerChanged` events delivered to this client.
///
/// Only available with `test-utils` feature; used to verify skip-self semantics.
#[cfg(any(test, feature = "test-utils"))]
#[must_use]
pub fn delivered_active_speaker_count(&self) -> u64 {
self.delivered_active_speaker.load(Ordering::Relaxed)
}
/// Whether the underlying str0m `Rtc` is still alive.
#[must_use]
pub fn is_alive(&self) -> bool {
self.rtc.is_alive()
}
/// Demux probe — returns `true` if this client owns the given datagram.
///
/// Used by the registry to route incoming UDP to the correct peer.
#[must_use]
pub fn accepts(&self, datagram: &IncomingDatagram) -> bool {
let Ok(contents) = (&datagram.contents[..]).try_into() else {
return false;
};
let input = str0m::Input::Receive(
datagram.received_at,
str0m::net::Receive {
proto: datagram.proto.to_str0m(),
source: datagram.source,
destination: datagram.destination,
contents,
},
);
self.rtc.accepts(&input)
}
/// Feed a new egress BWE reading to this subscriber's pacer.
///
/// If the action is `PacerAction::ChangeLayer`, `desired_layer` is updated
/// in-place before returning. For `GoAudioOnly` / `RestoreVideo`, the registry
/// should emit `Propagated::AudioOnlyMode`.
///
/// Only available with the `pacer` feature.
#[cfg(feature = "pacer")]
#[cfg_attr(docsrs, doc(cfg(feature = "pacer")))]
pub fn drive_pacer(&mut self, bps: u64) -> crate::bwe::PacerAction {
let action = self.pacer.update(bps);
if let crate::bwe::PacerAction::ChangeLayer(rid) = action {
self.set_desired_layer(rid);
}
action
}
/// Set the maximum AV1 temporal layer to forward to this subscriber.
///
/// Packets with `temporal_id > max` are dropped at fanout.
/// Default is `u8::MAX` (all layers forwarded).
///
/// Only available with the `av1-dd` feature.
#[cfg(feature = "av1-dd")]
#[cfg_attr(docsrs, doc(cfg(feature = "av1-dd")))]
pub fn set_max_temporal_layer(&mut self, max: u8) {
self.max_temporal_layer = max;
}
/// Current AV1 temporal layer cap.
///
/// Only available with the `av1-dd` feature.
#[cfg(feature = "av1-dd")]
#[cfg_attr(docsrs, doc(cfg(feature = "av1-dd")))]
#[must_use]
pub fn max_temporal_layer(&self) -> u8 {
self.max_temporal_layer
}
/// Set the maximum RFC 9626 temporal layer to forward to this subscriber.
///
/// Packets with `temporal_id > max` are dropped at fanout.
/// Default: `u8::MAX` (all layers forwarded).
#[cfg(feature = "vfm")]
#[cfg_attr(docsrs, doc(cfg(feature = "vfm")))]
pub fn set_max_vfm_temporal_layer(&mut self, max: u8) {
self.max_vfm_temporal_layer = max;
}
/// Current RFC 9626 temporal layer cap.
#[cfg(feature = "vfm")]
#[cfg_attr(docsrs, doc(cfg(feature = "vfm")))]
#[must_use]
pub fn max_vfm_temporal_layer(&self) -> u8 {
self.max_vfm_temporal_layer
}
/// This client's origin (local peer or upstream SFU relay).
#[must_use]
pub fn origin(&self) -> &crate::origin::ClientOrigin {
&self.origin
}
/// Override the client origin.
///
/// Must be called **before** [`Registry::insert`][crate::Registry::insert].
/// See [`ClientOrigin`][crate::origin::ClientOrigin] for the call-order contract.
pub fn set_origin(&mut self, origin: crate::origin::ClientOrigin) {
self.origin = origin;
}
/// Returns `true` if this client is a relay connection from another SFU edge.
#[must_use]
pub fn is_relay(&self) -> bool {
matches!(self.origin, crate::origin::ClientOrigin::RelayFromSfu(_))
}
/// Set the suspended flag. Called by the registry pacer path when
/// `PacerAction::SuspendVideo` / `RestoreAudio` fires.
/// Only available with the `pacer` feature.
#[cfg(feature = "pacer")]
#[cfg_attr(docsrs, doc(cfg(feature = "pacer")))]
pub fn set_suspended(&mut self, suspended: bool) {
self.suspended = suspended;
}
/// Read the suspended flag.
#[cfg(feature = "pacer")]
#[cfg_attr(docsrs, doc(cfg(feature = "pacer")))]
#[must_use]
pub fn is_suspended(&self) -> bool {
self.suspended
}
}
#[cfg(all(test, feature = "pacer"))]
mod tests {
    use crate::client::test_seed::new_client;
    use crate::propagate::ClientId;

    /// A client straight out of the test constructor must not be suspended.
    #[test]
    fn suspended_default_is_false() {
        let client = new_client(ClientId(0));
        let suspended = client.is_suspended();
        assert!(
            !suspended,
            "Client::new should initialise suspended to false"
        );
    }

    /// Whatever `set_suspended` stores is what `is_suspended` reads back,
    /// in both directions.
    #[test]
    fn set_suspended_round_trips() {
        let mut client = new_client(ClientId(1));
        assert!(!client.is_suspended());

        // Flip on, then back off, checking the getter after each write.
        for flag in [true, false] {
            client.set_suspended(flag);
            assert_eq!(client.is_suspended(), flag);
        }
    }
}