interceptor/report/receiver/
mod.rs1mod receiver_stream;
2#[cfg(test)]
3mod receiver_test;
4
5use std::collections::HashMap;
6use std::time::{Duration, SystemTime};
7
8use receiver_stream::ReceiverStream;
9use tokio::sync::{mpsc, Mutex};
10use waitgroup::WaitGroup;
11
12use super::*;
13use crate::error::Error;
14use crate::*;
15
16pub(crate) struct ReceiverReportInternal {
17 pub(crate) interval: Duration,
18 pub(crate) now: Option<FnTimeGen>,
19 pub(crate) streams: Mutex<HashMap<u32, Arc<ReceiverStream>>>,
20 pub(crate) close_rx: Mutex<Option<mpsc::Receiver<()>>>,
21}
22
23pub(crate) struct ReceiverReportRtcpReader {
24 pub(crate) internal: Arc<ReceiverReportInternal>,
25 pub(crate) parent_rtcp_reader: Arc<dyn RTCPReader + Send + Sync>,
26}
27
28#[async_trait]
29impl RTCPReader for ReceiverReportRtcpReader {
30 async fn read(
31 &self,
32 buf: &mut [u8],
33 a: &Attributes,
34 ) -> Result<(Vec<Box<dyn rtcp::packet::Packet + Send + Sync>>, Attributes)> {
35 let (pkts, attr) = self.parent_rtcp_reader.read(buf, a).await?;
36
37 let now = if let Some(f) = &self.internal.now {
38 f()
39 } else {
40 SystemTime::now()
41 };
42
43 for p in &pkts {
44 if let Some(sr) = p
45 .as_any()
46 .downcast_ref::<rtcp::sender_report::SenderReport>()
47 {
48 let stream = {
49 let m = self.internal.streams.lock().await;
50 m.get(&sr.ssrc).cloned()
51 };
52 if let Some(stream) = stream {
53 stream.process_sender_report(now, sr);
54 }
55 }
56 }
57
58 Ok((pkts, attr))
59 }
60}
61
62pub struct ReceiverReport {
64 pub(crate) internal: Arc<ReceiverReportInternal>,
65
66 pub(crate) wg: Mutex<Option<WaitGroup>>,
67 pub(crate) close_tx: Mutex<Option<mpsc::Sender<()>>>,
68}
69
70impl ReceiverReport {
71 pub fn builder() -> ReportBuilder {
73 ReportBuilder {
74 is_rr: true,
75 ..Default::default()
76 }
77 }
78
79 async fn is_closed(&self) -> bool {
80 let close_tx = self.close_tx.lock().await;
81 close_tx.is_none()
82 }
83
84 async fn run(
85 rtcp_writer: Arc<dyn RTCPWriter + Send + Sync>,
86 internal: Arc<ReceiverReportInternal>,
87 ) -> Result<()> {
88 let mut ticker = tokio::time::interval(internal.interval);
89 let mut close_rx = {
90 let mut close_rx = internal.close_rx.lock().await;
91 if let Some(close) = close_rx.take() {
92 close
93 } else {
94 return Err(Error::ErrInvalidCloseRx);
95 }
96 };
97
98 loop {
99 tokio::select! {
100 _ = ticker.tick() =>{
101 let now = if let Some(f) = &internal.now {
104 f()
105 } else {
106 SystemTime::now()
107 };
108 let streams:Vec<Arc<ReceiverStream>> = {
109 let m = internal.streams.lock().await;
110 m.values().cloned().collect()
111 };
112 for stream in streams {
113 let pkt = stream.generate_report(now);
114
115 let a = Attributes::new();
116 if let Err(err) = rtcp_writer.write(&[Box::new(pkt)], &a).await{
117 log::warn!("failed sending: {err}");
118 }
119 }
120 }
121 _ = close_rx.recv() =>{
122 return Ok(());
123 }
124 }
125 }
126 }
127}
128
129#[async_trait]
130impl Interceptor for ReceiverReport {
131 async fn bind_rtcp_reader(
134 &self,
135 reader: Arc<dyn RTCPReader + Send + Sync>,
136 ) -> Arc<dyn RTCPReader + Send + Sync> {
137 Arc::new(ReceiverReportRtcpReader {
138 internal: Arc::clone(&self.internal),
139 parent_rtcp_reader: reader,
140 })
141 }
142
143 async fn bind_rtcp_writer(
146 &self,
147 writer: Arc<dyn RTCPWriter + Send + Sync>,
148 ) -> Arc<dyn RTCPWriter + Send + Sync> {
149 if self.is_closed().await {
150 return writer;
151 }
152
153 let mut w = {
154 let wait_group = self.wg.lock().await;
155 wait_group.as_ref().map(|wg| wg.worker())
156 };
157 let writer2 = Arc::clone(&writer);
158 let internal = Arc::clone(&self.internal);
159 tokio::spawn(async move {
160 let _d = w.take();
161 if let Err(err) = ReceiverReport::run(writer2, internal).await {
162 log::warn!("bind_rtcp_writer ReceiverReport::run got error: {err}");
163 }
164 });
165
166 writer
167 }
168
169 async fn bind_local_stream(
172 &self,
173 _info: &StreamInfo,
174 writer: Arc<dyn RTPWriter + Send + Sync>,
175 ) -> Arc<dyn RTPWriter + Send + Sync> {
176 writer
177 }
178
179 async fn unbind_local_stream(&self, _info: &StreamInfo) {}
181
182 async fn bind_remote_stream(
185 &self,
186 info: &StreamInfo,
187 reader: Arc<dyn RTPReader + Send + Sync>,
188 ) -> Arc<dyn RTPReader + Send + Sync> {
189 let stream = Arc::new(ReceiverStream::new(
190 info.ssrc,
191 info.clock_rate,
192 reader,
193 self.internal.now.clone(),
194 ));
195 {
196 let mut streams = self.internal.streams.lock().await;
197 streams.insert(info.ssrc, Arc::clone(&stream));
198 }
199
200 stream
201 }
202
203 async fn unbind_remote_stream(&self, info: &StreamInfo) {
205 let mut streams = self.internal.streams.lock().await;
206 streams.remove(&info.ssrc);
207 }
208
209 async fn close(&self) -> Result<()> {
211 {
212 let mut close_tx = self.close_tx.lock().await;
213 close_tx.take();
214 }
215
216 {
217 let mut wait_group = self.wg.lock().await;
218 if let Some(wg) = wait_group.take() {
219 wg.wait().await;
220 }
221 }
222
223 Ok(())
224 }
225}