use std::collections::{HashMap, VecDeque};
use std::time::{Duration, Instant};
use crate::Event;
use crate::bwe::BweKind;
use crate::bwe_::Bwe;
use crate::config::KeyingMaterial;
use crate::crypto::CryptoProvider;
use crate::crypto::dtls::SrtpProfile;
use crate::format::CodecConfig;
use crate::format::PayloadParams;
use crate::format::Vp9PacketizerMode;
use crate::io::{DATAGRAM_MTU, DATAGRAM_MTU_WARN, DatagramSend};
use crate::media::Media;
use crate::media::{KeyframeRequestKind, MID_PROBE};
use crate::media::{MediaAdded, MediaChanged};
use crate::pacer::PacerControl;
use crate::pacer::{Pacer, PacerImpl};
use crate::rtp::{Extension, RawPacket};
use crate::rtp_::Direction;
use crate::rtp_::MidRid;
use crate::rtp_::Pt;
use crate::rtp_::SRTCP_OVERHEAD;
use crate::rtp_::SeqNo;
use crate::rtp_::{Bitrate, ExtensionMap, Mid, Rtcp, RtcpFb};
use crate::rtp_::{RtpHeader, SessionId, TwccPacketId, extend_u16};
use crate::rtp_::{SrtpContext, Ssrc};
use crate::rtp_::{TwccRecvRegister, TwccSendRegister};
use crate::stats::StatsSnapshot;
use crate::streams::{RtpPacket, Streams};
use crate::util::{Soonest, already_happened, not_happening};
use crate::{Reason, net};
use crate::{RtcConfig, RtcError};
const NACK_MIN_INTERVAL: Duration = Duration::from_millis(33);
const TWCC_INTERVAL: Duration = Duration::from_millis(50);
pub(crate) struct Session {
id: SessionId,
pub medias: Vec<Media>,
pub streams: Streams,
app: Option<(Mid, usize)>,
reordering_size_audio: usize,
reordering_size_video: usize,
pub send_buffer_audio: usize,
pub send_buffer_video: usize,
pub exts: ExtensionMap,
pub codec_config: CodecConfig,
probe_payload_params: Option<PayloadParams>,
srtp_rx: Option<SrtpContext>,
srtp_tx: Option<SrtpContext>,
last_nack: Instant,
last_twcc: Instant,
twcc: u64,
twcc_rx_register: TwccRecvRegister,
twcc_tx_register: TwccSendRegister,
max_rx_seq_lookup: HashMap<Ssrc, SeqNo>,
bwe: Option<Bwe>,
enable_twcc_feedback: bool,
pacer: PacerImpl,
pacer_control: PacerControl,
poll_packet_buf: Vec<u8>,
pending_packet: Option<RtpPacket>,
packet_first_sent: bool,
pub ice_lite: bool,
pub rtp_mode: bool,
vp9_packetizer_mode: Vp9PacketizerMode,
feedback_tx: VecDeque<Rtcp>,
feedback_rx: VecDeque<Rtcp>,
raw_packets: Option<VecDeque<Box<RawPacket>>>,
#[cfg(feature = "_internal_test_exports")]
pending_probe: Option<crate::bwe_::ProbeClusterConfig>,
}
impl Session {
pub fn new(config: &RtcConfig) -> Self {
let mut id = SessionId::new();
const MAX_ID: u64 = 2_u64.pow(62) - 1;
while *id > MAX_ID {
id = (*id >> 1).into();
}
let (pacer, bwe) = if let Some(config) = &config.bwe_config {
let rate = config.initial_bitrate;
let pacer = PacerImpl::leaky_bucket(rate * 2.0);
let bwe = Bwe::new(rate);
(pacer, Some(bwe))
} else {
(PacerImpl::null(), None)
};
let enable_stats = config.stats_interval.is_some();
Session {
id,
medias: vec![],
streams: Streams::new(enable_stats),
app: None,
reordering_size_audio: config.reordering_size_audio,
reordering_size_video: config.reordering_size_video,
send_buffer_audio: config.send_buffer_audio,
send_buffer_video: config.send_buffer_video,
exts: config.exts.clone(),
codec_config: config.codec_config.clone(),
probe_payload_params: None,
srtp_rx: None,
srtp_tx: None,
last_nack: already_happened(),
last_twcc: already_happened(),
twcc: 0,
twcc_rx_register: TwccRecvRegister::new(100),
twcc_tx_register: TwccSendRegister::new(1000),
max_rx_seq_lookup: HashMap::new(),
bwe,
enable_twcc_feedback: false,
pacer,
pacer_control: PacerControl::new(),
poll_packet_buf: vec![0; 2000],
pending_packet: None,
packet_first_sent: false,
ice_lite: config.ice_lite,
rtp_mode: config.rtp_mode,
vp9_packetizer_mode: config.vp9_packetizer_mode,
feedback_tx: VecDeque::new(),
feedback_rx: VecDeque::new(),
raw_packets: if config.enable_raw_packets {
Some(VecDeque::new())
} else {
None
},
#[cfg(feature = "_internal_test_exports")]
pending_probe: None,
}
}
pub fn id(&self) -> SessionId {
self.id
}
pub fn set_app(&mut self, mid: Mid, index: usize) -> Result<(), String> {
if let Some((mid_existing, index_existing)) = self.app {
if mid_existing != mid {
return Err(format!("App mid changed {} != {}", mid, mid_existing,));
}
if index_existing != index {
return Err(format!("App index changed {} != {}", index, index_existing,));
}
} else {
self.app = Some((mid, index));
}
Ok(())
}
pub fn app(&self) -> &Option<(Mid, usize)> {
&self.app
}
pub fn set_keying_material(
&mut self,
mat: KeyingMaterial,
crypto: &CryptoProvider,
srtp_profile: SrtpProfile,
active: bool,
) {
let left = active;
self.srtp_rx = Some(SrtpContext::new(crypto, srtp_profile, &mat, !left));
self.srtp_tx = Some(SrtpContext::new(crypto, srtp_profile, &mat, left));
}
pub fn handle_timeout(&mut self, now: Instant) -> Result<(), RtcError> {
self.do_payload()?;
let sender_ssrc = self.streams.first_ssrc_local();
let do_nack = now >= self.nack_at().unwrap_or(not_happening());
self.streams.handle_timeout(
now,
sender_ssrc,
do_nack,
&self.medias,
&self.codec_config,
&mut self.feedback_tx,
);
if do_nack {
self.last_nack = now;
}
self.update_queue_state(now);
if let Some(twcc_at) = self.twcc_at() {
if now >= twcc_at {
self.create_twcc_feedback(sender_ssrc, now);
}
}
self.handle_timeout_bwe(now);
Ok(())
}
fn handle_timeout_bwe(&mut self, now: Instant) {
let Some(bwe) = self.bwe.as_mut() else {
return;
};
let do_probe = self.packet_first_sent && self.pacer.has_padding_queue();
if let Some(probe_config) = bwe.handle_timeout(now, do_probe) {
if bwe.start_probe(probe_config, now) {
#[cfg(feature = "_internal_test_exports")]
{
self.pending_probe = Some(probe_config);
}
self.pacer.start_probe(probe_config);
}
}
if let Some(cluster_id) = self.pacer.check_probe_complete(now) {
bwe.end_probe(now, cluster_id);
}
}
fn update_queue_state(&mut self, now: Instant) {
let iter = self.streams.streams_tx().map(|m| m.queue_state(now));
let Some(padding_request) = self.pacer.handle_timeout(now, iter) else {
return;
};
let stream = self
.streams
.stream_tx_by_midrid(padding_request.midrid)
.expect("pacer to use an existing stream");
stream.generate_padding(padding_request.padding);
}
fn create_twcc_feedback(&mut self, sender_ssrc: Ssrc, now: Instant) -> Option<()> {
self.last_twcc = now;
let mut twcc = self.twcc_rx_register.build_report(DATAGRAM_MTU - 100)?;
twcc.sender_ssrc = sender_ssrc;
twcc.ssrc = self.streams.first_ssrc_remote();
trace!("Created feedback TWCC: {:?}", twcc);
self.feedback_tx.push_front(Rtcp::Twcc(twcc));
Some(())
}
pub fn handle_rtp_receive(&mut self, now: Instant, message: &[u8]) {
let Some(header) = RtpHeader::parse(message, &self.exts) else {
trace!("Failed to parse RTP header");
return;
};
self.handle_rtp(now, header, message);
}
pub fn handle_rtcp_receive(&mut self, now: Instant, message: &[u8]) {
self.handle_rtcp(now, message);
}
fn mid_and_ssrc_for_header(&mut self, now: Instant, header: &RtpHeader) -> Option<(Mid, Ssrc)> {
let ssrc_header = header.ssrc;
if let Some(r) = self.streams.mid_ssrc_rx_by_ssrc_or_rtx(now, ssrc_header) {
return Some(r);
}
self.map_dynamic(header);
if let Some(r) = self.streams.mid_ssrc_rx_by_ssrc_or_rtx(now, ssrc_header) {
return Some(r);
}
if ssrc_header.is_probe() {
return self.ensure_probe_stream(header.payload_type);
}
None
}
fn ensure_probe_stream(&mut self, pt: Pt) -> Option<(Mid, Ssrc)> {
let ssrc: Ssrc = 0.into();
let midrid = MidRid(MID_PROBE, None);
self.probe_payload_params = Some(PayloadParams::new_probe(pt));
self.streams.expect_stream_rx(ssrc, None, midrid, true);
Some((MID_PROBE, ssrc))
}
fn map_dynamic(&mut self, header: &RtpHeader) {
let Some(mid) = header.ext_vals.mid else {
return;
};
let rid = header.ext_vals.rid.or(header.ext_vals.rid_repair);
let Some(media) = self.medias.iter_mut().find(|m| m.mid() == mid) else {
return;
};
let maybe_payload = self
.codec_config
.iter()
.find(|p| p.pt() == header.payload_type || p.resend() == Some(header.payload_type));
let Some(payload) = maybe_payload else {
return;
};
if let Some(rid) = rid {
let is_main = header.ext_vals.rid.is_some();
let midrid = MidRid(mid, Some(rid));
self.streams
.map_dynamic_by_rid(header.ssrc, midrid, media, *payload, is_main);
} else {
let is_main = payload.pt() == header.payload_type;
let midrid = MidRid(mid, None);
self.streams
.map_dynamic_by_pt(header.ssrc, midrid, media, *payload, is_main);
}
}
pub(crate) fn handle_rtp(&mut self, now: Instant, mut header: RtpHeader, buf: &[u8]) {
header.ext_vals.update_absolute_send_time(now);
trace!("Handle RTP: {:?}", header);
let Some((mid, ssrc)) = self.mid_and_ssrc_for_header(now, &header) else {
debug!("No mid/SSRC for header: {:?}", header);
return;
};
let srtp = match self.srtp_rx.as_mut() {
Some(v) => v,
None => {
trace!("Rejecting SRTP while missing SrtpContext");
return;
}
};
let stream = self.streams.stream_rx(&ssrc).unwrap();
let maybe_params = if ssrc.is_probe() {
self.probe_payload_params.as_ref()
} else {
main_payload_params(&self.codec_config, header.payload_type)
};
let params = match maybe_params {
Some(p) => p,
None => {
trace!(
"No payload params could be found (main or RTX) for {:?}",
header.payload_type
);
return;
}
};
let clock_rate = params.spec().clock_rate;
let pt = params.pt();
let is_repair = pt != header.payload_type;
let max_seq_lookup = make_max_seq_lookup(&self.max_rx_seq_lookup);
let mut seq_no = stream.extend_seq(&header, is_repair, max_seq_lookup);
if !stream.is_new_packet(is_repair, seq_no) {
trace!(
"Ignoring dupe packet mid: {} seq_no: {} is_repair: {}",
mid, seq_no, is_repair
);
return;
}
let mut data = match srtp.unprotect_rtp(buf, &header, *seq_no) {
Some(v) => v,
None => {
trace!(
"Failed to unprotect SRTP for SSRC: {} pt: {} mid: {} \
rid: {:?} seq_no: {} is_repair: {}",
header.ssrc,
pt,
stream.mid(),
stream.rid(),
seq_no,
is_repair
);
return;
}
};
if header.has_padding && !RtpHeader::unpad_payload(&mut data) {
trace!("unpadding of unprotected payload failed");
return;
}
if let Some(raw_packets) = &mut self.raw_packets {
raw_packets.push_back(Box::new(RawPacket::RtpRx(header.clone(), data.clone())));
}
if let Some(transport_cc) = header.ext_vals.transport_cc {
let prev = self.twcc_rx_register.max_seq();
let extended = extend_u16(prev.map(|s| *s), transport_cc);
self.twcc_rx_register.update_seq(extended.into(), now);
}
update_max_seq(&mut self.max_rx_seq_lookup, header.ssrc, seq_no);
let receipt_outer = stream.update_register(now, &header, clock_rate, is_repair, seq_no);
let receipt = if is_repair {
if data.is_empty() {
return;
}
stream.un_rtx(&mut header, &mut data, pt);
let max_seq_lookup = make_max_seq_lookup(&self.max_rx_seq_lookup);
seq_no = stream.extend_seq(&header, false, max_seq_lookup);
update_max_seq(&mut self.max_rx_seq_lookup, header.ssrc, seq_no);
stream.update_register(now, &header, clock_rate, false, seq_no)
} else {
receipt_outer
};
if ssrc.is_probe() {
return;
}
let packet = stream.handle_rtp(now, header, data, seq_no, receipt.time);
if self.rtp_mode {
if receipt.is_new_packet {
self.pending_packet = Some(packet);
}
} else {
let media = self.medias.iter_mut().find(|m| m.mid() == mid).unwrap();
media.depayload(
stream.rid(),
packet,
self.reordering_size_audio,
self.reordering_size_video,
&self.codec_config,
);
}
}
fn handle_rtcp(&mut self, now: Instant, buf: &[u8]) -> Option<()> {
let srtp: &mut SrtpContext = self.srtp_rx.as_mut()?;
let unprotected = srtp.unprotect_rtcp(buf)?;
Rtcp::read_packet(&unprotected, &mut self.feedback_rx);
let mut need_configure_pacer = false;
if let Some(raw_packets) = &mut self.raw_packets {
for fb in &self.feedback_rx {
raw_packets.push_back(Box::new(RawPacket::RtcpRx(fb.clone())));
}
}
for fb in RtcpFb::from_rtcp(self.feedback_rx.drain(..)) {
if let RtcpFb::Twcc(twcc) = fb {
trace!("Handle TWCC: {:?}", twcc);
let maybe_records = self.twcc_tx_register.apply_report(twcc, now);
if let (Some(maybe_records), Some(bwe)) = (maybe_records, &mut self.bwe) {
bwe.update(maybe_records, now);
}
need_configure_pacer = true;
continue;
}
if fb.is_for_rx() {
let Some(stream) = self.streams.stream_rx(&fb.ssrc()) else {
continue;
};
stream.handle_rtcp(now, fb);
} else {
let Some(stream) = self.streams.stream_tx(&fb.ssrc()) else {
continue;
};
stream.handle_rtcp(now, fb);
}
}
if need_configure_pacer {
self.configure_pacer();
}
Some(())
}
pub fn poll_event(&mut self) -> Option<Event> {
#[cfg(feature = "_internal_test_exports")]
{
if let Some(probe) = self.pending_probe.take() {
return Some(Event::Probe(probe));
}
}
if let Some(bitrate_estimate) = self.bwe.as_mut().and_then(|bwe| bwe.poll_estimate()) {
return Some(Event::EgressBitrateEstimate(BweKind::Twcc(
bitrate_estimate,
)));
}
if !self.ready_for_srtp() {
return None;
}
if let Some(raw_packets) = &mut self.raw_packets {
if let Some(p) = raw_packets.pop_front() {
return Some(Event::RawPacket(p));
}
}
if let Some(paused) = self.streams.poll_stream_paused() {
return Some(Event::StreamPaused(paused));
}
if self.rtp_mode {
if let Some(packet) = self.pending_packet.take() {
return Some(Event::RtpPacket(packet));
}
}
if let Some(req) = self.streams.poll_keyframe_request() {
return Some(Event::KeyframeRequest(req));
}
if let Some(report) = self.streams.poll_sender_feedback() {
return Some(Event::SenderFeedback(report));
}
if let Some((mid, bitrate)) = self.streams.poll_remb_request() {
return Some(Event::EgressBitrateEstimate(BweKind::Remb(mid, bitrate)));
}
for media in &mut self.medias {
if media.need_open_event {
media.need_open_event = false;
return Some(Event::MediaAdded(MediaAdded {
mid: media.mid(),
kind: media.kind(),
direction: media.direction(),
simulcast: media.simulcast().map(|s| s.clone().into()),
}));
}
if media.need_changed_event {
media.need_changed_event = false;
return Some(Event::MediaChanged(MediaChanged {
mid: media.mid(),
direction: media.direction(),
}));
}
}
None
}
pub fn poll_event_fallible(&mut self) -> Result<Option<Event>, RtcError> {
if self.rtp_mode {
return Ok(None);
}
for media in &mut self.medias {
if let Some(e) = media.poll_sample(&self.codec_config)? {
return Ok(Some(Event::MediaData(e)));
}
}
Ok(None)
}
fn ready_for_srtp(&self) -> bool {
self.srtp_rx.is_some() && self.srtp_tx.is_some()
}
pub fn poll_datagram(&mut self, now: Instant) -> Option<net::DatagramSend> {
if now == already_happened() {
return None;
}
let x = None
.or_else(|| self.poll_feedback())
.or_else(|| self.poll_packet(now));
if let Some(x) = &x {
if !self.rtp_mode && x.len() > DATAGRAM_MTU_WARN {
warn!("RTP above MTU {}: {}", DATAGRAM_MTU_WARN, x.len());
}
}
x
}
pub fn clear_feedback(&mut self) {
self.feedback_rx.clear();
self.feedback_tx.clear();
}
fn poll_feedback(&mut self) -> Option<net::DatagramSend> {
if self.feedback_tx.is_empty() {
return None;
}
const ENCRYPTABLE_MTU: usize = (DATAGRAM_MTU - SRTCP_OVERHEAD) & !3;
assert!(ENCRYPTABLE_MTU % 4 == 0);
let mut data = vec![0_u8; ENCRYPTABLE_MTU];
let mut raw_packets = self.raw_packets.as_mut();
let output = move |fb| {
if let Some(raw_packets) = &mut raw_packets {
raw_packets.push_back(Box::new(RawPacket::RtcpTx(fb)));
}
};
let len = Rtcp::write_packet(&mut self.feedback_tx, &mut data, output);
if len == 0 {
return None;
}
data.truncate(len);
let srtp = self.srtp_tx.as_mut()?;
let protected = srtp.protect_rtcp(&data);
assert!(
protected.len() < DATAGRAM_MTU,
"Encrypted SRTCP should be less than MTU"
);
Some(protected.into())
}
fn poll_packet(&mut self, now: Instant) -> Option<DatagramSend> {
let srtp_tx = self.srtp_tx.as_mut()?;
let (midrid, cluster_id) = self.pacer.poll_queue()?;
let Some(media) = self.medias.iter().find(|m| m.mid() == midrid.mid()) else {
trace!("Pacer pointed to mid {} which has no media", midrid.mid());
return None;
};
let buf = &mut self.poll_packet_buf;
let twcc_seq = self.twcc;
let stream = self.streams.stream_tx_by_midrid(midrid)?;
let params = &self.codec_config;
let exts = media.remote_extmap();
let twcc_enabled = exts.id_of(Extension::TransportSequenceNumber).is_some();
let twcc = twcc_enabled.then_some(&mut self.twcc);
let receipt = stream.poll_packet(now, exts, twcc, params, buf)?;
let PacketReceipt {
header,
seq_no,
is_padding,
payload_size,
} = receipt;
trace!(payload_size, is_padding, "Poll RTP: {:?}", header);
#[cfg(feature = "_internal_dont_use_log_stats")]
{
let kind = if is_padding { "padding" } else { "media" };
crate::log_stat!("PACKET_SENT", header.ssrc, payload_size, kind);
}
self.pacer.register_send(now, payload_size.into(), midrid);
if let Some(raw_packets) = &mut self.raw_packets {
raw_packets.push_back(Box::new(RawPacket::RtpTx(header.clone(), buf.clone())));
}
let protected = srtp_tx.protect_rtp(buf, &header, *seq_no);
if twcc_enabled {
let packet_id = if let Some(cluster) = cluster_id {
TwccPacketId::with_cluster(twcc_seq, cluster)
} else {
TwccPacketId::new(twcc_seq)
};
self.twcc_tx_register
.register_seq(packet_id, now, payload_size);
}
if let Some(bwe) = self.bwe.as_mut() {
bwe.on_media_sent(payload_size.into(), is_padding, now);
}
if !self.packet_first_sent {
self.packet_first_sent = true;
}
Some(protected.into())
}
pub fn poll_timeout(&mut self) -> (Option<Instant>, Reason) {
let feedback_at = self.regular_feedback_at();
let nack_at = self.nack_at();
let twcc_at = self.twcc_at();
let pacing_at = self.pacer.poll_timeout();
let packetize_at = self.medias.iter().flat_map(|m| m.poll_timeout()).next();
let paused_at = self.paused_at();
let send_stream_at = self.streams.send_stream();
let bwe_at = self
.bwe
.as_ref()
.map(|bwe| bwe.poll_timeout())
.unwrap_or((None, Reason::BweDelayControl));
(feedback_at, Reason::Feedback)
.soonest((nack_at, Reason::Nack))
.soonest((twcc_at, Reason::Twcc))
.soonest(pacing_at)
.soonest((packetize_at, Reason::Packetize))
.soonest((paused_at, Reason::PauseCheck))
.soonest((send_stream_at, Reason::SendStream))
.soonest(bwe_at)
}
pub fn has_mid(&self, mid: Mid) -> bool {
self.medias.iter().any(|m| m.mid() == mid)
}
fn regular_feedback_at(&self) -> Option<Instant> {
self.streams.regular_feedback_at()
}
fn paused_at(&self) -> Option<Instant> {
self.streams.paused_at()
}
fn nack_at(&mut self) -> Option<Instant> {
if !self.streams.any_nack_enabled() {
return None;
}
Some(self.last_nack + NACK_MIN_INTERVAL)
}
fn twcc_at(&self) -> Option<Instant> {
let is_receiving = self.streams.is_receiving();
if is_receiving && self.enable_twcc_feedback && self.twcc_rx_register.has_unreported() {
Some(self.last_twcc + TWCC_INTERVAL)
} else {
None
}
}
pub fn enable_twcc_feedback(&mut self) {
if !self.enable_twcc_feedback {
debug!("Enable TWCC feedback");
self.enable_twcc_feedback = true;
}
}
pub fn visit_stats(&mut self, now: Instant, snapshot: &mut StatsSnapshot) {
for stream in self.streams.streams_tx() {
stream.visit_stats(snapshot, now);
}
for stream in self.streams.streams_rx() {
stream.visit_stats(snapshot, now);
}
snapshot.tx = snapshot.egress.values().map(|s| s.bytes).sum();
snapshot.rx = snapshot.ingress.values().map(|s| s.bytes).sum();
snapshot.bwe_tx = self.bwe.as_ref().and_then(|bwe| bwe.last_estimate());
snapshot.egress_loss_fraction = self.twcc_tx_register.loss(Duration::from_secs(1), now);
snapshot.rtt = self.twcc_tx_register.rtt();
snapshot.ingress_loss_fraction = self.twcc_rx_register.loss();
}
pub fn set_bwe_desired_bitrate(&mut self, desired_bitrate: Bitrate) {
if let Some(bwe) = self.bwe.as_mut() {
bwe.set_desired_bitrate(desired_bitrate);
self.configure_pacer();
}
}
pub fn reset_bwe(&mut self, init_bitrate: Bitrate) {
if let Some(bwe) = self.bwe.as_mut() {
bwe.reset(init_bitrate);
}
}
pub fn line_count(&self) -> usize {
self.medias.len() + if self.app.is_some() { 1 } else { 0 }
}
pub fn add_media(&mut self, media: Media) {
self.medias.push(media);
}
pub fn medias(&self) -> &[Media] {
&self.medias
}
pub fn remove_media(&mut self, mid: Mid) {
self.medias.retain(|media| media.mid() != mid);
self.streams.remove_streams_by_mid(mid);
}
fn configure_pacer(&mut self) {
let Some(bwe) = self.bwe.as_mut() else {
return;
};
let Some(current_estimate) = bwe.last_estimate() else {
return;
};
let is_overuse = bwe.is_overusing();
let has_active_media = self.has_active_outgoing_media();
let result = self
.pacer_control
.calculate(has_active_media, current_estimate, is_overuse);
self.pacer.set_padding_rate(result.padding_rate);
self.pacer.set_pacing_rate(result.pacing_rate);
}
fn has_active_outgoing_media(&self) -> bool {
self.medias
.iter()
.any(|m| m.direction().is_sending() && !m.disabled())
}
pub fn media_by_mid(&self, mid: Mid) -> Option<&Media> {
self.medias.iter().find(|m| m.mid() == mid)
}
pub fn media_by_mid_mut(&mut self, mid: Mid) -> Option<&mut Media> {
self.medias.iter_mut().find(|m| m.mid() == mid)
}
fn do_payload(&mut self) -> Result<(), RtcError> {
for m in &mut self.medias {
m.do_payload(
&mut self.streams,
&self.codec_config,
self.vp9_packetizer_mode,
)?;
}
Ok(())
}
pub fn set_direction(&mut self, mid: Mid, direction: Direction) -> bool {
let Some(media) = self.media_by_mid_mut(mid) else {
return false;
};
let old_dir = media.direction();
if old_dir == direction {
return false;
}
media.set_direction(direction);
if old_dir.is_sending() && !direction.is_sending() {
self.streams.reset_buffers_tx(mid);
}
let max_seq_lookup = make_max_seq_lookup(&self.max_rx_seq_lookup);
if old_dir.is_receiving() && !direction.is_receiving() {
self.streams.reset_buffers_rx(mid, max_seq_lookup);
}
true
}
pub fn is_request_keyframe_possible(&self, kind: KeyframeRequestKind) -> bool {
self.codec_config.iter().any(|r| match kind {
KeyframeRequestKind::Pli => r.fb_pli,
KeyframeRequestKind::Fir => r.fb_fir,
})
}
pub fn is_connected(&self) -> bool {
self.srtp_rx.is_some() && self.srtp_tx.is_some()
}
}
pub struct PacketReceipt {
pub header: RtpHeader,
pub seq_no: SeqNo,
pub is_padding: bool,
pub payload_size: usize,
}
fn main_payload_params(c: &CodecConfig, pt: Pt) -> Option<&PayloadParams> {
c.iter().find(|p| p.pt == pt || p.resend == Some(pt))
}
fn make_max_seq_lookup(map: &HashMap<Ssrc, SeqNo>) -> impl Fn(Ssrc) -> Option<SeqNo> + '_ {
|ssrc| map.get(&ssrc).cloned()
}
fn update_max_seq(map: &mut HashMap<Ssrc, SeqNo>, ssrc: Ssrc, seq_no: SeqNo) {
let current = map.entry(ssrc).or_insert(seq_no);
if seq_no > *current {
*current = seq_no;
}
}