interceptor/report/sender/
mod.rs

1mod sender_stream;
2#[cfg(test)]
3mod sender_test;
4
5use std::collections::HashMap;
6use std::time::{Duration, SystemTime};
7
8use sender_stream::SenderStream;
9use tokio::sync::{mpsc, Mutex};
10use waitgroup::WaitGroup;
11
12use super::*;
13use crate::error::Error;
14use crate::*;
15
16pub(crate) struct SenderReportInternal {
17    pub(crate) interval: Duration,
18    pub(crate) now: Option<FnTimeGen>,
19    pub(crate) streams: Mutex<HashMap<u32, Arc<SenderStream>>>,
20    pub(crate) close_rx: Mutex<Option<mpsc::Receiver<()>>>,
21}
22
23/// SenderReport interceptor generates sender reports.
24pub struct SenderReport {
25    pub(crate) internal: Arc<SenderReportInternal>,
26
27    pub(crate) wg: Mutex<Option<WaitGroup>>,
28    pub(crate) close_tx: Mutex<Option<mpsc::Sender<()>>>,
29}
30
31impl SenderReport {
32    /// builder returns a new ReportBuilder.
33    pub fn builder() -> ReportBuilder {
34        ReportBuilder {
35            is_rr: false,
36            ..Default::default()
37        }
38    }
39
40    async fn is_closed(&self) -> bool {
41        let close_tx = self.close_tx.lock().await;
42        close_tx.is_none()
43    }
44
45    async fn run(
46        rtcp_writer: Arc<dyn RTCPWriter + Send + Sync>,
47        internal: Arc<SenderReportInternal>,
48    ) -> Result<()> {
49        let mut ticker = tokio::time::interval(internal.interval);
50        let mut close_rx = {
51            let mut close_rx = internal.close_rx.lock().await;
52            if let Some(close) = close_rx.take() {
53                close
54            } else {
55                return Err(Error::ErrInvalidCloseRx);
56            }
57        };
58
59        loop {
60            tokio::select! {
61                _ = ticker.tick() =>{
62                    // TODO(cancel safety): This branch isn't cancel safe
63                    let now = if let Some(f) = &internal.now {
64                        f()
65                    } else {
66                        SystemTime::now()
67                    };
68                    let streams:Vec<Arc<SenderStream>> = {
69                        let m = internal.streams.lock().await;
70                        m.values().cloned().collect()
71                    };
72                    for stream in streams {
73                        let pkt = stream.generate_report(now).await;
74
75                        let a = Attributes::new();
76                        if let Err(err) = rtcp_writer.write(&[Box::new(pkt)], &a).await{
77                            log::warn!("failed sending: {err}");
78                        }
79                    }
80                }
81                _ = close_rx.recv() =>{
82                    return Ok(());
83                }
84            }
85        }
86    }
87}
88
89#[async_trait]
90impl Interceptor for SenderReport {
91    /// bind_rtcp_reader lets you modify any incoming RTCP packets. It is called once per sender/receiver, however this might
92    /// change in the future. The returned method will be called once per packet batch.
93    async fn bind_rtcp_reader(
94        &self,
95        reader: Arc<dyn RTCPReader + Send + Sync>,
96    ) -> Arc<dyn RTCPReader + Send + Sync> {
97        reader
98    }
99
100    /// bind_rtcp_writer lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method
101    /// will be called once per packet batch.
102    async fn bind_rtcp_writer(
103        &self,
104        writer: Arc<dyn RTCPWriter + Send + Sync>,
105    ) -> Arc<dyn RTCPWriter + Send + Sync> {
106        if self.is_closed().await {
107            return writer;
108        }
109
110        let mut w = {
111            let wait_group = self.wg.lock().await;
112            wait_group.as_ref().map(|wg| wg.worker())
113        };
114        let writer2 = Arc::clone(&writer);
115        let internal = Arc::clone(&self.internal);
116        tokio::spawn(async move {
117            let _d = w.take();
118            if let Err(err) = SenderReport::run(writer2, internal).await {
119                log::warn!("bind_rtcp_writer Generator::run got error: {err}");
120            }
121        });
122
123        writer
124    }
125
126    /// bind_local_stream lets you modify any outgoing RTP packets. It is called once for per LocalStream. The returned method
127    /// will be called once per rtp packet.
128    async fn bind_local_stream(
129        &self,
130        info: &StreamInfo,
131        writer: Arc<dyn RTPWriter + Send + Sync>,
132    ) -> Arc<dyn RTPWriter + Send + Sync> {
133        let stream = Arc::new(SenderStream::new(
134            info.ssrc,
135            info.clock_rate,
136            writer,
137            self.internal.now.clone(),
138        ));
139        {
140            let mut streams = self.internal.streams.lock().await;
141            streams.insert(info.ssrc, Arc::clone(&stream));
142        }
143
144        stream
145    }
146
147    /// unbind_local_stream is called when the Stream is removed. It can be used to clean up any data related to that track.
148    async fn unbind_local_stream(&self, info: &StreamInfo) {
149        let mut streams = self.internal.streams.lock().await;
150        streams.remove(&info.ssrc);
151    }
152
153    /// bind_remote_stream lets you modify any incoming RTP packets. It is called once for per RemoteStream. The returned method
154    /// will be called once per rtp packet.
155    async fn bind_remote_stream(
156        &self,
157        _info: &StreamInfo,
158        reader: Arc<dyn RTPReader + Send + Sync>,
159    ) -> Arc<dyn RTPReader + Send + Sync> {
160        reader
161    }
162
163    /// unbind_remote_stream is called when the Stream is removed. It can be used to clean up any data related to that track.
164    async fn unbind_remote_stream(&self, _info: &StreamInfo) {}
165
166    /// close closes the Interceptor, cleaning up any data if necessary.
167    async fn close(&self) -> Result<()> {
168        {
169            let mut close_tx = self.close_tx.lock().await;
170            close_tx.take();
171        }
172
173        {
174            let mut wait_group = self.wg.lock().await;
175            if let Some(wg) = wait_group.take() {
176                wg.wait().await;
177            }
178        }
179
180        Ok(())
181    }
182}