oxpulse_sfu_kit/client/
mod.rs1use std::collections::{HashSet, VecDeque};
13use std::sync::atomic::AtomicU64;
14use std::sync::{Arc, Weak};
15use std::time::Instant;
16
17use str0m::media::{KeyframeRequestKind, MediaData, MediaKind, Mid, Rid};
18use str0m::{Event, IceConnectionState, Output, Rtc};
19
20use crate::dc::ChannelConfig;
21use crate::ids::SfuRid;
22use crate::metrics::SfuMetrics;
23use crate::net::{IncomingDatagram, OutgoingDatagram};
24use crate::propagate::{ClientId, Propagated};
25
26pub mod accessors;
27pub mod construct;
28pub mod dc_builder;
29pub mod fanout;
30pub mod keyframe;
31pub mod layer;
32pub mod stats;
33#[cfg(any(test, feature = "test-utils"))]
34pub mod test_seed;
35pub mod tracks;
36
37pub use tracks::TrackIn;
38use tracks::{TrackInEntry, TrackOut, TrackOutState};
39
40#[derive(Debug)]
46pub struct Client {
47 pub id: ClientId,
49 pub(crate) origin: crate::origin::ClientOrigin,
51 pub(crate) rtc: Rtc,
52 pub(crate) tracks_in: Vec<TrackInEntry>,
53 pub(crate) tracks_out: Vec<TrackOut>,
54 pub(crate) chosen_rid: Option<Rid>,
56 pub(crate) desired_layer: SfuRid,
58 pub(crate) active_rids: HashSet<SfuRid>,
61 pub(crate) pending_out: VecDeque<str0m::net::Transmit>,
63 pub(crate) metrics: Arc<SfuMetrics>,
65 pub(crate) delivered_media: AtomicU64,
67 #[cfg(any(test, feature = "test-utils"))]
69 pub(crate) delivered_active_speaker: AtomicU64,
70 #[cfg(feature = "metrics-prometheus")]
75 pub(crate) video_frames_dropped: prometheus::IntCounter,
76 #[cfg(feature = "pacer")]
78 pub(crate) pacer: crate::bwe::SubscriberPacer,
79 #[cfg(feature = "pacer")]
88 pub(crate) suspended: bool,
89 #[cfg(feature = "av1-dd")]
91 pub(crate) max_temporal_layer: u8,
92 #[cfg(feature = "vfm")]
94 pub(crate) max_vfm_temporal_layer: u8,
95 pub(crate) extra_dcs: Vec<ChannelConfig>,
102}
103
104impl Client {
105 pub fn handle_input(&mut self, datagram: IncomingDatagram) {
107 if !self.rtc.is_alive() {
108 return;
109 }
110 let contents = match (&datagram.contents[..]).try_into() {
111 Ok(c) => c,
112 Err(_) => {
113 tracing::debug!(client = *self.id, "ignoring empty or invalid datagram");
114 return;
115 }
116 };
117 let input = str0m::Input::Receive(
118 datagram.received_at,
119 str0m::net::Receive {
120 proto: datagram.proto.to_str0m(),
121 source: datagram.source,
122 destination: datagram.destination,
123 contents,
124 },
125 );
126 if let Err(e) = self.rtc.handle_input(input) {
127 tracing::warn!(client = *self.id, error = ?e, "client disconnected on handle_input");
128 self.rtc.disconnect();
129 }
130 }
131
132 pub(crate) fn handle_timeout(&mut self, at: Instant) {
134 if !self.rtc.is_alive() {
135 return;
136 }
137 if let Err(e) = self.rtc.handle_input(str0m::Input::Timeout(at)) {
138 tracing::warn!(client = *self.id, error = ?e, "client disconnected on timeout");
139 self.rtc.disconnect();
140 }
141 }
142
143 pub fn poll_output(&mut self) -> Propagated {
148 if !self.rtc.is_alive() {
149 return Propagated::Noop;
150 }
151 match self.rtc.poll_output() {
152 Ok(output) => self.handle_output(output),
153 Err(e) => {
154 tracing::warn!(client = *self.id, error = ?e, "poll_output failed");
155 self.rtc.disconnect();
156 Propagated::Noop
157 }
158 }
159 }
160
161 fn handle_output(&mut self, output: Output) -> Propagated {
162 match output {
163 Output::Transmit(t) => {
164 self.pending_out.push_back(t);
165 Propagated::Noop
166 }
167 Output::Timeout(t) => Propagated::Timeout(t),
168 Output::Event(e) => self.handle_event(e),
169 }
170 }
171
172 fn handle_event(&mut self, event: Event) -> Propagated {
173 match event {
174 Event::IceConnectionStateChange(IceConnectionState::Disconnected) => {
175 self.rtc.disconnect();
176 Propagated::Noop
177 }
178 Event::MediaAdded(m) => self.track_in_added(m.mid, m.kind),
179 Event::MediaData(data) => self.track_in_media(data),
180 Event::KeyframeRequest(req) => self.incoming_keyframe_req(req),
181 Event::EgressBitrateEstimate(bwe) => stats::propagate_bwe(self.id, bwe),
182 Event::PeerStats(s) => stats::propagate_peer_stats(self.id, s),
183 _ => Propagated::Noop,
184 }
185 }
186
187 fn track_in_added(&mut self, mid: Mid, kind: MediaKind) -> Propagated {
188 let entry = TrackInEntry {
189 id: Arc::new(TrackIn {
190 origin: self.id,
191 mid,
192 kind,
193 relay_source: self.is_relay(),
194 }),
195 last_keyframe_request: None,
196 };
197 let weak = Arc::downgrade(&entry.id);
198 self.tracks_in.push(entry);
199 Propagated::TrackOpen(self.id, weak)
200 }
201
202 fn track_in_media(&mut self, data: MediaData) -> Propagated {
203 if !data.contiguous {
204 self.request_keyframe_throttled(data.mid, data.rid, KeyframeRequestKind::Fir);
205 }
206 if let Some(rid) = data.rid {
207 self.active_rids.insert(SfuRid::from_str0m(rid));
208 }
209 Propagated::MediaData(self.id, crate::media::SfuMediaPayload::from_str0m(data))
210 }
211
212 pub fn handle_track_open(&mut self, track_in: Weak<TrackIn>) {
214 self.tracks_out.push(TrackOut {
215 track_in,
216 state: TrackOutState::ToOpen,
217 });
218 }
219
220 pub fn drain_pending_out(&mut self) -> impl Iterator<Item = OutgoingDatagram> + '_ {
224 std::mem::take(&mut self.pending_out)
225 .into_iter()
226 .map(OutgoingDatagram::from_transmit)
227 }
228}