oxpulse_sfu_kit/client/accessors.rs
1//! Public read/write accessors on [`Client`].
2//!
3//! Separated from the core poll/event loop in `mod.rs` to keep that file
4//! focused on str0m I/O driving.
5
6use std::sync::atomic::Ordering;
7
8use super::Client;
9use crate::dc::ChannelConfig;
10use crate::ids::SfuRid;
11use crate::net::IncomingDatagram;
12
13impl Client {
14 /// Pre-registered DataChannels to open during SDP negotiation.
15 ///
16 /// The application signalling layer (e.g. `partner-edge`) reads this slice
17 /// and calls `Rtc::open_stream` for each entry. Populated via
18 /// [`with_extra_dc`][Client::with_extra_dc], [`with_chat_dcs`][Client::with_chat_dcs],
19 /// and [`with_voice_dc`][Client::with_voice_dc].
20 #[must_use]
21 pub fn extra_dcs(&self) -> &[ChannelConfig] {
22 &self.extra_dcs
23 }
24
25 /// This subscriber's current desired simulcast layer.
26 #[must_use]
27 pub fn desired_layer(&self) -> SfuRid {
28 self.desired_layer
29 }
30
31 /// Override this subscriber's desired simulcast layer.
32 ///
33 /// Takes effect on the next forwarded packet; no SDP renegotiation required.
34 pub fn set_desired_layer(&mut self, rid: SfuRid) {
35 self.desired_layer = rid;
36 // Invalidate the cached chosen layer so keyframe requests target the
37 // correct RID on the next forwarded packet.
38 self.chosen_rid = None;
39 }
40
41 /// Simulcast RIDs the peer has been observed publishing.
42 ///
43 /// Built up incrementally on each received `MediaData`. Empty until the
44 /// first video packet arrives. Callers that use this as the "available
45 /// layers" input should fall back to the full ladder (`[LOW, MEDIUM, HIGH]`)
46 /// when empty — before the first packet the full ladder is the correct assumption.
47 #[must_use]
48 pub fn active_rids(&self) -> Vec<SfuRid> {
49 self.active_rids.iter().copied().collect()
50 }
51
52 /// Number of `MediaData` events forwarded to this client after layer filtering.
53 #[must_use]
54 pub fn delivered_media_count(&self) -> u64 {
55 self.delivered_media.load(Ordering::Relaxed)
56 }
57
58 /// Number of `ActiveSpeakerChanged` events delivered to this client.
59 ///
60 /// Only available with `test-utils` feature; used to verify skip-self semantics.
61 #[cfg(any(test, feature = "test-utils"))]
62 #[must_use]
63 pub fn delivered_active_speaker_count(&self) -> u64 {
64 self.delivered_active_speaker.load(Ordering::Relaxed)
65 }
66
67 /// Whether the underlying str0m `Rtc` is still alive.
68 #[must_use]
69 pub fn is_alive(&self) -> bool {
70 self.rtc.is_alive()
71 }
72
73 /// Demux probe — returns `true` if this client owns the given datagram.
74 ///
75 /// Used by the registry to route incoming UDP to the correct peer.
76 #[must_use]
77 pub fn accepts(&self, datagram: &IncomingDatagram) -> bool {
78 let Ok(contents) = (&datagram.contents[..]).try_into() else {
79 return false;
80 };
81 let input = str0m::Input::Receive(
82 datagram.received_at,
83 str0m::net::Receive {
84 proto: datagram.proto.to_str0m(),
85 source: datagram.source,
86 destination: datagram.destination,
87 contents,
88 },
89 );
90 self.rtc.accepts(&input)
91 }
92
93 /// Feed a new egress BWE reading to this subscriber's pacer.
94 ///
95 /// If the action is `PacerAction::ChangeLayer`, `desired_layer` is updated
96 /// in-place before returning. For `GoAudioOnly` / `RestoreVideo`, the registry
97 /// should emit `Propagated::AudioOnlyMode`.
98 ///
99 /// Only available with the `pacer` feature.
100 #[cfg(feature = "pacer")]
101 #[cfg_attr(docsrs, doc(cfg(feature = "pacer")))]
102 pub fn drive_pacer(&mut self, bps: u64) -> crate::bwe::PacerAction {
103 let action = self.pacer.update(bps);
104 if let crate::bwe::PacerAction::ChangeLayer(rid) = action {
105 self.set_desired_layer(rid);
106 }
107 action
108 }
109
110 /// Set the maximum AV1 temporal layer to forward to this subscriber.
111 ///
112 /// Packets with `temporal_id > max` are dropped at fanout.
113 /// Default is `u8::MAX` (all layers forwarded).
114 ///
115 /// Only available with the `av1-dd` feature.
116 #[cfg(feature = "av1-dd")]
117 #[cfg_attr(docsrs, doc(cfg(feature = "av1-dd")))]
118 pub fn set_max_temporal_layer(&mut self, max: u8) {
119 self.max_temporal_layer = max;
120 }
121
122 /// Current AV1 temporal layer cap.
123 ///
124 /// Only available with the `av1-dd` feature.
125 #[cfg(feature = "av1-dd")]
126 #[cfg_attr(docsrs, doc(cfg(feature = "av1-dd")))]
127 #[must_use]
128 pub fn max_temporal_layer(&self) -> u8 {
129 self.max_temporal_layer
130 }
131
132 /// Set the maximum RFC 9626 temporal layer to forward to this subscriber.
133 ///
134 /// Packets with `temporal_id > max` are dropped at fanout.
135 /// Default: `u8::MAX` (all layers forwarded).
136 #[cfg(feature = "vfm")]
137 #[cfg_attr(docsrs, doc(cfg(feature = "vfm")))]
138 pub fn set_max_vfm_temporal_layer(&mut self, max: u8) {
139 self.max_vfm_temporal_layer = max;
140 }
141
142 /// Current RFC 9626 temporal layer cap.
143 #[cfg(feature = "vfm")]
144 #[cfg_attr(docsrs, doc(cfg(feature = "vfm")))]
145 #[must_use]
146 pub fn max_vfm_temporal_layer(&self) -> u8 {
147 self.max_vfm_temporal_layer
148 }
149
150 /// This client's origin (local peer or upstream SFU relay).
151 #[must_use]
152 pub fn origin(&self) -> &crate::origin::ClientOrigin {
153 &self.origin
154 }
155
156 /// Override the client origin.
157 ///
158 /// Must be called **before** [`Registry::insert`][crate::Registry::insert].
159 /// See [`ClientOrigin`][crate::origin::ClientOrigin] for the call-order contract.
160 pub fn set_origin(&mut self, origin: crate::origin::ClientOrigin) {
161 self.origin = origin;
162 }
163
164 /// Returns `true` if this client is a relay connection from another SFU edge.
165 #[must_use]
166 pub fn is_relay(&self) -> bool {
167 matches!(self.origin, crate::origin::ClientOrigin::RelayFromSfu(_))
168 }
169
170 /// Set the suspended flag. Called by the registry pacer path when
171 /// `PacerAction::SuspendVideo` / `RestoreAudio` fires.
172 /// Only available with the `pacer` feature.
173 #[cfg(feature = "pacer")]
174 #[cfg_attr(docsrs, doc(cfg(feature = "pacer")))]
175 pub fn set_suspended(&mut self, suspended: bool) {
176 self.suspended = suspended;
177 }
178
179 /// Read the suspended flag.
180 #[cfg(feature = "pacer")]
181 #[cfg_attr(docsrs, doc(cfg(feature = "pacer")))]
182 #[must_use]
183 pub fn is_suspended(&self) -> bool {
184 self.suspended
185 }
186}
187
#[cfg(all(test, feature = "pacer"))]
mod tests {
    use crate::client::test_seed::new_client;
    use crate::propagate::ClientId;

    /// A client built by the `new_client` test helper must not start out
    /// suspended.
    #[test]
    fn suspended_default_is_false() {
        let fresh = new_client(ClientId(0));
        assert!(
            !fresh.is_suspended(),
            "Client::new should initialise suspended to false"
        );
    }

    /// `is_suspended` reflects whatever `set_suspended` wrote last, in both
    /// directions.
    #[test]
    fn set_suspended_round_trips() {
        let mut subject = new_client(ClientId(1));

        // Baseline: nothing has been written yet.
        assert!(!subject.is_suspended());

        // false -> true
        subject.set_suspended(true);
        assert!(subject.is_suspended());

        // true -> false
        subject.set_suspended(false);
        assert!(!subject.is_suspended());
    }
}
211}