1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::SystemTime;
4
5use tokio::time::Duration;
6
7mod interceptor;
8
9pub use self::interceptor::StatsInterceptor;
10
11pub fn make_stats_interceptor(id: &str) -> Arc<StatsInterceptor> {
12 Arc::new(StatsInterceptor::new(id.to_owned()))
13}
14
15mod inbound {
17 use std::time::SystemTime;
18
19 use tokio::time::{Duration, Instant};
20
21 use super::{RTCPStats, RTPStats};
22
23 #[derive(Debug, Clone)]
24 pub(super) struct StreamStats {
28 pub(super) rtp_stats: RTPStats,
30 pub(super) rtcp_stats: RTCPStats,
32
33 last_update: Instant,
35
36 remote_packets_sent: u32,
38
39 remote_bytes_sent: u32,
41
42 remote_reports_sent: u64,
44
45 remote_round_trip_time: Option<f64>,
48
49 remote_total_round_trip_time: f64,
51
52 remote_round_trip_time_measurements: u64,
54 }
55
56 impl Default for StreamStats {
57 fn default() -> Self {
58 Self {
59 rtp_stats: RTPStats::default(),
60 rtcp_stats: RTCPStats::default(),
61 last_update: Instant::now(),
62 remote_packets_sent: 0,
63 remote_bytes_sent: 0,
64 remote_reports_sent: 0,
65 remote_round_trip_time: None,
66 remote_total_round_trip_time: 0.0,
67 remote_round_trip_time_measurements: 0,
68 }
69 }
70 }
71
72 impl StreamStats {
73 pub(super) fn snapshot(&self) -> StatsSnapshot {
74 self.into()
75 }
76
77 pub(super) fn mark_updated(&mut self) {
78 self.last_update = Instant::now();
79 }
80
81 pub(super) fn duration_since_last_update(&self) -> Duration {
82 self.last_update.elapsed()
83 }
84
85 pub(super) fn record_sender_report(&mut self, packets_sent: u32, bytes_sent: u32) {
86 self.remote_reports_sent += 1;
87 self.remote_packets_sent = packets_sent;
88 self.remote_bytes_sent = bytes_sent;
89 }
90
91 pub(super) fn record_remote_round_trip_time(&mut self, round_trip_time: Option<f64>) {
92 self.remote_round_trip_time = round_trip_time;
94
95 if let Some(rtt) = round_trip_time {
96 self.remote_total_round_trip_time += rtt;
98 self.remote_round_trip_time_measurements += 1;
99 }
100 }
101 }
102
103 #[derive(Debug)]
107 pub struct StatsSnapshot {
108 rtp_stats: RTPStats,
110 rtcp_stats: RTCPStats,
112
113 remote_packets_sent: u32,
115
116 remote_bytes_sent: u32,
118
119 remote_reports_sent: u64,
121
122 remote_round_trip_time: Option<f64>,
125
126 remote_total_round_trip_time: f64,
128
129 remote_round_trip_time_measurements: u64,
131 }
132
133 impl StatsSnapshot {
134 pub fn packets_received(&self) -> u64 {
135 self.rtp_stats.packets
136 }
137
138 pub fn payload_bytes_received(&self) -> u64 {
139 self.rtp_stats.payload_bytes
140 }
141
142 pub fn header_bytes_received(&self) -> u64 {
143 self.rtp_stats.header_bytes
144 }
145
146 pub fn last_packet_received_timestamp(&self) -> Option<SystemTime> {
147 self.rtp_stats.last_packet_timestamp
148 }
149
150 pub fn nacks_sent(&self) -> u64 {
151 self.rtcp_stats.nack_count
152 }
153
154 pub fn firs_sent(&self) -> u64 {
155 self.rtcp_stats.fir_count
156 }
157
158 pub fn plis_sent(&self) -> u64 {
159 self.rtcp_stats.pli_count
160 }
161 pub fn remote_packets_sent(&self) -> u32 {
162 self.remote_packets_sent
163 }
164
165 pub fn remote_bytes_sent(&self) -> u32 {
166 self.remote_bytes_sent
167 }
168
169 pub fn remote_reports_sent(&self) -> u64 {
170 self.remote_reports_sent
171 }
172
173 pub fn remote_round_trip_time(&self) -> Option<f64> {
174 self.remote_round_trip_time
175 }
176
177 pub fn remote_total_round_trip_time(&self) -> f64 {
178 self.remote_total_round_trip_time
179 }
180
181 pub fn remote_round_trip_time_measurements(&self) -> u64 {
182 self.remote_round_trip_time_measurements
183 }
184 }
185
186 impl From<&StreamStats> for StatsSnapshot {
187 fn from(stream_stats: &StreamStats) -> Self {
188 Self {
189 rtp_stats: stream_stats.rtp_stats.clone(),
190 rtcp_stats: stream_stats.rtcp_stats.clone(),
191 remote_packets_sent: stream_stats.remote_packets_sent,
192 remote_bytes_sent: stream_stats.remote_bytes_sent,
193 remote_reports_sent: stream_stats.remote_reports_sent,
194 remote_round_trip_time: stream_stats.remote_round_trip_time,
195 remote_total_round_trip_time: stream_stats.remote_total_round_trip_time,
196 remote_round_trip_time_measurements: stream_stats
197 .remote_round_trip_time_measurements,
198 }
199 }
200 }
201}
202
203mod outbound {
205 use std::time::SystemTime;
206
207 use tokio::time::{Duration, Instant};
208
209 use super::{RTCPStats, RTPStats};
210
211 #[derive(Debug, Clone)]
212 pub(super) struct StreamStats {
216 pub(super) rtp_stats: RTPStats,
218 pub(super) rtcp_stats: RTCPStats,
220
221 last_update: Instant,
223
224 initial_outbound_ext_seq_num: Option<u32>,
229
230 remote_packets_received: u64,
232
233 remote_total_lost: u32,
235
236 remote_jitter: u32,
238
239 remote_round_trip_time: Option<f64>,
242
243 remote_total_round_trip_time: f64,
245
246 remote_round_trip_time_measurements: u64,
248
249 remote_fraction_lost: Option<u8>,
251 }
252
253 impl Default for StreamStats {
254 fn default() -> Self {
255 Self {
256 rtp_stats: RTPStats::default(),
257 rtcp_stats: RTCPStats::default(),
258 last_update: Instant::now(),
259 initial_outbound_ext_seq_num: None,
260 remote_packets_received: 0,
261 remote_total_lost: 0,
262 remote_jitter: 0,
263 remote_round_trip_time: None,
264 remote_total_round_trip_time: 0.0,
265 remote_round_trip_time_measurements: 0,
266 remote_fraction_lost: None,
267 }
268 }
269 }
270
271 impl StreamStats {
272 pub(super) fn snapshot(&self) -> StatsSnapshot {
273 self.into()
274 }
275
276 pub(super) fn mark_updated(&mut self) {
277 self.last_update = Instant::now();
278 }
279
280 pub(super) fn duration_since_last_update(&self) -> Duration {
281 self.last_update.elapsed()
282 }
283
284 pub(super) fn update_remote_inbound_packets_received(
285 &mut self,
286 rr_ext_seq_num: u32,
287 rr_total_lost: u32,
288 ) {
289 if let Some(initial_ext_seq_num) = self.initial_outbound_ext_seq_num {
290 self.remote_packets_received =
299 (rr_ext_seq_num as u64) - (rr_total_lost as u64) - (initial_ext_seq_num as u64)
300 + 1;
301 }
302 }
303
304 #[inline(always)]
305 pub(super) fn record_sr_ext_seq_num(&mut self, seq_num: u32) {
306 if self.initial_outbound_ext_seq_num.is_none() {
308 self.initial_outbound_ext_seq_num = Some(seq_num);
309 }
310 }
311
312 pub(super) fn record_remote_round_trip_time(&mut self, round_trip_time: Option<f64>) {
313 self.remote_round_trip_time = round_trip_time;
315
316 if let Some(rtt) = round_trip_time {
317 self.remote_total_round_trip_time += rtt;
319 self.remote_round_trip_time_measurements += 1;
320 }
321 }
322
323 pub(super) fn update_remote_fraction_lost(&mut self, fraction_lost: u8) {
324 self.remote_fraction_lost = Some(fraction_lost);
325 }
326
327 pub(super) fn update_remote_jitter(&mut self, jitter: u32) {
328 self.remote_jitter = jitter;
329 }
330
331 pub(super) fn update_remote_total_lost(&mut self, lost: u32) {
332 self.remote_total_lost = lost;
333 }
334 }
335
336 #[derive(Debug)]
340 pub struct StatsSnapshot {
341 rtp_stats: RTPStats,
343 rtcp_stats: RTCPStats,
345
346 remote_packets_received: u64,
348
349 remote_total_lost: u32,
351
352 remote_jitter: u32,
354
355 remote_round_trip_time: Option<f64>,
357
358 remote_total_round_trip_time: f64,
360
361 remote_round_trip_time_measurements: u64,
363
364 remote_fraction_lost: Option<f64>,
367 }
368
369 impl StatsSnapshot {
370 pub fn packets_sent(&self) -> u64 {
371 self.rtp_stats.packets
372 }
373
374 pub fn payload_bytes_sent(&self) -> u64 {
375 self.rtp_stats.payload_bytes
376 }
377
378 pub fn header_bytes_sent(&self) -> u64 {
379 self.rtp_stats.header_bytes
380 }
381
382 pub fn last_packet_sent_timestamp(&self) -> Option<SystemTime> {
383 self.rtp_stats.last_packet_timestamp
384 }
385
386 pub fn nacks_received(&self) -> u64 {
387 self.rtcp_stats.nack_count
388 }
389
390 pub fn firs_received(&self) -> u64 {
391 self.rtcp_stats.fir_count
392 }
393
394 pub fn plis_received(&self) -> u64 {
395 self.rtcp_stats.pli_count
396 }
397
398 pub fn remote_packets_received(&self) -> u64 {
400 self.remote_packets_received
401 }
402
403 pub fn remote_total_lost(&self) -> u32 {
405 self.remote_total_lost
406 }
407
408 pub fn remote_jitter(&self) -> u32 {
410 self.remote_jitter
411 }
412
413 pub fn remote_round_trip_time(&self) -> Option<f64> {
415 self.remote_round_trip_time
416 }
417
418 pub fn remote_total_round_trip_time(&self) -> f64 {
420 self.remote_total_round_trip_time
421 }
422
423 pub fn remote_round_trip_time_measurements(&self) -> u64 {
425 self.remote_round_trip_time_measurements
426 }
427
428 pub fn remote_fraction_lost(&self) -> Option<f64> {
430 self.remote_fraction_lost
431 }
432 }
433
434 impl From<&StreamStats> for StatsSnapshot {
435 fn from(stream_stats: &StreamStats) -> Self {
436 Self {
437 rtp_stats: stream_stats.rtp_stats.clone(),
438 rtcp_stats: stream_stats.rtcp_stats.clone(),
439 remote_packets_received: stream_stats.remote_packets_received,
440 remote_total_lost: stream_stats.remote_total_lost,
441 remote_jitter: stream_stats.remote_jitter,
442 remote_round_trip_time: stream_stats.remote_round_trip_time,
443 remote_total_round_trip_time: stream_stats.remote_total_round_trip_time,
444 remote_round_trip_time_measurements: stream_stats
445 .remote_round_trip_time_measurements,
446 remote_fraction_lost: stream_stats
447 .remote_fraction_lost
448 .map(|fraction| (fraction as f64) / (u8::MAX as f64)),
449 }
450 }
451 }
452}
453
454#[derive(Default, Debug)]
455struct StatsContainer {
456 inbound_stats: HashMap<u32, inbound::StreamStats>,
457 outbound_stats: HashMap<u32, outbound::StreamStats>,
458}
459
460impl StatsContainer {
461 fn get_or_create_inbound_stream_stats(&mut self, ssrc: u32) -> &mut inbound::StreamStats {
462 self.inbound_stats.entry(ssrc).or_default()
463 }
464
465 fn get_or_create_outbound_stream_stats(&mut self, ssrc: u32) -> &mut outbound::StreamStats {
466 self.outbound_stats.entry(ssrc).or_default()
467 }
468
469 fn get_inbound_stats(&self, ssrc: u32) -> Option<&inbound::StreamStats> {
470 self.inbound_stats.get(&ssrc)
471 }
472
473 fn get_outbound_stats(&self, ssrc: u32) -> Option<&outbound::StreamStats> {
474 self.outbound_stats.get(&ssrc)
475 }
476
477 fn remove_stale_entries(&mut self) {
478 const MAX_AGE: Duration = Duration::from_secs(60);
479
480 self.inbound_stats
481 .retain(|_, s| s.duration_since_last_update() < MAX_AGE);
482 self.outbound_stats
483 .retain(|_, s| s.duration_since_last_update() < MAX_AGE);
484 }
485}
486
/// Running RTP counters: packets, payload bytes, header bytes and the
/// timestamp supplied with the most recent update.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct RTPStats {
    /// Sum of the `packets` arguments passed to `update`.
    packets: u64,

    /// Sum of the `payload_bytes` arguments passed to `update`.
    payload_bytes: u64,

    /// Sum of the `header_bytes` arguments passed to `update`.
    header_bytes: u64,

    /// `now` from the latest `update` call; `None` before the first one.
    last_packet_timestamp: Option<SystemTime>,
}

impl RTPStats {
    /// Adds the given amounts to the running totals and stores `now` as the
    /// last packet timestamp.
    fn update(&mut self, header_bytes: u64, payload_bytes: u64, packets: u64, now: SystemTime) {
        let Self {
            header_bytes: total_header_bytes,
            payload_bytes: total_payload_bytes,
            packets: total_packets,
            last_packet_timestamp,
        } = self;

        *total_header_bytes += header_bytes;
        *total_payload_bytes += payload_bytes;
        *total_packets += packets;
        *last_packet_timestamp = Some(now);
    }

    /// Total header bytes accumulated so far.
    pub fn header_bytes(&self) -> u64 {
        self.header_bytes
    }

    /// Total payload bytes accumulated so far.
    pub fn payload_bytes(&self) -> u64 {
        self.payload_bytes
    }

    /// Total packets accumulated so far.
    pub fn packets(&self) -> u64 {
        self.packets
    }

    /// Timestamp given to the most recent update, if there has been one.
    pub fn last_packet_timestamp(&self) -> Option<SystemTime> {
        self.last_packet_timestamp
    }
}
528
/// Running RTCP feedback counters: FIR, PLI and NACK.
///
/// Derives `PartialEq`/`Eq` so values can be compared directly, matching
/// `RTPStats`.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct RTCPStats {
    /// Sum of the `fir_count` values passed to `update`.
    fir_count: u64,

    /// Sum of the `pli_count` values passed to `update`.
    pli_count: u64,

    /// Sum of the `nack_count` values passed to `update`.
    nack_count: u64,
}

impl RTCPStats {
    /// Adds each supplied amount to its counter. A `None` argument leaves the
    /// corresponding counter unchanged.
    fn update(&mut self, fir_count: Option<u64>, pli_count: Option<u64>, nack_count: Option<u64>) {
        // Adding zero is the same as skipping the counter.
        self.fir_count += fir_count.unwrap_or(0);
        self.pli_count += pli_count.unwrap_or(0);
        self.nack_count += nack_count.unwrap_or(0);
    }

    /// Total FIR count accumulated so far.
    pub fn fir_count(&self) -> u64 {
        self.fir_count
    }

    /// Total PLI count accumulated so far.
    pub fn pli_count(&self) -> u64 {
        self.pli_count
    }

    /// Total NACK count accumulated so far.
    pub fn nack_count(&self) -> u64 {
        self.nack_count
    }
}
569
#[cfg(test)]
mod test {
    use super::*;

    /// Compiles only when `T` is both `Send` and `Sync`.
    fn assert_send_sync<T: Send + Sync>() {}

    /// A fresh `RTPStats` reads all zeroes, and one `update` adds its arguments.
    #[test]
    fn test_rtp_stats() {
        let mut stats = RTPStats::default();
        assert_eq!(stats.header_bytes(), 0);
        assert_eq!(stats.payload_bytes(), 0);
        assert_eq!(stats.packets(), 0);

        let now = SystemTime::now();
        stats.update(24, 960, 1, now);

        assert_eq!(stats.header_bytes(), 24);
        assert_eq!(stats.payload_bytes(), 960);
        assert_eq!(stats.packets(), 1);
    }

    /// A fresh `RTCPStats` reads all zeroes, and one `update` adds its arguments.
    #[test]
    fn test_rtcp_stats() {
        let mut stats = RTCPStats::default();
        assert_eq!(stats.fir_count(), 0);
        assert_eq!(stats.pli_count(), 0);
        assert_eq!(stats.nack_count(), 0);

        stats.update(Some(1), Some(2), Some(3));

        assert_eq!(stats.fir_count(), 1);
        assert_eq!(stats.pli_count(), 2);
        assert_eq!(stats.nack_count(), 3);
    }

    /// `RTPStats` must be usable across threads.
    #[test]
    fn test_rtp_stats_send_sync() {
        assert_send_sync::<RTPStats>();
    }

    /// `RTCPStats` must be usable across threads.
    #[test]
    fn test_rtcp_stats_send_sync() {
        assert_send_sync::<RTCPStats>();
    }
}
617}