1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
//! Downstream fanout: apply a forwarded `MediaData` or speaker-change event
//! to *this* peer.
//!
//! Split from `client/mod.rs` because it owns a distinct concern: per-subscriber
//! simulcast layer filtering and the writer-stage early-returns that tolerate
//! unnegotiated sessions in tests.
use std::sync::atomic::Ordering;
use str0m::media::{MediaKind, Rid};
use super::{layer, Client};
use crate::media::SfuMediaPayload;
use crate::propagate::ClientId;
impl Client {
    /// Forward a `SfuMediaPayload` published by `origin` out to this peer.
    ///
    /// The packet runs through a chain of drop filters; each one may return
    /// early without forwarding:
    ///
    /// 1. (`pacer` feature) While `self.suspended` is set, packets that are
    ///    video — or whose kind cannot be resolved — are dropped and counted
    ///    as dropped video frames. Audio keeps flowing.
    /// 2. Simulcast packets (those carrying a RID) are dropped unless their
    ///    RID equals the layer `BestFitSelector` picks for
    ///    [`desired_layer`][Client::desired_layer]. Packets without a RID
    ///    always pass.
    /// 3. (`av1-dd` / `vfm` features) Packets whose temporal layer exceeds
    ///    `max_temporal_layer` / `max_vfm_temporal_layer` are dropped.
    ///
    /// Packets that survive the filters bump the forwarded-packet and
    /// layer-selection metrics and the `delivered_media` counter, and are then
    /// written to the outbound track matching `origin` + the packet's mid.
    /// A missing outbound mid, writer, or payload-parameter match is tolerated
    /// silently; a failed write is logged and disconnects `self.rtc`.
    pub fn handle_media_data_out(&mut self, origin: ClientId, data: &SfuMediaPayload) {
        // Computed once: both the suspended filter and the outbound-track
        // lookup match tracks on origin + mid.
        let data_mid = data.mid().to_str0m();

        // Suspended-state filter: drop video frames when the per-subscriber
        // pacer is in the `suspended` sub-state. Audio frames continue to flow.
        // Phase 7 of the 1 KB/s resilience plan.
        #[cfg(feature = "pacer")]
        if self.suspended {
            // Detect media kind by matching the inbound track on origin + mid.
            // Defensive default = `true` (treat unknown as video → drop): a
            // suspended subscriber's whole point is bandwidth conservation, so
            // when track metadata is unavailable (Weak::upgrade fails on a
            // disconnected publisher, or no matching tracks_out entry) we bias
            // toward dropping rather than forwarding. This is the opposite
            // polarity of the simulcast layer filter below — there, a packet
            // without a RID is forwarded; here uncertainty must not leak
            // bandwidth.
            let is_video = self
                .tracks_out
                .iter()
                .find_map(|out| {
                    let track_in = out.track_in.upgrade()?;
                    if track_in.origin == origin && track_in.mid == data_mid {
                        Some(matches!(track_in.kind, MediaKind::Video))
                    } else {
                        None
                    }
                })
                .unwrap_or(true);
            if is_video {
                // F7-1: use cached handle — single atomic add, no per-frame alloc.
                #[cfg(feature = "metrics-prometheus")]
                self.video_frames_dropped.inc();
                #[cfg(not(feature = "metrics-prometheus"))]
                self.metrics.inc_video_frames_dropped(*self.id);
                return;
            }
        }

        // Simulcast layer filter. Only packets that carry a RID need a layer
        // decision, so the active-RID list is built (and the selector run)
        // solely for them; non-simulcast packets are always forwarded.
        // active_rids() is empty until the first video packet arrives —
        // BestFitSelector handles the empty case.
        if let Some(rid) = data.rid() {
            use crate::layer_selector::{BestFitSelector, LayerSelector as _};
            let active: Vec<crate::ids::SfuRid> = self.active_rids();
            let target = BestFitSelector.select(self.desired_layer, &active);
            if rid != target {
                // Wrong layer for this subscriber — drop.
                return;
            }
        }

        // Drop AV1 packets whose temporal layer exceeds this subscriber's cap.
        #[cfg(feature = "av1-dd")]
        if let Some(dd) = data.av1_dd() {
            if dd.temporal_id > self.max_temporal_layer {
                return;
            }
        }

        // Drop H.264/VP9/HEVC packets whose temporal layer exceeds this subscriber's cap.
        #[cfg(feature = "vfm")]
        if let Some(fm) = data.vfm_frame_marking() {
            if fm.temporal_id > self.max_vfm_temporal_layer {
                return;
            }
        }

        // Single pass over the outbound tracks: resolve both the metric label
        // and the outbound mid from the first entry whose inbound track is
        // still alive and matches origin + mid. Doing this once keeps the
        // label and the writer target consistent with each other.
        let matched = self.tracks_out.iter().find_map(|out| {
            let track_in = out.track_in.upgrade()?;
            if track_in.origin != origin || track_in.mid != data_mid {
                return None;
            }
            let kind_label = match track_in.kind {
                MediaKind::Audio => "audio",
                MediaKind::Video => "video",
            };
            Some((kind_label, out.mid()))
        });
        let (kind_label, outbound_mid) = match matched {
            Some((label, mid)) => (label, mid),
            None => ("other", None),
        };

        // Prometheus: forwarded_packets_total{kind}.
        self.metrics.inc_forwarded_packets(kind_label);

        // Prometheus: layer_selection_total{layer} — simulcast packets only.
        if let Some(rid) = data.rid() {
            self.metrics.inc_layer_selection(rid_label(rid.to_str0m()));
        }

        // Count *after* the filter, *before* writer early-returns.
        self.delivered_media.fetch_add(1, Ordering::Relaxed);

        // No matching outbound track, or it has no mid yet: nothing to write to.
        let Some(mid) = outbound_mid else {
            return;
        };

        // Track the last forwarded RID so keyframe requests target the same layer.
        // Packets without a RID leave the previous choice untouched.
        if let Some(rid) = data.rid() {
            self.chosen_rid = Some(rid.to_str0m());
        }

        let Some(writer) = self.rtc.writer(mid) else {
            return;
        };
        let (_pt_raw, network_time, rtp_time, _rid, payload, params) = data.clone_write_parts();
        let Some(pt) = writer.match_params(params) else {
            return;
        };
        if let Err(e) = writer.write(pt, network_time, rtp_time, payload) {
            tracing::warn!(client = *self.id, error = ?e, "writer.write failed");
            self.rtc.disconnect();
        }
    }

    /// Handle a dominant-speaker election change.
    ///
    /// The registry skips the speaker themselves (skip-self rule), so this
    /// method is only called on *other* clients. The `_peer_id` argument is
    /// currently unused. In test builds, or with the `test-utils` feature,
    /// the `delivered_active_speaker` counter is bumped so tests can verify
    /// skip-self semantics; otherwise the method does nothing.
    #[cfg(feature = "active-speaker")]
    pub fn handle_active_speaker_changed(&mut self, _peer_id: u64) {
        #[cfg(any(test, feature = "test-utils"))]
        {
            self.delivered_active_speaker
                .fetch_add(1, Ordering::Relaxed);
        }
    }
}
/// Map a str0m simulcast [`Rid`] to the short label used for the
/// layer-selection metric.
///
/// Returns `"q"`, `"h"` or `"f"` when `rid` equals `layer::LOW`,
/// `layer::MEDIUM` or `layer::HIGH` respectively, and `"other"` for any
/// other RID. The comparisons are tried in that order.
fn rid_label(rid: Rid) -> &'static str {
    match rid {
        r if r == layer::LOW.to_str0m() => "q",
        r if r == layer::MEDIUM.to_str0m() => "h",
        r if r == layer::HIGH.to_str0m() => "f",
        _ => "other",
    }
}