mod sender_stream;
#[cfg(test)]
mod sender_test;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use portable_atomic::AtomicU32;
use rtp::extension::transport_cc_extension::TransportCcExtension;
use sender_stream::SenderStream;
use tokio::sync::Mutex;
use util::Marshal;
use crate::{Attributes, RTPWriter, *};
pub(crate) const TRANSPORT_CC_URI: &str =
"http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01";
#[derive(Default)]
pub struct SenderBuilder {
init_sequence_nr: u32,
}
impl SenderBuilder {
pub fn with_init_sequence_nr(mut self, init_sequence_nr: u32) -> SenderBuilder {
self.init_sequence_nr = init_sequence_nr;
self
}
}
impl InterceptorBuilder for SenderBuilder {
fn build(&self, _id: &str) -> Result<Arc<dyn Interceptor + Send + Sync>> {
Ok(Arc::new(Sender {
next_sequence_nr: Arc::new(AtomicU32::new(self.init_sequence_nr)),
streams: Mutex::new(HashMap::new()),
}))
}
}
pub struct Sender {
next_sequence_nr: Arc<AtomicU32>,
streams: Mutex<HashMap<u32, Arc<SenderStream>>>,
}
impl Sender {
pub fn builder() -> SenderBuilder {
SenderBuilder::default()
}
}
#[async_trait]
impl Interceptor for Sender {
async fn bind_rtcp_reader(
&self,
reader: Arc<dyn RTCPReader + Send + Sync>,
) -> Arc<dyn RTCPReader + Send + Sync> {
reader
}
async fn bind_rtcp_writer(
&self,
writer: Arc<dyn RTCPWriter + Send + Sync>,
) -> Arc<dyn RTCPWriter + Send + Sync> {
writer
}
async fn bind_local_stream(
&self,
info: &StreamInfo,
writer: Arc<dyn RTPWriter + Send + Sync>,
) -> Arc<dyn RTPWriter + Send + Sync> {
let mut hdr_ext_id = 0u8;
for e in &info.rtp_header_extensions {
if e.uri == TRANSPORT_CC_URI {
hdr_ext_id = e.id as u8;
break;
}
}
if hdr_ext_id == 0 {
return writer;
}
let stream = Arc::new(SenderStream::new(
writer,
Arc::clone(&self.next_sequence_nr),
hdr_ext_id,
));
{
let mut streams = self.streams.lock().await;
streams.insert(info.ssrc, Arc::clone(&stream));
}
stream
}
async fn unbind_local_stream(&self, info: &StreamInfo) {
let mut streams = self.streams.lock().await;
streams.remove(&info.ssrc);
}
async fn bind_remote_stream(
&self,
_info: &StreamInfo,
reader: Arc<dyn RTPReader + Send + Sync>,
) -> Arc<dyn RTPReader + Send + Sync> {
reader
}
async fn unbind_remote_stream(&self, _info: &StreamInfo) {}
async fn close(&self) -> Result<()> {
Ok(())
}
}