// interceptor/report/sender/mod.rs
mod sender_stream;
2#[cfg(test)]
3mod sender_test;
4
5use std::collections::HashMap;
6use std::time::{Duration, SystemTime};
7
8use sender_stream::SenderStream;
9use tokio::sync::{mpsc, Mutex};
10use waitgroup::WaitGroup;
11
12use super::*;
13use crate::error::Error;
14use crate::*;
15
16pub(crate) struct SenderReportInternal {
17 pub(crate) interval: Duration,
18 pub(crate) now: Option<FnTimeGen>,
19 pub(crate) streams: Mutex<HashMap<u32, Arc<SenderStream>>>,
20 pub(crate) close_rx: Mutex<Option<mpsc::Receiver<()>>>,
21}
22
23pub struct SenderReport {
25 pub(crate) internal: Arc<SenderReportInternal>,
26
27 pub(crate) wg: Mutex<Option<WaitGroup>>,
28 pub(crate) close_tx: Mutex<Option<mpsc::Sender<()>>>,
29}
30
31impl SenderReport {
32 pub fn builder() -> ReportBuilder {
34 ReportBuilder {
35 is_rr: false,
36 ..Default::default()
37 }
38 }
39
40 async fn is_closed(&self) -> bool {
41 let close_tx = self.close_tx.lock().await;
42 close_tx.is_none()
43 }
44
45 async fn run(
46 rtcp_writer: Arc<dyn RTCPWriter + Send + Sync>,
47 internal: Arc<SenderReportInternal>,
48 ) -> Result<()> {
49 let mut ticker = tokio::time::interval(internal.interval);
50 let mut close_rx = {
51 let mut close_rx = internal.close_rx.lock().await;
52 if let Some(close) = close_rx.take() {
53 close
54 } else {
55 return Err(Error::ErrInvalidCloseRx);
56 }
57 };
58
59 loop {
60 tokio::select! {
61 _ = ticker.tick() =>{
62 let now = if let Some(f) = &internal.now {
64 f()
65 } else {
66 SystemTime::now()
67 };
68 let streams:Vec<Arc<SenderStream>> = {
69 let m = internal.streams.lock().await;
70 m.values().cloned().collect()
71 };
72 for stream in streams {
73 let pkt = stream.generate_report(now).await;
74
75 let a = Attributes::new();
76 if let Err(err) = rtcp_writer.write(&[Box::new(pkt)], &a).await{
77 log::warn!("failed sending: {err}");
78 }
79 }
80 }
81 _ = close_rx.recv() =>{
82 return Ok(());
83 }
84 }
85 }
86 }
87}
88
89#[async_trait]
90impl Interceptor for SenderReport {
91 async fn bind_rtcp_reader(
94 &self,
95 reader: Arc<dyn RTCPReader + Send + Sync>,
96 ) -> Arc<dyn RTCPReader + Send + Sync> {
97 reader
98 }
99
100 async fn bind_rtcp_writer(
103 &self,
104 writer: Arc<dyn RTCPWriter + Send + Sync>,
105 ) -> Arc<dyn RTCPWriter + Send + Sync> {
106 if self.is_closed().await {
107 return writer;
108 }
109
110 let mut w = {
111 let wait_group = self.wg.lock().await;
112 wait_group.as_ref().map(|wg| wg.worker())
113 };
114 let writer2 = Arc::clone(&writer);
115 let internal = Arc::clone(&self.internal);
116 tokio::spawn(async move {
117 let _d = w.take();
118 if let Err(err) = SenderReport::run(writer2, internal).await {
119 log::warn!("bind_rtcp_writer Generator::run got error: {err}");
120 }
121 });
122
123 writer
124 }
125
126 async fn bind_local_stream(
129 &self,
130 info: &StreamInfo,
131 writer: Arc<dyn RTPWriter + Send + Sync>,
132 ) -> Arc<dyn RTPWriter + Send + Sync> {
133 let stream = Arc::new(SenderStream::new(
134 info.ssrc,
135 info.clock_rate,
136 writer,
137 self.internal.now.clone(),
138 ));
139 {
140 let mut streams = self.internal.streams.lock().await;
141 streams.insert(info.ssrc, Arc::clone(&stream));
142 }
143
144 stream
145 }
146
147 async fn unbind_local_stream(&self, info: &StreamInfo) {
149 let mut streams = self.internal.streams.lock().await;
150 streams.remove(&info.ssrc);
151 }
152
153 async fn bind_remote_stream(
156 &self,
157 _info: &StreamInfo,
158 reader: Arc<dyn RTPReader + Send + Sync>,
159 ) -> Arc<dyn RTPReader + Send + Sync> {
160 reader
161 }
162
163 async fn unbind_remote_stream(&self, _info: &StreamInfo) {}
165
166 async fn close(&self) -> Result<()> {
168 {
169 let mut close_tx = self.close_tx.lock().await;
170 close_tx.take();
171 }
172
173 {
174 let mut wait_group = self.wg.lock().await;
175 if let Some(wg) = wait_group.take() {
176 wg.wait().await;
177 }
178 }
179
180 Ok(())
181 }
182}