oxpulse_sfu_kit/client/fanout.rs
1//! Downstream fanout: apply a forwarded `MediaData` or speaker-change event
2//! to *this* peer.
3//!
4//! Split from `client/mod.rs` because it owns a distinct concern: per-subscriber
5//! simulcast layer filtering and the writer-stage early-returns that tolerate
6//! unnegotiated sessions in tests.
7
8use std::sync::atomic::Ordering;
9
10use str0m::media::{MediaKind, Rid};
11
12use super::{layer, Client};
13use crate::media::SfuMediaPayload;
14use crate::propagate::ClientId;
15
16impl Client {
17 /// Forward a `SfuMediaPayload` from `origin` out to this peer.
18 ///
19 /// Applies the simulcast layer filter (drops packets not matching
20 /// [`desired_layer`][Client::desired_layer]) and increments Prometheus
21 /// counters for forwarded packets and layer selections.
22 pub fn handle_media_data_out(&mut self, origin: ClientId, data: &SfuMediaPayload) {
23 // Suspended-state filter: drop video frames when the per-subscriber
24 // pacer is in the `suspended` sub-state. Audio frames continue to flow.
25 // Phase 7 of the 1 KB/s resilience plan.
26 #[cfg(feature = "pacer")]
27 if self.suspended {
28 // Detect media kind by matching the inbound track on origin + mid.
29 // Defensive default = `true` (treat unknown as video → drop): a
30 // suspended subscriber's whole point is bandwidth conservation, so
31 // when track metadata is unavailable (Weak::upgrade fails on a
32 // disconnected publisher, or no matching tracks_out entry) we bias
33 // toward dropping rather than forwarding. This inverts the polarity
34 // of the simulcast layer filter below — that one biases toward
35 // forward because uncertainty there means "no simulcast" not "no
36 // video"; here uncertainty must not leak bandwidth.
37 let is_video = self
38 .tracks_out
39 .iter()
40 .find_map(|o| {
41 let i = o.track_in.upgrade()?;
42 if i.origin == origin && i.mid == data.mid().to_str0m() {
43 Some(matches!(i.kind, MediaKind::Video))
44 } else {
45 None
46 }
47 })
48 .unwrap_or(true);
49 if is_video {
50 // F7-1: use cached handle — single atomic add, no per-frame alloc.
51 #[cfg(feature = "metrics-prometheus")]
52 self.video_frames_dropped.inc();
53 #[cfg(not(feature = "metrics-prometheus"))]
54 self.metrics.inc_video_frames_dropped(*self.id);
55 return;
56 }
57 }
58
59 // Use LayerSelector to pick the best available RID for this subscriber.
60 // active_rids() is empty until the first video packet arrives — fall back
61 // to the old RID-exact match in that case (BestFitSelector handles empty correctly).
62 {
63 use crate::layer_selector::{BestFitSelector, LayerSelector as _};
64 let active: Vec<crate::ids::SfuRid> = self.active_rids();
65 let target = BestFitSelector.select(self.desired_layer, &active);
66 match data.rid() {
67 None => {} // non-simulcast: always forward
68 Some(rid) if rid == target => {} // correct layer
69 Some(_) => return, // wrong layer — drop
70 }
71 }
72
73 // Drop AV1 packets whose temporal layer exceeds this subscriber's cap.
74 #[cfg(feature = "av1-dd")]
75 if let Some(dd) = data.av1_dd() {
76 if dd.temporal_id > self.max_temporal_layer {
77 return;
78 }
79 }
80
81 // Drop H.264/VP9/HEVC packets whose temporal layer exceeds this subscriber's cap.
82 #[cfg(feature = "vfm")]
83 if let Some(fm) = data.vfm_frame_marking() {
84 if fm.temporal_id > self.max_vfm_temporal_layer {
85 return;
86 }
87 }
88
89 let data_mid = data.mid().to_str0m();
90
91 // Find the matching outbound track entry.
92 let matched = self.tracks_out.iter().find(|o| {
93 o.track_in
94 .upgrade()
95 .filter(|i| i.origin == origin && i.mid == data_mid)
96 .is_some()
97 });
98
99 // Prometheus: forwarded_packets_total{kind}.
100 let kind_label = matched
101 .and_then(|o| o.track_in.upgrade())
102 .map(|t| match t.kind {
103 MediaKind::Audio => "audio",
104 MediaKind::Video => "video",
105 })
106 .unwrap_or("other");
107 self.metrics.inc_forwarded_packets(kind_label);
108
109 // Prometheus: layer_selection_total{layer} — simulcast packets only.
110 if let Some(rid) = data.rid() {
111 let layer_label = rid_label(rid.to_str0m());
112 self.metrics.inc_layer_selection(layer_label);
113 }
114
115 // Count *after* the filter, *before* writer early-returns.
116 self.delivered_media.fetch_add(1, Ordering::Relaxed);
117
118 let Some(mid) = self
119 .tracks_out
120 .iter()
121 .find(|o| {
122 o.track_in
123 .upgrade()
124 .filter(|i| i.origin == origin && i.mid == data_mid)
125 .is_some()
126 })
127 .and_then(|o| o.mid())
128 else {
129 return;
130 };
131
132 // Track the last forwarded RID so keyframe requests target the same layer.
133 let data_rid = data.rid().map(|r| r.to_str0m());
134 if data_rid.is_some() && self.chosen_rid != data_rid {
135 self.chosen_rid = data_rid;
136 }
137
138 let Some(writer) = self.rtc.writer(mid) else {
139 return;
140 };
141 let (_pt_raw, network_time, rtp_time, _rid, payload, params) = data.clone_write_parts();
142 let Some(pt) = writer.match_params(params) else {
143 return;
144 };
145 // Observe end-to-end forwarding latency: time from publisher receipt to
146 // subscriber str0m handoff. Uses the packet's own network_time so the
147 // measurement is independent of when handle_media_data_out was called.
148 // observe() is a single atomic bucket increment — no allocation.
149 self.metrics
150 .observe_forward_latency(kind_label, network_time.elapsed().as_secs_f64());
151 if let Err(e) = writer.write(pt, network_time, rtp_time, payload) {
152 tracing::warn!(client = *self.id, error = ?e, "writer.write failed");
153 self.rtc.disconnect();
154 }
155 }
156
157 /// Handle a dominant-speaker election change.
158 ///
159 /// The registry skips the speaker themselves (skip-self rule), so this
160 /// method is only called on *other* clients. In `test-utils` builds a
161 /// counter is bumped to let tests verify skip-self semantics.
162 #[cfg(feature = "active-speaker")]
163 pub fn handle_active_speaker_changed(&mut self, _peer_id: u64) {
164 #[cfg(any(test, feature = "test-utils"))]
165 {
166 self.delivered_active_speaker
167 .fetch_add(1, Ordering::Relaxed);
168 }
169 }
170}
171
172fn rid_label(rid: Rid) -> &'static str {
173 if rid == layer::LOW.to_str0m() {
174 "q"
175 } else if rid == layer::MEDIUM.to_str0m() {
176 "h"
177 } else if rid == layer::HIGH.to_str0m() {
178 "f"
179 } else {
180 "other"
181 }
182}