interceptor/twcc/sender/
mod.rs1mod sender_stream;
2#[cfg(test)]
3mod sender_test;
4
5use std::sync::atomic::Ordering;
6use std::sync::Arc;
7
8use portable_atomic::AtomicU32;
9use rtp::extension::transport_cc_extension::TransportCcExtension;
10use sender_stream::SenderStream;
11use tokio::sync::Mutex;
12use util::Marshal;
13
14use crate::{Attributes, RTPWriter, *};
15
/// URI of the transport-wide congestion control RTP header extension.
///
/// `Sender::bind_local_stream` compares each negotiated header extension's
/// `uri` against this value to find the extension id to use.
pub(crate) const TRANSPORT_CC_URI: &str =
    "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01";
18
/// Builder for the TWCC [`Sender`] interceptor.
///
/// The derived `Default` starts the sequence counter at `0`.
#[derive(Default)]
pub struct SenderBuilder {
    /// Initial value handed to the sequence counter of every `Sender` built
    /// from this builder.
    init_sequence_nr: u32,
}
24
25impl SenderBuilder {
26 pub fn with_init_sequence_nr(mut self, init_sequence_nr: u32) -> SenderBuilder {
28 self.init_sequence_nr = init_sequence_nr;
29 self
30 }
31}
32
33impl InterceptorBuilder for SenderBuilder {
34 fn build(&self, _id: &str) -> Result<Arc<dyn Interceptor + Send + Sync>> {
36 Ok(Arc::new(Sender {
37 next_sequence_nr: Arc::new(AtomicU32::new(self.init_sequence_nr)),
38 streams: Mutex::new(HashMap::new()),
39 }))
40 }
41}
42
43pub struct Sender {
45 next_sequence_nr: Arc<AtomicU32>,
46 streams: Mutex<HashMap<u32, Arc<SenderStream>>>,
47}
48
49impl Sender {
50 pub fn builder() -> SenderBuilder {
52 SenderBuilder::default()
53 }
54}
55
56#[async_trait]
57impl Interceptor for Sender {
58 async fn bind_rtcp_reader(
61 &self,
62 reader: Arc<dyn RTCPReader + Send + Sync>,
63 ) -> Arc<dyn RTCPReader + Send + Sync> {
64 reader
65 }
66
67 async fn bind_rtcp_writer(
70 &self,
71 writer: Arc<dyn RTCPWriter + Send + Sync>,
72 ) -> Arc<dyn RTCPWriter + Send + Sync> {
73 writer
74 }
75
76 async fn bind_local_stream(
79 &self,
80 info: &StreamInfo,
81 writer: Arc<dyn RTPWriter + Send + Sync>,
82 ) -> Arc<dyn RTPWriter + Send + Sync> {
83 let mut hdr_ext_id = 0u8;
84 for e in &info.rtp_header_extensions {
85 if e.uri == TRANSPORT_CC_URI {
86 hdr_ext_id = e.id as u8;
87 break;
88 }
89 }
90 if hdr_ext_id == 0 {
91 return writer;
93 }
94
95 let stream = Arc::new(SenderStream::new(
96 writer,
97 Arc::clone(&self.next_sequence_nr),
98 hdr_ext_id,
99 ));
100
101 {
102 let mut streams = self.streams.lock().await;
103 streams.insert(info.ssrc, Arc::clone(&stream));
104 }
105
106 stream
107 }
108
109 async fn unbind_local_stream(&self, info: &StreamInfo) {
111 let mut streams = self.streams.lock().await;
112 streams.remove(&info.ssrc);
113 }
114
115 async fn bind_remote_stream(
118 &self,
119 _info: &StreamInfo,
120 reader: Arc<dyn RTPReader + Send + Sync>,
121 ) -> Arc<dyn RTPReader + Send + Sync> {
122 reader
123 }
124
125 async fn unbind_remote_stream(&self, _info: &StreamInfo) {}
127
128 async fn close(&self) -> Result<()> {
130 Ok(())
131 }
132}