use std::sync::atomic::Ordering;
use str0m::media::{MediaKind, Rid};
use super::{layer, Client};
use crate::media::SfuMediaPayload;
use crate::propagate::ClientId;
impl Client {
    /// Forwards one media payload that originated from `origin` to this client.
    ///
    /// The payload passes through these steps, in order:
    ///
    /// 1. Layer filter: a payload carrying a RID is dropped unless that RID
    ///    equals the layer `BestFitSelector` picks from `self.desired_layer`
    ///    and the currently active RIDs. Payloads without a RID always pass.
    /// 2. Optional temporal-layer filters (features `av1-dd` and `vfm`).
    /// 3. Metrics and the `delivered_media` counter are updated.
    /// 4. The outgoing track whose incoming track matches `origin` and the
    ///    payload's mid is resolved; without one the payload is dropped.
    /// 5. The payload is written through the `rtc` writer for that mid. A
    ///    failed write is logged and the connection is disconnected.
    pub fn handle_media_data_out(&mut self, origin: ClientId, data: &SfuMediaPayload) {
        // Layer filter. The block scope keeps the selector trait import local.
        {
            use crate::layer_selector::{BestFitSelector, LayerSelector as _};
            let active: Vec<crate::ids::SfuRid> = self.active_rids();
            let target = BestFitSelector.select(self.desired_layer, &active);
            match data.rid() {
                // No RID on the payload: nothing to filter on.
                None => {}
                // Payload belongs to the selected layer.
                Some(rid) if rid == target => {}
                // Any other layer is not forwarded to this client.
                Some(_) => return,
            }
        }

        // Drop frames above the configured AV1 temporal layer.
        #[cfg(feature = "av1-dd")]
        if let Some(dd) = data.av1_dd() {
            if dd.temporal_id > self.max_temporal_layer {
                return;
            }
        }

        // Drop frames above the configured frame-marking temporal layer.
        #[cfg(feature = "vfm")]
        if let Some(fm) = data.vfm_frame_marking() {
            if fm.temporal_id > self.max_vfm_temporal_layer {
                return;
            }
        }

        let data_mid = data.mid().to_str0m();

        // Resolve the metric label and the outgoing mid in a single pass over
        // `tracks_out`, upgrading each weak `track_in` handle only once. The
        // first outgoing track whose incoming track matches wins, even when
        // its own mid is still `None`.
        let (kind_label, out_mid) = self
            .tracks_out
            .iter()
            .find_map(|out| {
                let track_in = out.track_in.upgrade()?;
                if track_in.origin != origin || track_in.mid != data_mid {
                    return None;
                }
                let label = match track_in.kind {
                    MediaKind::Audio => "audio",
                    MediaKind::Video => "video",
                };
                Some((label, out.mid()))
            })
            .unwrap_or(("other", None));

        // NOTE(review): these counters are bumped before we know whether the
        // payload can actually be written (no mid / no writer / no matching
        // params still count). Order kept as-is; confirm this is intended.
        self.metrics.inc_forwarded_packets(kind_label);
        if let Some(rid) = data.rid() {
            self.metrics.inc_layer_selection(rid_label(rid.to_str0m()));
        }
        self.delivered_media.fetch_add(1, Ordering::Relaxed);

        let Some(mid) = out_mid else {
            return;
        };

        // Track the RID most recently seen for this client; payloads without
        // a RID leave the stored value untouched.
        let data_rid = data.rid().map(|r| r.to_str0m());
        if data_rid.is_some() && self.chosen_rid != data_rid {
            self.chosen_rid = data_rid;
        }

        let Some(writer) = self.rtc.writer(mid) else {
            return;
        };

        let (_pt_raw, network_time, rtp_time, _rid, payload, params) = data.clone_write_parts();

        // The payload type is taken from the writer's match against `params`;
        // no match means the payload cannot be sent on this writer.
        let Some(pt) = writer.match_params(params) else {
            return;
        };

        if let Err(e) = writer.write(pt, network_time, rtp_time, payload) {
            tracing::warn!(client = *self.id, error = ?e, "writer.write failed");
            self.rtc.disconnect();
        }
    }

    /// Handles an active-speaker change notification.
    ///
    /// Only built with the `active-speaker` feature. The sole effect is to
    /// increment `delivered_active_speaker`, and only in test builds or with
    /// the `test-utils` feature; otherwise the body is empty. `_peer_id` is
    /// not used.
    #[cfg(feature = "active-speaker")]
    pub fn handle_active_speaker_changed(&mut self, _peer_id: u64) {
        #[cfg(any(test, feature = "test-utils"))]
        {
            self.delivered_active_speaker
                .fetch_add(1, Ordering::Relaxed);
        }
    }
}
/// Maps a RID to the short static label used for layer metrics.
///
/// `layer::LOW`, `layer::MEDIUM` and `layer::HIGH` map to `"q"`, `"h"` and
/// `"f"` respectively; any other RID maps to `"other"`.
fn rid_label(rid: Rid) -> &'static str {
    // Guards are evaluated top to bottom, so the known layers are compared
    // in LOW, MEDIUM, HIGH order and the first equal one decides the label.
    match rid {
        r if r == layer::LOW.to_str0m() => "q",
        r if r == layer::MEDIUM.to_str0m() => "h",
        r if r == layer::HIGH.to_str0m() => "f",
        _ => "other",
    }
}