use std::collections::{HashMap, VecDeque};
use std::time::Instant;
use crate::RtcError;
use crate::change::AddMedia;
use crate::format::CodecConfig;
use crate::io::DATAGRAM_MTU;
use crate::packet::{CodecDepacketizer, DepacketizingBuffer, Payloader, RtpMeta};
use crate::rtp_::ExtensionMap;
use crate::rtp_::MidRid;
use crate::rtp_::SRTP_BLOCK_SIZE;
use crate::rtp_::SRTP_OVERHEAD;
use str0m_proto::Id;
use crate::format::PayloadParams;
use crate::format::Vp9PacketizerMode;
use crate::sdp::Simulcast as SdpSimulcast;
use crate::sdp::{MediaLine, Msid};
use crate::streams::{RtpPacket, Streams};
use crate::util::already_happened;
mod event;
pub use event::*;
mod writer;
pub use writer::Writer;
pub use crate::packet::MediaKind;
pub use crate::rtp_::{Direction, ExtensionValues, Frequency, MediaTime, Mid, Pt, Rid};
/// Reserved `Mid` built from the fixed bytes `~]probe` padded with NULs to 16 bytes.
///
/// NOTE(review): presumably identifies probe traffic that does not belong to any
/// negotiated media — confirm against the users of this constant.
pub(crate) const MID_PROBE: Mid = Mid::from_array(*b"~]probe\0\0\0\0\0\0\0\0\0");
/// State kept for one media, identified by its `Mid`.
///
/// Holds identifiers, direction, remote payload types/extensions, and the
/// per-`(Pt, Option<Rid>)` packetizing and depacketizing state.
#[derive(Debug)]
pub struct Media {
/// Identifier of this media; exposed via `mid()`.
mid: Mid,
/// CNAME string. `Default` generates a random one; `set_cname()` replaces it.
cname: String,
/// Rids accepted in the receive direction. Defaults to `Rids::Any`.
rids_rx: Rids,
/// Rids used in the send direction. Defaults to `Rids::None`.
rids_tx: Rids,
/// Index supplied by the constructor.
/// NOTE(review): presumably the m-line position in the SDP — confirm.
index: usize,
/// Stream and track identifiers; source of `stream_id()` and `track_id()`.
msid: Msid,
/// Kind of media. Defaults to `MediaKind::Video`.
kind: MediaKind,
/// Current direction. Changed through `set_direction()`.
dir: Direction,
/// Payload types recorded from the remote side.
/// An empty list means the media counts as disabled (see `disabled()`).
remote_pts: Vec<Pt>,
/// Extension map recorded from the remote side.
remote_exts: ExtensionMap,
/// Value passed to `from_remote_media_line()`; `false` for the other constructors.
remote_created: bool,
/// Simulcast configuration, if one has been set.
simulcast: Option<SdpSimulcast>,
/// Depacketizing buffers keyed by payload type and rid, created lazily in `depayload()`.
depayloaders: HashMap<(Pt, Option<Rid>), DepacketizingBuffer>,
/// Payloaders keyed by payload type and rid, created lazily in `payloader_for()`.
payloaders: HashMap<(Pt, Option<Rid>), Payloader>,
/// Samples queued by `set_to_payload()` and consumed one at a time by `do_payload()`.
to_payload: VecDeque<ToPayload>,
/// Starts out `true`.
/// NOTE(review): presumably cleared elsewhere once an "open" event is emitted — confirm.
pub(crate) need_open_event: bool,
/// Set by `set_direction()` when the direction value changes.
pub(crate) need_changed_event: bool,
/// `true` only for instances built by `from_app_tmp()`.
/// NOTE(review): exact meaning is not visible in this file — confirm.
pub(crate) app_tmp: bool,
}
/// A set of rids, with two special cases for "nothing" and "anything".
#[derive(Debug)]
pub enum Rids {
/// Matches no rid: `contains()` is always `false`.
None,
/// Matches every rid: `contains()` is always `true`.
Any,
/// Matches exactly the listed rids.
Specific(Vec<Rid>),
}
impl Rids {
    /// Tells whether `rid` is matched by this set.
    ///
    /// `Rids::Any` matches everything, `Rids::None` matches nothing and
    /// `Rids::Specific` matches only the rids it lists.
    pub(crate) fn contains(&self, rid: Rid) -> bool {
        if let Rids::Specific(listed) = self {
            listed.iter().any(|candidate| *candidate == rid)
        } else {
            matches!(self, Rids::Any)
        }
    }

    /// Tells whether this is the `Rids::Specific` variant.
    pub(crate) fn is_specific(&self) -> bool {
        match self {
            Rids::Specific(_) => true,
            Rids::None | Rids::Any => false,
        }
    }

    /// Adds `rid` to the set.
    ///
    /// A `None` or `Any` set is replaced by a `Specific` set holding only `rid`.
    /// An existing `Specific` set gets `rid` appended unless it is already listed.
    fn add(&mut self, rid: Rid) {
        if let Rids::Specific(listed) = self {
            if !listed.contains(&rid) {
                listed.push(rid);
            }
            return;
        }
        *self = Rids::Specific(vec![rid]);
    }
}
/// One sample queued on a `Media` for packetization.
///
/// Instances are queued by `Media::set_to_payload()` and handed to
/// `Payloader::push_sample()` by `Media::do_payload()`.
#[derive(Debug)]
pub(crate) struct ToPayload {
/// Payload type to send the sample with; selects the payloader.
pub pt: Pt,
/// Optional rid; together with the mid it selects the outgoing stream.
pub rid: Option<Rid>,
/// Wallclock instant associated with the sample.
pub wallclock: Instant,
/// Media time associated with the sample.
pub rtp_time: MediaTime,
/// NOTE(review): presumably marks the first audio sample after silence — confirm.
pub start_of_talk_spurt: bool,
/// The sample bytes.
pub data: Vec<u8>,
/// Extension values to accompany the sample.
pub ext_vals: ExtensionValues,
}
impl Media {
/// Returns the identifier of this media.
pub fn mid(&self) -> Mid {
self.mid
}
/// Returns the CNAME string of this media.
pub fn cname(&self) -> &str {
&self.cname
}
/// Adds `rid` to the rids expected in the receive direction.
///
/// If the receive rids were `Rids::None` or `Rids::Any`, they become a
/// `Rids::Specific` list holding only `rid`.
pub fn expect_rid_rx(&mut self, rid: Rid) {
self.rids_rx.add(rid);
}
/// Returns the rids for the receive direction.
pub fn rids_rx(&self) -> &Rids {
&self.rids_rx
}
/// Returns the rids for the send direction.
pub fn rids_tx(&self) -> &Rids {
&self.rids_tx
}
/// Returns the index this media was constructed with.
pub(crate) fn index(&self) -> usize {
self.index
}
/// Returns the `Msid` holding the stream and track identifiers.
pub(crate) fn msid(&self) -> &Msid {
&self.msid
}
/// Returns the stream id part of this media's `Msid`.
pub fn stream_id(&self) -> &str {
&self.msid().stream_id
}
/// Returns the track id part of this media's `Msid`.
pub fn track_id(&self) -> &str {
&self.msid().track_id
}
/// Returns the kind of this media.
pub fn kind(&self) -> MediaKind {
self.kind
}
/// Returns the current direction of this media.
pub fn direction(&self) -> Direction {
self.dir
}
/// Returns `true` when no remote payload types are recorded for this media.
pub fn disabled(&self) -> bool {
self.remote_pts.is_empty()
}
/// Clears the recorded remote payload types, after which `disabled()` returns `true`.
pub(crate) fn mark_disabled(&mut self) {
self.remote_pts.clear();
}
/// Returns the simulcast configuration, or `None` if none has been set.
pub(crate) fn simulcast(&self) -> Option<&SdpSimulcast> {
self.simulcast.as_ref()
}
pub(crate) fn poll_sample(
&mut self,
params: &[PayloadParams],
) -> Result<Option<MediaData>, RtcError> {
for ((pt, rid), buf) in &mut self.depayloaders {
if let Some(r) = buf.pop() {
let dep = r.map_err(|e| RtcError::Packet(self.mid, *pt, e))?;
let Some(codec) = params.iter().find(|c| c.pt() == *pt) else {
return Ok(None);
};
return Ok(Some(MediaData {
mid: self.mid,
pt: *pt,
rid: *rid,
params: *codec,
time: dep.time,
network_time: dep.first_network_time(),
seq_range: dep.seq_range(),
contiguous: dep.contiguous,
ext_vals: dep.ext_vals().clone(),
codec_extra: dep.codec_extra,
last_sender_info: dep.first_sender_info(),
audio_start_of_talk_spurt: codec.spec().codec.is_audio()
&& dep.start_of_talkspurt(),
data: dep.data,
}));
}
}
Ok(None)
}
pub(crate) fn depayload(
&mut self,
rid: Option<Rid>,
packet: RtpPacket,
reordering_size_audio: usize,
reordering_size_video: usize,
params: &[PayloadParams],
) {
if !self.dir.is_receiving() {
return;
}
let pt = packet.header.payload_type;
let key = (pt, rid);
let exists = self.depayloaders.contains_key(&key);
if !exists {
let params = params.iter().find(|p| p.pt == pt).unwrap();
let codec = params.spec.codec;
let hold_back = if codec.is_audio() {
reordering_size_audio
} else {
reordering_size_video
};
let mut depack: CodecDepacketizer = codec.into();
if let CodecDepacketizer::H265(ref mut h265) = depack {
if params.spec.format.sprop_max_don_diff.unwrap_or(0) > 0 {
h265.with_donl(true);
}
}
let buffer = DepacketizingBuffer::new(depack, hold_back);
self.depayloaders.insert((pt, rid), buffer);
}
let buffer = self.depayloaders.get_mut(&key).unwrap();
let meta = RtpMeta {
received: packet.timestamp,
time: packet.time,
seq_no: packet.seq_no,
header: packet.header.clone(),
last_sender_info: packet.last_sender_info,
};
buffer.push(meta, packet.payload);
}
/// Replaces the CNAME string of this media.
pub(crate) fn set_cname(&mut self, cname: String) {
self.cname = cname;
}
/// Replaces the `Msid` (stream and track identifiers) of this media.
pub(crate) fn set_msid(&mut self, msid: Msid) {
self.msid = msid;
}
/// Updates the direction and flags that a "changed" event is needed when the
/// new direction differs from the current one.
///
/// The flag is only ever raised here, never lowered. A second call with an
/// unchanged direction must not erase a change notification that is still
/// pending from an earlier call.
pub(crate) fn set_direction(&mut self, new_dir: Direction) {
    self.need_changed_event |= self.dir != new_dir;
    self.dir = new_dir;
}
/// Stores the simulcast configuration for this media, logging it at debug level first.
pub(crate) fn set_simulcast(&mut self, s: SdpSimulcast) {
debug!("Set simulcast: {:?}", s);
self.simulcast = Some(s);
}
/// Returns the payloader for the `(pt, rid)` pair, building it on first use.
///
/// A new payloader is created from the codec spec of the `params` entry whose
/// payload type equals `pt`, together with `vp9_mode`.
///
/// # Panics
///
/// Panics when a payloader has to be created and `params` has no entry for `pt`.
fn payloader_for(
    &mut self,
    pt: Pt,
    rid: Option<Rid>,
    params: &[PayloadParams],
    vp9_mode: Vp9PacketizerMode,
) -> &mut Payloader {
    match self.payloaders.entry((pt, rid)) {
        std::collections::hash_map::Entry::Occupied(existing) => existing.into_mut(),
        std::collections::hash_map::Entry::Vacant(vacant) => {
            let matching = params.iter().find(|p| p.pt == pt).unwrap();
            vacant.insert(Payloader::new(matching.spec, vp9_mode))
        }
    }
}
/// Queues a sample for later packetization by `do_payload()`.
///
/// # Errors
///
/// Returns `RtcError::WriteWithoutPoll` when more than 100 samples are already
/// queued; the sample is then not added.
fn set_to_payload(&mut self, to_payload: ToPayload) -> Result<(), RtcError> {
    // Queue lengths above this value reject further samples.
    const QUEUE_LIMIT: usize = 100;

    if self.to_payload.len() <= QUEUE_LIMIT {
        self.to_payload.push_back(to_payload);
        Ok(())
    } else {
        Err(RtcError::WriteWithoutPoll)
    }
}
/// Reports when this media next needs attention.
///
/// Returns the value of `already_happened()` while samples are queued for
/// packetization, and `None` when the queue is empty.
pub(crate) fn poll_timeout(&self) -> Option<Instant> {
    if self.to_payload.is_empty() {
        return None;
    }
    Some(already_happened())
}
/// Packetizes the oldest queued sample, if any, into its outgoing stream.
///
/// The outgoing stream is looked up by this media's mid and the sample's rid.
/// The payloader for the sample's payload type and rid is created on demand.
///
/// # Errors
///
/// * `RtcError::NoSenderSource` when no outgoing stream matches; the sample has
///   already been removed from the queue at that point.
/// * `RtcError::Packet` when the payloader fails to packetize the sample.
pub(crate) fn do_payload(
    &mut self,
    streams: &mut Streams,
    params: &[PayloadParams],
    vp9_mode: Vp9PacketizerMode,
) -> Result<(), RtcError> {
    // Room left for RTP after SRTP overhead, rounded down to a whole SRTP block.
    const RTP_SIZE: usize = DATAGRAM_MTU - SRTP_OVERHEAD;
    const MTU: usize = RTP_SIZE - RTP_SIZE % SRTP_BLOCK_SIZE;

    let Some(sample) = self.to_payload.pop_front() else {
        return Ok(());
    };
    let (pt, rid) = (sample.pt, sample.rid);
    let mid = self.mid;
    let is_audio = self.kind.is_audio();

    let stream = streams
        .stream_tx_by_midrid(MidRid(mid, rid))
        .ok_or(RtcError::NoSenderSource)?;

    self.payloader_for(pt, rid, params, vp9_mode)
        .push_sample(sample, MTU, is_audio, stream)
        .map_err(|e| RtcError::Packet(mid, pt, e))?;
    Ok(())
}
/// Records the remote payload types, but only while none are recorded yet.
///
/// Once a non-empty list is stored, later calls are ignored until the list is
/// emptied again (for example by `mark_disabled()`).
/// NOTE(review): ignoring later updates looks deliberate, but confirm that a
/// changed remote PT list never needs to replace the stored one.
pub(crate) fn set_remote_pts(&mut self, pts: Vec<Pt>) {
    if self.remote_pts.is_empty() {
        debug!("Mid ({}) remote PT order is: {:?}", self.mid, pts);
        self.remote_pts = pts;
    }
}
/// Replaces the stored remote extension map.
pub(crate) fn set_remote_extmap(&mut self, exts: ExtensionMap) {
self.remote_exts = exts;
}
/// Returns the recorded remote payload types, in stored order.
pub fn remote_pts(&self) -> &[Pt] {
&self.remote_pts
}
/// Returns the stored remote extension map.
pub fn remote_extmap(&self) -> &ExtensionMap {
&self.remote_exts
}
/// Returns the `remote_created` flag this media was constructed with.
pub(crate) fn remote_created(&self) -> bool {
self.remote_created
}
/// Finds the first payload type usable by this media that has a resend
/// payload type configured.
///
/// Candidates are the entries of `config` for this media's kind, in the order
/// `config` yields them, restricted to payload types present in the recorded
/// remote payload types. Returns `None` when no candidate has `resend()` set.
pub(crate) fn first_pt_with_rtx(&self, config: &CodecConfig) -> Option<Pt> {
    let mut candidates = config.all_for_kind(self.kind);
    candidates
        .find(|p| self.remote_pts.contains(&p.pt) && p.resend().is_some())
        .map(|p| p.pt)
}
/// Discards the depacketizing buffer for the given payload type and rid, if any.
///
/// `depayload()` creates a fresh buffer the next time a packet for that pair arrives.
pub(crate) fn reset_depayloader(&mut self, payload_type: Pt, rid: Option<Rid>) {
self.depayloaders.remove(&(payload_type, rid));
}
/// Replaces the rids for the receive direction.
pub(crate) fn set_rid_rx(&mut self, rids: Rids) {
self.rids_rx = rids;
}
/// Replaces the rids for the send direction.
pub(crate) fn set_rid_tx(&mut self, rids: Rids) {
self.rids_tx = rids;
}
/// Adds `rid` to the rids for the send direction.
///
/// If the send rids were `Rids::None` or `Rids::Any`, they become a
/// `Rids::Specific` list holding only `rid`.
pub(crate) fn add_to_rid_tx(&mut self, rid: Rid) {
self.rids_tx.add(rid)
}
}
impl Default for Media {
fn default() -> Self {
Self {
mid: Mid::new(),
index: 0,
app_tmp: false,
cname: Id::<20>::random().to_string(),
msid: Msid::random(),
kind: MediaKind::Video,
remote_pts: vec![],
remote_exts: ExtensionMap::empty(),
remote_created: false,
dir: Direction::SendRecv,
simulcast: None,
rids_rx: Rids::Any,
rids_tx: Rids::None,
payloaders: HashMap::new(),
depayloaders: HashMap::new(),
to_payload: VecDeque::default(),
need_open_event: true,
need_changed_event: false,
}
}
}
impl Media {
/// Builds a `Media` from a remote media line.
///
/// * `l` - the media line providing mid, msid, media type and direction.
/// * `index` - index to store on the media.
/// * `remote_created` - stored as-is on the media.
///
/// The direction is `Direction::Inactive` when the line is disabled, otherwise
/// the line's direction inverted. A random msid is generated only when the line
/// carries none. All remaining fields come from `Media::default()`.
pub(crate) fn from_remote_media_line(
    l: &MediaLine,
    index: usize,
    remote_created: bool,
) -> Self {
    Media {
        mid: l.mid(),
        index,
        // Lazily generated: no random msid is produced when the line has one.
        msid: l.msid().unwrap_or_else(Msid::random),
        kind: l.typ.clone().into(),
        dir: if l.disabled {
            Direction::Inactive
        } else {
            l.direction().invert()
        },
        remote_created,
        ..Default::default()
    }
}
pub(crate) fn from_add_media(a: AddMedia) -> Self {
Media {
mid: a.mid,
index: a.index,
cname: a.cname,
msid: a.msid,
kind: a.kind,
dir: a.dir,
remote_pts: a.pts,
remote_exts: a.exts,
remote_created: false,
simulcast: a.simulcast.map(|s| s.into_sdp()),
..Default::default()
}
}
/// Builds a `Media` with the given mid and index and `app_tmp` set to `true`.
///
/// All remaining fields come from `Media::default()`.
pub(crate) fn from_app_tmp(mid: Mid, index: usize) -> Media {
Media {
mid,
index,
app_tmp: true,
..Default::default()
}
}
/// Builds a send/receive `Media` with the given mid, index and kind, using
/// `exts` as the remote extension map.
///
/// All remaining fields come from `Media::default()`.
pub(crate) fn from_direct_api(
mid: Mid,
index: usize,
kind: MediaKind,
exts: ExtensionMap,
) -> Media {
Media {
mid,
index,
kind,
dir: Direction::SendRecv,
remote_exts: exts,
..Default::default()
}
}
}