Skip to main content

oxpulse_sfu_kit/client/
accessors.rs

1//! Public read/write accessors on [`Client`].
2//!
3//! Separated from the core poll/event loop in `mod.rs` to keep that file
4//! focused on str0m I/O driving.
5
6use std::sync::atomic::Ordering;
7
8use super::Client;
9use crate::dc::ChannelConfig;
10use crate::ids::SfuRid;
11use crate::net::IncomingDatagram;
12
13impl Client {
14    /// Pre-registered DataChannels to open during SDP negotiation.
15    ///
16    /// The application signalling layer (e.g. `partner-edge`) reads this slice
17    /// and calls `Rtc::open_stream` for each entry. Populated via
18    /// [`with_extra_dc`][Client::with_extra_dc], [`with_chat_dcs`][Client::with_chat_dcs],
19    /// and [`with_voice_dc`][Client::with_voice_dc].
20    #[must_use]
21    pub fn extra_dcs(&self) -> &[ChannelConfig] {
22        &self.extra_dcs
23    }
24
25    /// This subscriber's current desired simulcast layer.
26    #[must_use]
27    pub fn desired_layer(&self) -> SfuRid {
28        self.desired_layer
29    }
30
31    /// Override this subscriber's desired simulcast layer.
32    ///
33    /// Takes effect on the next forwarded packet; no SDP renegotiation required.
34    pub fn set_desired_layer(&mut self, rid: SfuRid) {
35        self.desired_layer = rid;
36        // Invalidate the cached chosen layer so keyframe requests target the
37        // correct RID on the next forwarded packet.
38        self.chosen_rid = None;
39    }
40
41    /// Simulcast RIDs the peer has been observed publishing.
42    ///
43    /// Built up incrementally on each received `MediaData`. Empty until the
44    /// first video packet arrives. Callers that use this as the "available
45    /// layers" input should fall back to the full ladder (`[LOW, MEDIUM, HIGH]`)
46    /// when empty — before the first packet the full ladder is the correct assumption.
47    #[must_use]
48    pub fn active_rids(&self) -> Vec<SfuRid> {
49        self.active_rids.iter().copied().collect()
50    }
51
52    /// Number of `MediaData` events forwarded to this client after layer filtering.
53    #[must_use]
54    pub fn delivered_media_count(&self) -> u64 {
55        self.delivered_media.load(Ordering::Relaxed)
56    }
57
58    /// Number of `ActiveSpeakerChanged` events delivered to this client.
59    ///
60    /// Only available with `test-utils` feature; used to verify skip-self semantics.
61    #[cfg(any(test, feature = "test-utils"))]
62    #[must_use]
63    pub fn delivered_active_speaker_count(&self) -> u64 {
64        self.delivered_active_speaker.load(Ordering::Relaxed)
65    }
66
67    /// Whether the underlying str0m `Rtc` is still alive.
68    #[must_use]
69    pub fn is_alive(&self) -> bool {
70        self.rtc.is_alive()
71    }
72
73    /// Demux probe — returns `true` if this client owns the given datagram.
74    ///
75    /// Used by the registry to route incoming UDP to the correct peer.
76    #[must_use]
77    pub fn accepts(&self, datagram: &IncomingDatagram) -> bool {
78        let Ok(contents) = (&datagram.contents[..]).try_into() else {
79            return false;
80        };
81        let input = str0m::Input::Receive(
82            datagram.received_at,
83            str0m::net::Receive {
84                proto: datagram.proto.to_str0m(),
85                source: datagram.source,
86                destination: datagram.destination,
87                contents,
88            },
89        );
90        self.rtc.accepts(&input)
91    }
92
93    /// Feed a new egress BWE reading to this subscriber's pacer.
94    ///
95    /// If the action is `PacerAction::ChangeLayer`, `desired_layer` is updated
96    /// in-place before returning. For `GoAudioOnly` / `RestoreVideo`, the registry
97    /// should emit `Propagated::AudioOnlyMode`.
98    ///
99    /// Only available with the `pacer` feature.
100    #[cfg(feature = "pacer")]
101    #[cfg_attr(docsrs, doc(cfg(feature = "pacer")))]
102    pub fn drive_pacer(&mut self, bps: u64) -> crate::bwe::PacerAction {
103        let action = self.pacer.update(bps);
104        if let crate::bwe::PacerAction::ChangeLayer(rid) = action {
105            self.set_desired_layer(rid);
106        }
107        action
108    }
109
110    /// Set the maximum AV1 temporal layer to forward to this subscriber.
111    ///
112    /// Packets with `temporal_id > max` are dropped at fanout.
113    /// Default is `u8::MAX` (all layers forwarded).
114    ///
115    /// Only available with the `av1-dd` feature.
116    #[cfg(feature = "av1-dd")]
117    #[cfg_attr(docsrs, doc(cfg(feature = "av1-dd")))]
118    pub fn set_max_temporal_layer(&mut self, max: u8) {
119        self.max_temporal_layer = max;
120    }
121
122    /// Current AV1 temporal layer cap.
123    ///
124    /// Only available with the `av1-dd` feature.
125    #[cfg(feature = "av1-dd")]
126    #[cfg_attr(docsrs, doc(cfg(feature = "av1-dd")))]
127    #[must_use]
128    pub fn max_temporal_layer(&self) -> u8 {
129        self.max_temporal_layer
130    }
131
132    /// Set the maximum RFC 9626 temporal layer to forward to this subscriber.
133    ///
134    /// Packets with `temporal_id > max` are dropped at fanout.
135    /// Default: `u8::MAX` (all layers forwarded).
136    #[cfg(feature = "vfm")]
137    #[cfg_attr(docsrs, doc(cfg(feature = "vfm")))]
138    pub fn set_max_vfm_temporal_layer(&mut self, max: u8) {
139        self.max_vfm_temporal_layer = max;
140    }
141
142    /// Current RFC 9626 temporal layer cap.
143    #[cfg(feature = "vfm")]
144    #[cfg_attr(docsrs, doc(cfg(feature = "vfm")))]
145    #[must_use]
146    pub fn max_vfm_temporal_layer(&self) -> u8 {
147        self.max_vfm_temporal_layer
148    }
149
150    /// This client's origin (local peer or upstream SFU relay).
151    #[must_use]
152    pub fn origin(&self) -> &crate::origin::ClientOrigin {
153        &self.origin
154    }
155
156    /// Override the client origin.
157    ///
158    /// Must be called **before** [`Registry::insert`][crate::Registry::insert].
159    /// See [`ClientOrigin`][crate::origin::ClientOrigin] for the call-order contract.
160    pub fn set_origin(&mut self, origin: crate::origin::ClientOrigin) {
161        self.origin = origin;
162    }
163
164    /// Returns `true` if this client is a relay connection from another SFU edge.
165    #[must_use]
166    pub fn is_relay(&self) -> bool {
167        matches!(self.origin, crate::origin::ClientOrigin::RelayFromSfu(_))
168    }
169
170    /// Set the suspended flag. Called by the registry pacer path when
171    /// `PacerAction::SuspendVideo` / `RestoreAudio` fires.
172    /// Only available with the `pacer` feature.
173    #[cfg(feature = "pacer")]
174    #[cfg_attr(docsrs, doc(cfg(feature = "pacer")))]
175    pub fn set_suspended(&mut self, suspended: bool) {
176        self.suspended = suspended;
177    }
178
179    /// Read the suspended flag.
180    #[cfg(feature = "pacer")]
181    #[cfg_attr(docsrs, doc(cfg(feature = "pacer")))]
182    #[must_use]
183    pub fn is_suspended(&self) -> bool {
184        self.suspended
185    }
186}
187
188#[cfg(all(test, feature = "pacer"))]
189mod tests {
190    use crate::client::test_seed::new_client;
191    use crate::propagate::ClientId;
192
193    #[test]
194    fn suspended_default_is_false() {
195        let client = new_client(ClientId(0));
196        assert!(
197            !client.is_suspended(),
198            "Client::new should initialise suspended to false"
199        );
200    }
201
202    #[test]
203    fn set_suspended_round_trips() {
204        let mut client = new_client(ClientId(1));
205        assert!(!client.is_suspended());
206        client.set_suspended(true);
207        assert!(client.is_suspended());
208        client.set_suspended(false);
209        assert!(!client.is_suspended());
210    }
211}