interceptor/twcc/sender/
mod.rs

1mod sender_stream;
2#[cfg(test)]
3mod sender_test;
4
5use std::sync::atomic::Ordering;
6use std::sync::Arc;
7
8use portable_atomic::AtomicU32;
9use rtp::extension::transport_cc_extension::TransportCcExtension;
10use sender_stream::SenderStream;
11use tokio::sync::Mutex;
12use util::Marshal;
13
14use crate::{Attributes, RTPWriter, *};
15
/// URI of the transport-wide congestion control RTP header extension.
///
/// `Sender::bind_local_stream` compares this against the `uri` of each entry in
/// `StreamInfo::rtp_header_extensions` to find the extension ID to stamp on packets.
pub(crate) const TRANSPORT_CC_URI: &str =
    "http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01";
18
/// SenderBuilder is an InterceptorBuilder for the transport-wide congestion control
/// [`Sender`] interceptor.
///
/// The default builder starts the transport-wide sequence counter at 0.
#[derive(Debug, Clone, Default)]
pub struct SenderBuilder {
    /// Initial value of the sequence counter given to every `Sender` this builder creates.
    init_sequence_nr: u32,
}
24
25impl SenderBuilder {
26    /// with_init_sequence_nr sets the init sequence number of the interceptor.
27    pub fn with_init_sequence_nr(mut self, init_sequence_nr: u32) -> SenderBuilder {
28        self.init_sequence_nr = init_sequence_nr;
29        self
30    }
31}
32
33impl InterceptorBuilder for SenderBuilder {
34    /// build constructs a new SenderInterceptor
35    fn build(&self, _id: &str) -> Result<Arc<dyn Interceptor + Send + Sync>> {
36        Ok(Arc::new(Sender {
37            next_sequence_nr: Arc::new(AtomicU32::new(self.init_sequence_nr)),
38            streams: Mutex::new(HashMap::new()),
39        }))
40    }
41}
42
43/// Sender adds transport wide sequence numbers as header extension to each RTP packet
44pub struct Sender {
45    next_sequence_nr: Arc<AtomicU32>,
46    streams: Mutex<HashMap<u32, Arc<SenderStream>>>,
47}
48
49impl Sender {
50    /// builder returns a new SenderBuilder.
51    pub fn builder() -> SenderBuilder {
52        SenderBuilder::default()
53    }
54}
55
56#[async_trait]
57impl Interceptor for Sender {
58    /// bind_rtcp_reader lets you modify any incoming RTCP packets. It is called once per sender/receiver, however this might
59    /// change in the future. The returned method will be called once per packet batch.
60    async fn bind_rtcp_reader(
61        &self,
62        reader: Arc<dyn RTCPReader + Send + Sync>,
63    ) -> Arc<dyn RTCPReader + Send + Sync> {
64        reader
65    }
66
67    /// bind_rtcp_writer lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method
68    /// will be called once per packet batch.
69    async fn bind_rtcp_writer(
70        &self,
71        writer: Arc<dyn RTCPWriter + Send + Sync>,
72    ) -> Arc<dyn RTCPWriter + Send + Sync> {
73        writer
74    }
75
76    /// bind_local_stream returns a writer that adds a rtp TransportCCExtension
77    /// header with increasing sequence numbers to each outgoing packet.
78    async fn bind_local_stream(
79        &self,
80        info: &StreamInfo,
81        writer: Arc<dyn RTPWriter + Send + Sync>,
82    ) -> Arc<dyn RTPWriter + Send + Sync> {
83        let mut hdr_ext_id = 0u8;
84        for e in &info.rtp_header_extensions {
85            if e.uri == TRANSPORT_CC_URI {
86                hdr_ext_id = e.id as u8;
87                break;
88            }
89        }
90        if hdr_ext_id == 0 {
91            // Don't add header extension if ID is 0, because 0 is an invalid extension ID
92            return writer;
93        }
94
95        let stream = Arc::new(SenderStream::new(
96            writer,
97            Arc::clone(&self.next_sequence_nr),
98            hdr_ext_id,
99        ));
100
101        {
102            let mut streams = self.streams.lock().await;
103            streams.insert(info.ssrc, Arc::clone(&stream));
104        }
105
106        stream
107    }
108
109    /// unbind_local_stream is called when the Stream is removed. It can be used to clean up any data related to that track.
110    async fn unbind_local_stream(&self, info: &StreamInfo) {
111        let mut streams = self.streams.lock().await;
112        streams.remove(&info.ssrc);
113    }
114
115    /// bind_remote_stream lets you modify any incoming RTP packets. It is called once for per RemoteStream. The returned method
116    /// will be called once per rtp packet.
117    async fn bind_remote_stream(
118        &self,
119        _info: &StreamInfo,
120        reader: Arc<dyn RTPReader + Send + Sync>,
121    ) -> Arc<dyn RTPReader + Send + Sync> {
122        reader
123    }
124
125    /// unbind_remote_stream is called when the Stream is removed. It can be used to clean up any data related to that track.
126    async fn unbind_remote_stream(&self, _info: &StreamInfo) {}
127
128    /// close closes the Interceptor, cleaning up any data if necessary.
129    async fn close(&self) -> Result<()> {
130        Ok(())
131    }
132}