Skip to main content

oxpulse_sfu_kit/client/
mod.rs

1//! Per-peer state machine wrapping a str0m [`Rtc`] instance.
2//!
3//! Ported from [`str0m/examples/chat.rs`](https://github.com/algesten/str0m/blob/0.18.0/examples/chat.rs)
4//! with multi-client fanout, simulcast layer filtering, and keyframe-request
5//! plumbing added.
6//!
7//! Outbound UDP is parked on `pending_out`; the registry drains it between
8//! polls (str0m is sync, the run-loop is tokio).
9//!
10//! Submodules: [`keyframe`], [`fanout`], [`layer`], [`tracks`].
11
12use std::collections::{HashSet, VecDeque};
13use std::sync::atomic::AtomicU64;
14use std::sync::{Arc, Weak};
15use std::time::Instant;
16
17use str0m::media::{KeyframeRequestKind, MediaData, MediaKind, Mid, Rid};
18use str0m::{Event, IceConnectionState, Output, Rtc};
19
20use crate::dc::ChannelConfig;
21use crate::ids::SfuRid;
22use crate::metrics::SfuMetrics;
23use crate::net::{IncomingDatagram, OutgoingDatagram};
24use crate::propagate::{ClientId, Propagated};
25
26pub mod accessors;
27pub mod construct;
28pub mod dc_builder;
29pub mod fanout;
30pub mod keyframe;
31pub mod layer;
32pub mod stats;
33#[cfg(any(test, feature = "test-utils"))]
34pub mod test_seed;
35pub mod tracks;
36
37pub use tracks::TrackIn;
38use tracks::{TrackInEntry, TrackOut, TrackOutState};
39
40/// Per-peer state machine wrapping a str0m [`Rtc`] instance.
41///
42/// One `Client` exists per connected peer in the room. The registry owns all
43/// clients and is the single entity that drives them via [`poll_output`][Client::poll_output]
44/// and [`handle_input`][Client::handle_input].
45#[derive(Debug)]
46pub struct Client {
47    /// Process-unique identifier for this peer.
48    pub id: ClientId,
49    /// Whether this client is a local peer or an upstream SFU relay.
50    pub(crate) origin: crate::origin::ClientOrigin,
51    pub(crate) rtc: Rtc,
52    pub(crate) tracks_in: Vec<TrackInEntry>,
53    pub(crate) tracks_out: Vec<TrackOut>,
54    /// Last simulcast RID actually forwarded to this peer. `None` = no simulcast yet.
55    pub(crate) chosen_rid: Option<Rid>,
56    /// Preferred simulcast layer (default [`layer::LOW`]).
57    pub(crate) desired_layer: SfuRid,
58    /// Simulcast RIDs this peer has been observed publishing.
59    /// Populated on every incoming `MediaData`. Empty = bootstrap / non-simulcast.
60    pub(crate) active_rids: HashSet<SfuRid>,
61    /// Outbound datagrams pending flush by the registry.
62    pub(crate) pending_out: VecDeque<str0m::net::Transmit>,
63    /// Prometheus handles (shared with the registry when inserted).
64    pub(crate) metrics: Arc<SfuMetrics>,
65    /// Post-layer-filter forwarded-media counter (readable by integration tests).
66    pub(crate) delivered_media: AtomicU64,
67    /// `ActiveSpeakerChanged` deliveries (skip-self check in tests).
68    #[cfg(any(test, feature = "test-utils"))]
69    pub(crate) delivered_active_speaker: AtomicU64,
70    /// F7-1: cached per-peer drop counter resolved once at admit so the fanout
71    /// hot path is a single atomic add with no `to_string()` alloc per frame.
72    /// Cardinality is bounded by `reap_dead`; the handle is pre-resolved here
73    /// and the `IntCounterVec` is only consulted at admit time.
74    #[cfg(feature = "metrics-prometheus")]
75    pub(crate) video_frames_dropped: prometheus::IntCounter,
76    /// Per-subscriber hysteretic layer pacer driven from egress BWE readings.
77    #[cfg(feature = "pacer")]
78    pub(crate) pacer: crate::bwe::SubscriberPacer,
79    /// True iff the per-subscriber pacer entered its `suspended` sub-state
80    /// (egress BWE below `SUSPEND_VIDEO_BPS`). Outbound video frames are
81    /// dropped while this flag is `true`; audio continues to flow.
82    /// Set when the pacer returns `PacerAction::SuspendVideo`; cleared when
83    /// it returns `PacerAction::RestoreAudio`. Set/cleared exclusively
84    /// from the registry's pacer-driven path (`drive.rs`); never mutated
85    /// by client code directly.
86    /// Only present with `pacer` feature.
87    #[cfg(feature = "pacer")]
88    pub(crate) suspended: bool,
89    /// Maximum AV1 temporal layer to forward to this subscriber (default = all).
90    #[cfg(feature = "av1-dd")]
91    pub(crate) max_temporal_layer: u8,
92    /// Maximum RFC 9626 temporal layer to forward to this subscriber (default = all).
93    #[cfg(feature = "vfm")]
94    pub(crate) max_vfm_temporal_layer: u8,
95    /// Pre-registered DataChannels to open during offer/answer.
96    ///
97    /// Populated via [`Client::with_extra_dc`] / [`Client::with_chat_dcs`] /
98    /// [`Client::with_voice_dc`]. Read by the application signalling layer
99    /// via [`Client::extra_dcs()`][Client::extra_dcs] during SDP negotiation
100    /// to call `Rtc::open_stream`.
101    pub(crate) extra_dcs: Vec<ChannelConfig>,
102}
103
104impl Client {
105    /// Feed a demuxed UDP datagram into str0m.
106    pub fn handle_input(&mut self, datagram: IncomingDatagram) {
107        if !self.rtc.is_alive() {
108            return;
109        }
110        let contents = match (&datagram.contents[..]).try_into() {
111            Ok(c) => c,
112            Err(_) => {
113                tracing::debug!(client = *self.id, "ignoring empty or invalid datagram");
114                return;
115            }
116        };
117        let input = str0m::Input::Receive(
118            datagram.received_at,
119            str0m::net::Receive {
120                proto: datagram.proto.to_str0m(),
121                source: datagram.source,
122                destination: datagram.destination,
123                contents,
124            },
125        );
126        if let Err(e) = self.rtc.handle_input(input) {
127            tracing::warn!(client = *self.id, error = ?e, "client disconnected on handle_input");
128            self.rtc.disconnect();
129        }
130    }
131
132    /// Feed a timeout event into str0m (internal use by registry tick).
133    pub(crate) fn handle_timeout(&mut self, at: Instant) {
134        if !self.rtc.is_alive() {
135            return;
136        }
137        if let Err(e) = self.rtc.handle_input(str0m::Input::Timeout(at)) {
138            tracing::warn!(client = *self.id, error = ?e, "client disconnected on timeout");
139            self.rtc.disconnect();
140        }
141    }
142
143    /// Drive str0m forward one step.
144    ///
145    /// Outbound UDP datagrams are appended to `pending_out`; the registry drains
146    /// them between polls via [`drain_pending_out`][Client::drain_pending_out].
147    pub fn poll_output(&mut self) -> Propagated {
148        if !self.rtc.is_alive() {
149            return Propagated::Noop;
150        }
151        match self.rtc.poll_output() {
152            Ok(output) => self.handle_output(output),
153            Err(e) => {
154                tracing::warn!(client = *self.id, error = ?e, "poll_output failed");
155                self.rtc.disconnect();
156                Propagated::Noop
157            }
158        }
159    }
160
161    fn handle_output(&mut self, output: Output) -> Propagated {
162        match output {
163            Output::Transmit(t) => {
164                self.pending_out.push_back(t);
165                Propagated::Noop
166            }
167            Output::Timeout(t) => Propagated::Timeout(t),
168            Output::Event(e) => self.handle_event(e),
169        }
170    }
171
172    fn handle_event(&mut self, event: Event) -> Propagated {
173        match event {
174            Event::IceConnectionStateChange(IceConnectionState::Disconnected) => {
175                self.rtc.disconnect();
176                Propagated::Noop
177            }
178            Event::MediaAdded(m) => self.track_in_added(m.mid, m.kind),
179            Event::MediaData(data) => self.track_in_media(data),
180            Event::KeyframeRequest(req) => self.incoming_keyframe_req(req),
181            Event::EgressBitrateEstimate(bwe) => stats::propagate_bwe(self.id, bwe),
182            Event::PeerStats(s) => stats::propagate_peer_stats(self.id, s),
183            _ => Propagated::Noop,
184        }
185    }
186
187    fn track_in_added(&mut self, mid: Mid, kind: MediaKind) -> Propagated {
188        let entry = TrackInEntry {
189            id: Arc::new(TrackIn {
190                origin: self.id,
191                mid,
192                kind,
193                relay_source: self.is_relay(),
194            }),
195            last_keyframe_request: None,
196        };
197        let weak = Arc::downgrade(&entry.id);
198        self.tracks_in.push(entry);
199        Propagated::TrackOpen(self.id, weak)
200    }
201
202    fn track_in_media(&mut self, data: MediaData) -> Propagated {
203        if !data.contiguous {
204            self.request_keyframe_throttled(data.mid, data.rid, KeyframeRequestKind::Fir);
205        }
206        if let Some(rid) = data.rid {
207            self.active_rids.insert(SfuRid::from_str0m(rid));
208        }
209        Propagated::MediaData(self.id, crate::media::SfuMediaPayload::from_str0m(data))
210    }
211
212    /// Register that another client opened a track we should mirror to this peer.
213    pub fn handle_track_open(&mut self, track_in: Weak<TrackIn>) {
214        self.tracks_out.push(TrackOut {
215            track_in,
216            state: TrackOutState::ToOpen,
217        });
218    }
219
220    /// Drain queued outbound datagrams.
221    ///
222    /// The registry calls this after each poll cycle to pass bytes to the tokio socket.
223    pub fn drain_pending_out(&mut self) -> impl Iterator<Item = OutgoingDatagram> + '_ {
224        std::mem::take(&mut self.pending_out)
225            .into_iter()
226            .map(OutgoingDatagram::from_transmit)
227    }
228}