interceptor/report/receiver/
mod.rs

1mod receiver_stream;
2#[cfg(test)]
3mod receiver_test;
4
5use std::collections::HashMap;
6use std::time::{Duration, SystemTime};
7
8use receiver_stream::ReceiverStream;
9use tokio::sync::{mpsc, Mutex};
10use waitgroup::WaitGroup;
11
12use super::*;
13use crate::error::Error;
14use crate::*;
15
16pub(crate) struct ReceiverReportInternal {
17    pub(crate) interval: Duration,
18    pub(crate) now: Option<FnTimeGen>,
19    pub(crate) streams: Mutex<HashMap<u32, Arc<ReceiverStream>>>,
20    pub(crate) close_rx: Mutex<Option<mpsc::Receiver<()>>>,
21}
22
23pub(crate) struct ReceiverReportRtcpReader {
24    pub(crate) internal: Arc<ReceiverReportInternal>,
25    pub(crate) parent_rtcp_reader: Arc<dyn RTCPReader + Send + Sync>,
26}
27
28#[async_trait]
29impl RTCPReader for ReceiverReportRtcpReader {
30    async fn read(
31        &self,
32        buf: &mut [u8],
33        a: &Attributes,
34    ) -> Result<(Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>, Attributes)> {
35        let (pkts, attr) = self.parent_rtcp_reader.read(buf, a).await?;
36
37        let now = if let Some(f) = &self.internal.now {
38            f()
39        } else {
40            SystemTime::now()
41        };
42
43        for p in &pkts {
44            if let Some(sr) = p
45                .as_any()
46                .downcast_ref::<rtcp::sender_report::SenderReport>()
47            {
48                let stream = {
49                    let m = self.internal.streams.lock().await;
50                    m.get(&sr.ssrc).cloned()
51                };
52                if let Some(stream) = stream {
53                    stream.process_sender_report(now, sr);
54                }
55            }
56        }
57
58        Ok((pkts, attr))
59    }
60}
61
62/// ReceiverReport interceptor generates receiver reports.
63pub struct ReceiverReport {
64    pub(crate) internal: Arc<ReceiverReportInternal>,
65
66    pub(crate) wg: Mutex<Option<WaitGroup>>,
67    pub(crate) close_tx: Mutex<Option<mpsc::Sender<()>>>,
68}
69
70impl ReceiverReport {
71    /// builder returns a new ReportBuilder.
72    pub fn builder() -> ReportBuilder {
73        ReportBuilder {
74            is_rr: true,
75            ..Default::default()
76        }
77    }
78
79    async fn is_closed(&self) -> bool {
80        let close_tx = self.close_tx.lock().await;
81        close_tx.is_none()
82    }
83
84    async fn run(
85        rtcp_writer: Arc<dyn RTCPWriter + Send + Sync>,
86        internal: Arc<ReceiverReportInternal>,
87    ) -> Result<()> {
88        let mut ticker = tokio::time::interval(internal.interval);
89        let mut close_rx = {
90            let mut close_rx = internal.close_rx.lock().await;
91            if let Some(close) = close_rx.take() {
92                close
93            } else {
94                return Err(Error::ErrInvalidCloseRx);
95            }
96        };
97
98        loop {
99            tokio::select! {
100                _ = ticker.tick() =>{
101                    // TODO(cancel safety): This branch isn't cancel safe
102
103                    let now = if let Some(f) = &internal.now {
104                        f()
105                    } else {
106                        SystemTime::now()
107                    };
108                    let streams:Vec<Arc<ReceiverStream>> = {
109                        let m = internal.streams.lock().await;
110                        m.values().cloned().collect()
111                    };
112                    for stream in streams {
113                        let pkt = stream.generate_report(now);
114
115                        let a = Attributes::new();
116                        if let Err(err) = rtcp_writer.write(&[Box::new(pkt)], &a).await{
117                            log::warn!("failed sending: {err}");
118                        }
119                    }
120                }
121                _ = close_rx.recv() =>{
122                    return Ok(());
123                }
124            }
125        }
126    }
127}
128
129#[async_trait]
130impl Interceptor for ReceiverReport {
131    /// bind_rtcp_reader lets you modify any incoming RTCP packets. It is called once per sender/receiver, however this might
132    /// change in the future. The returned method will be called once per packet batch.
133    async fn bind_rtcp_reader(
134        &self,
135        reader: Arc<dyn RTCPReader + Send + Sync>,
136    ) -> Arc<dyn RTCPReader + Send + Sync> {
137        Arc::new(ReceiverReportRtcpReader {
138            internal: Arc::clone(&self.internal),
139            parent_rtcp_reader: reader,
140        })
141    }
142
143    /// bind_rtcp_writer lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method
144    /// will be called once per packet batch.
145    async fn bind_rtcp_writer(
146        &self,
147        writer: Arc<dyn RTCPWriter + Send + Sync>,
148    ) -> Arc<dyn RTCPWriter + Send + Sync> {
149        if self.is_closed().await {
150            return writer;
151        }
152
153        let mut w = {
154            let wait_group = self.wg.lock().await;
155            wait_group.as_ref().map(|wg| wg.worker())
156        };
157        let writer2 = Arc::clone(&writer);
158        let internal = Arc::clone(&self.internal);
159        tokio::spawn(async move {
160            let _d = w.take();
161            if let Err(err) = ReceiverReport::run(writer2, internal).await {
162                log::warn!("bind_rtcp_writer ReceiverReport::run got error: {err}");
163            }
164        });
165
166        writer
167    }
168
169    /// bind_local_stream lets you modify any outgoing RTP packets. It is called once for per LocalStream. The returned method
170    /// will be called once per rtp packet.
171    async fn bind_local_stream(
172        &self,
173        _info: &StreamInfo,
174        writer: Arc<dyn RTPWriter + Send + Sync>,
175    ) -> Arc<dyn RTPWriter + Send + Sync> {
176        writer
177    }
178
179    /// UnbindLocalStream is called when the Stream is removed. It can be used to clean up any data related to that track.
180    async fn unbind_local_stream(&self, _info: &StreamInfo) {}
181
182    /// bind_remote_stream lets you modify any incoming RTP packets. It is called once for per RemoteStream. The returned method
183    /// will be called once per rtp packet.
184    async fn bind_remote_stream(
185        &self,
186        info: &StreamInfo,
187        reader: Arc<dyn RTPReader + Send + Sync>,
188    ) -> Arc<dyn RTPReader + Send + Sync> {
189        let stream = Arc::new(ReceiverStream::new(
190            info.ssrc,
191            info.clock_rate,
192            reader,
193            self.internal.now.clone(),
194        ));
195        {
196            let mut streams = self.internal.streams.lock().await;
197            streams.insert(info.ssrc, Arc::clone(&stream));
198        }
199
200        stream
201    }
202
203    /// unbind_remote_stream is called when the Stream is removed. It can be used to clean up any data related to that track.
204    async fn unbind_remote_stream(&self, info: &StreamInfo) {
205        let mut streams = self.internal.streams.lock().await;
206        streams.remove(&info.ssrc);
207    }
208
209    /// close closes the Interceptor, cleaning up any data if necessary.
210    async fn close(&self) -> Result<()> {
211        {
212            let mut close_tx = self.close_tx.lock().await;
213            close_tx.take();
214        }
215
216        {
217            let mut wait_group = self.wg.lock().await;
218            if let Some(wg) = wait_group.take() {
219                wg.wait().await;
220            }
221        }
222
223        Ok(())
224    }
225}