// oxpulse_sfu_kit/client/fanout.rs
//! Downstream fanout: apply a forwarded `MediaData` or speaker-change event
//! to *this* peer.
//!
//! Split from `client/mod.rs` because it owns a distinct concern: per-subscriber
//! simulcast layer filtering and the writer-stage early-returns that tolerate
//! unnegotiated sessions in tests.

use std::sync::atomic::Ordering;

use str0m::media::{MediaKind, Rid};

use super::{layer, Client};
use crate::media::SfuMediaPayload;
use crate::propagate::ClientId;

16impl Client {
17    /// Forward a `SfuMediaPayload` from `origin` out to this peer.
18    ///
19    /// Applies the simulcast layer filter (drops packets not matching
20    /// [`desired_layer`][Client::desired_layer]) and increments Prometheus
21    /// counters for forwarded packets and layer selections.
22    pub fn handle_media_data_out(&mut self, origin: ClientId, data: &SfuMediaPayload) {
23        // Suspended-state filter: drop video frames when the per-subscriber
24        // pacer is in the `suspended` sub-state. Audio frames continue to flow.
25        // Phase 7 of the 1 KB/s resilience plan.
26        #[cfg(feature = "pacer")]
27        if self.suspended {
28            // Detect media kind by matching the inbound track on origin + mid.
29            // Defensive default = `true` (treat unknown as video → drop): a
30            // suspended subscriber's whole point is bandwidth conservation, so
31            // when track metadata is unavailable (Weak::upgrade fails on a
32            // disconnected publisher, or no matching tracks_out entry) we bias
33            // toward dropping rather than forwarding. This inverts the polarity
34            // of the simulcast layer filter below — that one biases toward
35            // forward because uncertainty there means "no simulcast" not "no
36            // video"; here uncertainty must not leak bandwidth.
37            let is_video = self
38                .tracks_out
39                .iter()
40                .find_map(|o| {
41                    let i = o.track_in.upgrade()?;
42                    if i.origin == origin && i.mid == data.mid().to_str0m() {
43                        Some(matches!(i.kind, MediaKind::Video))
44                    } else {
45                        None
46                    }
47                })
48                .unwrap_or(true);
49            if is_video {
50                // F7-1: use cached handle — single atomic add, no per-frame alloc.
51                #[cfg(feature = "metrics-prometheus")]
52                self.video_frames_dropped.inc();
53                #[cfg(not(feature = "metrics-prometheus"))]
54                self.metrics.inc_video_frames_dropped(*self.id);
55                return;
56            }
57        }
58
59        // Use LayerSelector to pick the best available RID for this subscriber.
60        // active_rids() is empty until the first video packet arrives — fall back
61        // to the old RID-exact match in that case (BestFitSelector handles empty correctly).
62        {
63            use crate::layer_selector::{BestFitSelector, LayerSelector as _};
64            let active: Vec<crate::ids::SfuRid> = self.active_rids();
65            let target = BestFitSelector.select(self.desired_layer, &active);
66            match data.rid() {
67                None => {}                       // non-simulcast: always forward
68                Some(rid) if rid == target => {} // correct layer
69                Some(_) => return,               // wrong layer — drop
70            }
71        }
72
73        // Drop AV1 packets whose temporal layer exceeds this subscriber's cap.
74        #[cfg(feature = "av1-dd")]
75        if let Some(dd) = data.av1_dd() {
76            if dd.temporal_id > self.max_temporal_layer {
77                return;
78            }
79        }
80
81        // Drop H.264/VP9/HEVC packets whose temporal layer exceeds this subscriber's cap.
82        #[cfg(feature = "vfm")]
83        if let Some(fm) = data.vfm_frame_marking() {
84            if fm.temporal_id > self.max_vfm_temporal_layer {
85                return;
86            }
87        }
88
89        let data_mid = data.mid().to_str0m();
90
91        // Find the matching outbound track entry.
92        let matched = self.tracks_out.iter().find(|o| {
93            o.track_in
94                .upgrade()
95                .filter(|i| i.origin == origin && i.mid == data_mid)
96                .is_some()
97        });
98
99        // Prometheus: forwarded_packets_total{kind}.
100        let kind_label = matched
101            .and_then(|o| o.track_in.upgrade())
102            .map(|t| match t.kind {
103                MediaKind::Audio => "audio",
104                MediaKind::Video => "video",
105            })
106            .unwrap_or("other");
107        self.metrics.inc_forwarded_packets(kind_label);
108
109        // Prometheus: layer_selection_total{layer} — simulcast packets only.
110        if let Some(rid) = data.rid() {
111            let layer_label = rid_label(rid.to_str0m());
112            self.metrics.inc_layer_selection(layer_label);
113        }
114
115        // Count *after* the filter, *before* writer early-returns.
116        self.delivered_media.fetch_add(1, Ordering::Relaxed);
117
118        let Some(mid) = self
119            .tracks_out
120            .iter()
121            .find(|o| {
122                o.track_in
123                    .upgrade()
124                    .filter(|i| i.origin == origin && i.mid == data_mid)
125                    .is_some()
126            })
127            .and_then(|o| o.mid())
128        else {
129            return;
130        };
131
132        // Track the last forwarded RID so keyframe requests target the same layer.
133        let data_rid = data.rid().map(|r| r.to_str0m());
134        if data_rid.is_some() && self.chosen_rid != data_rid {
135            self.chosen_rid = data_rid;
136        }
137
138        let Some(writer) = self.rtc.writer(mid) else {
139            return;
140        };
141        let (_pt_raw, network_time, rtp_time, _rid, payload, params) = data.clone_write_parts();
142        let Some(pt) = writer.match_params(params) else {
143            return;
144        };
145        // Observe end-to-end forwarding latency: time from publisher receipt to
146        // subscriber str0m handoff. Uses the packet's own network_time so the
147        // measurement is independent of when handle_media_data_out was called.
148        // observe() is a single atomic bucket increment — no allocation.
149        self.metrics
150            .observe_forward_latency(kind_label, network_time.elapsed().as_secs_f64());
151        if let Err(e) = writer.write(pt, network_time, rtp_time, payload) {
152            tracing::warn!(client = *self.id, error = ?e, "writer.write failed");
153            self.rtc.disconnect();
154        }
155    }
156
157    /// Handle a dominant-speaker election change.
158    ///
159    /// The registry skips the speaker themselves (skip-self rule), so this
160    /// method is only called on *other* clients. In `test-utils` builds a
161    /// counter is bumped to let tests verify skip-self semantics.
162    #[cfg(feature = "active-speaker")]
163    pub fn handle_active_speaker_changed(&mut self, _peer_id: u64) {
164        #[cfg(any(test, feature = "test-utils"))]
165        {
166            self.delivered_active_speaker
167                .fetch_add(1, Ordering::Relaxed);
168        }
169    }
170}
171
172fn rid_label(rid: Rid) -> &'static str {
173    if rid == layer::LOW.to_str0m() {
174        "q"
175    } else if rid == layer::MEDIUM.to_str0m() {
176        "h"
177    } else if rid == layer::HIGH.to_str0m() {
178        "f"
179    } else {
180        "other"
181    }
182}