Skip to main content

interceptor/stats/
mod.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::SystemTime;
4
5use tokio::time::Duration;
6
7mod interceptor;
8
9pub use self::interceptor::StatsInterceptor;
10
11pub fn make_stats_interceptor(id: &str) -> Arc<StatsInterceptor> {
12    Arc::new(StatsInterceptor::new(id.to_owned()))
13}
14
15/// Types related to inbound RTP streams.
16mod inbound {
17    use std::time::SystemTime;
18
19    use tokio::time::{Duration, Instant};
20
21    use super::{RTCPStats, RTPStats};
22
    #[derive(Debug, Clone)]
    /// Stats collected for an inbound RTP stream.
    ///
    /// Contains both stats relating to the inbound stream itself and remote stats for the
    /// corresponding outbound stream at the remote end. A read-only copy of the reportable
    /// fields can be obtained through `snapshot`.
    pub(super) struct StreamStats {
        /// Received RTP stats.
        pub(super) rtp_stats: RTPStats,
        /// Common RTCP stats derived from inbound and outbound RTCP packets.
        pub(super) rtcp_stats: RTCPStats,

        /// The last time any stats were updated, used for garbage collection to remove obsolete
        /// stats. Initialised at construction and refreshed by `mark_updated`.
        last_update: Instant,

        /// The number of packets sent as reported in the latest SR (sender report) from the
        /// remote. Overwritten, not accumulated, on every recorded sender report.
        remote_packets_sent: u32,

        /// The number of bytes sent as reported in the latest SR from the remote.
        /// Overwritten, not accumulated, on every recorded sender report.
        remote_bytes_sent: u32,

        /// The total number of sender reports sent by the remote and received.
        remote_reports_sent: u64,

        /// The last remote round trip time measurement in ms. [`None`] if no round trip time has
        /// been derived yet, or if it wasn't possible to derive it.
        remote_round_trip_time: Option<f64>,

        /// The cumulative total round trip times reported in ms.
        remote_total_round_trip_time: f64,

        /// The total number of measurements of the remote round trip time.
        remote_round_trip_time_measurements: u64,
    }
55
56    impl Default for StreamStats {
57        fn default() -> Self {
58            Self {
59                rtp_stats: RTPStats::default(),
60                rtcp_stats: RTCPStats::default(),
61                last_update: Instant::now(),
62                remote_packets_sent: 0,
63                remote_bytes_sent: 0,
64                remote_reports_sent: 0,
65                remote_round_trip_time: None,
66                remote_total_round_trip_time: 0.0,
67                remote_round_trip_time_measurements: 0,
68            }
69        }
70    }
71
72    impl StreamStats {
73        pub(super) fn snapshot(&self) -> StatsSnapshot {
74            self.into()
75        }
76
77        pub(super) fn mark_updated(&mut self) {
78            self.last_update = Instant::now();
79        }
80
81        pub(super) fn duration_since_last_update(&self) -> Duration {
82            self.last_update.elapsed()
83        }
84
85        pub(super) fn record_sender_report(&mut self, packets_sent: u32, bytes_sent: u32) {
86            self.remote_reports_sent += 1;
87            self.remote_packets_sent = packets_sent;
88            self.remote_bytes_sent = bytes_sent;
89        }
90
91        pub(super) fn record_remote_round_trip_time(&mut self, round_trip_time: Option<f64>) {
92            // Store the latest measurement, even if it's None.
93            self.remote_round_trip_time = round_trip_time;
94
95            if let Some(rtt) = round_trip_time {
96                // Only if we have a valid measurement do we update the totals
97                self.remote_total_round_trip_time += rtt;
98                self.remote_round_trip_time_measurements += 1;
99            }
100        }
101    }
102
    /// A point in time snapshot of the stream stats for an inbound RTP stream.
    ///
    /// Created by [`StreamStats::snapshot`]. All fields are private and are read through the
    /// accessor methods on this type.
    #[derive(Debug)]
    pub struct StatsSnapshot {
        /// Received RTP stats.
        rtp_stats: RTPStats,
        /// Common RTCP stats derived from inbound and outbound RTCP packets.
        rtcp_stats: RTCPStats,

        /// The number of packets sent as reported in the latest SR (sender report) from the
        /// remote.
        remote_packets_sent: u32,

        /// The number of bytes sent as reported in the latest SR from the remote.
        remote_bytes_sent: u32,

        /// The total number of sender reports sent by the remote and received.
        remote_reports_sent: u64,

        /// The last remote round trip time measurement in ms. [`None`] if no round trip time has
        /// been derived yet, or if it wasn't possible to derive it.
        remote_round_trip_time: Option<f64>,

        /// The cumulative total round trip times reported in ms.
        remote_total_round_trip_time: f64,

        /// The total number of measurements of the remote round trip time.
        remote_round_trip_time_measurements: u64,
    }
132
133    impl StatsSnapshot {
134        pub fn packets_received(&self) -> u64 {
135            self.rtp_stats.packets
136        }
137
138        pub fn payload_bytes_received(&self) -> u64 {
139            self.rtp_stats.payload_bytes
140        }
141
142        pub fn header_bytes_received(&self) -> u64 {
143            self.rtp_stats.header_bytes
144        }
145
146        pub fn last_packet_received_timestamp(&self) -> Option<SystemTime> {
147            self.rtp_stats.last_packet_timestamp
148        }
149
150        pub fn nacks_sent(&self) -> u64 {
151            self.rtcp_stats.nack_count
152        }
153
154        pub fn firs_sent(&self) -> u64 {
155            self.rtcp_stats.fir_count
156        }
157
158        pub fn plis_sent(&self) -> u64 {
159            self.rtcp_stats.pli_count
160        }
161        pub fn remote_packets_sent(&self) -> u32 {
162            self.remote_packets_sent
163        }
164
165        pub fn remote_bytes_sent(&self) -> u32 {
166            self.remote_bytes_sent
167        }
168
169        pub fn remote_reports_sent(&self) -> u64 {
170            self.remote_reports_sent
171        }
172
173        pub fn remote_round_trip_time(&self) -> Option<f64> {
174            self.remote_round_trip_time
175        }
176
177        pub fn remote_total_round_trip_time(&self) -> f64 {
178            self.remote_total_round_trip_time
179        }
180
181        pub fn remote_round_trip_time_measurements(&self) -> u64 {
182            self.remote_round_trip_time_measurements
183        }
184    }
185
186    impl From<&StreamStats> for StatsSnapshot {
187        fn from(stream_stats: &StreamStats) -> Self {
188            Self {
189                rtp_stats: stream_stats.rtp_stats.clone(),
190                rtcp_stats: stream_stats.rtcp_stats.clone(),
191                remote_packets_sent: stream_stats.remote_packets_sent,
192                remote_bytes_sent: stream_stats.remote_bytes_sent,
193                remote_reports_sent: stream_stats.remote_reports_sent,
194                remote_round_trip_time: stream_stats.remote_round_trip_time,
195                remote_total_round_trip_time: stream_stats.remote_total_round_trip_time,
196                remote_round_trip_time_measurements: stream_stats
197                    .remote_round_trip_time_measurements,
198            }
199        }
200    }
201}
202
203/// Types related to outbound RTP streams.
204mod outbound {
205    use std::time::SystemTime;
206
207    use tokio::time::{Duration, Instant};
208
209    use super::{RTCPStats, RTPStats};
210
    #[derive(Debug, Clone)]
    /// Stats collected for an outbound RTP stream.
    ///
    /// Contains both stats relating to the outbound stream itself and remote stats for the
    /// corresponding inbound stream. A read-only copy of the reportable fields can be obtained
    /// through `snapshot`.
    pub(super) struct StreamStats {
        /// Sent RTP stats.
        pub(super) rtp_stats: RTPStats,
        /// Common RTCP stats derived from inbound and outbound RTCP packets.
        pub(super) rtcp_stats: RTCPStats,

        /// The last time any stats were updated, used for garbage collection to remove obsolete
        /// stats. Initialised at construction and refreshed by `mark_updated`.
        last_update: Instant,

        /// The first value of extended seq num that was sent in an SR for this SSRC. [`None`] before
        /// the first SR is sent.
        ///
        /// Used to calculate packet statistics for remote stats. Only the first recorded value is
        /// kept; later values are ignored.
        initial_outbound_ext_seq_num: Option<u32>,

        /// The number of inbound packets received by the remote side for this stream.
        remote_packets_received: u64,

        /// The number of lost packets reported by the remote for this stream.
        remote_total_lost: u32,

        /// The estimated remote jitter for this stream in timestamp units.
        remote_jitter: u32,

        /// The last remote round trip time measurement in ms. [`None`] if no round trip time has
        /// been derived yet, or if it wasn't possible to derive it.
        remote_round_trip_time: Option<f64>,

        /// The cumulative total round trip times reported in ms.
        remote_total_round_trip_time: f64,

        /// The total number of measurements of the remote round trip time.
        remote_round_trip_time_measurements: u64,

        /// The latest fraction lost value from RR (receiver report), stored as the raw 8-bit
        /// value. [`None`] until a value has been recorded.
        remote_fraction_lost: Option<u8>,
    }
252
253    impl Default for StreamStats {
254        fn default() -> Self {
255            Self {
256                rtp_stats: RTPStats::default(),
257                rtcp_stats: RTCPStats::default(),
258                last_update: Instant::now(),
259                initial_outbound_ext_seq_num: None,
260                remote_packets_received: 0,
261                remote_total_lost: 0,
262                remote_jitter: 0,
263                remote_round_trip_time: None,
264                remote_total_round_trip_time: 0.0,
265                remote_round_trip_time_measurements: 0,
266                remote_fraction_lost: None,
267            }
268        }
269    }
270
271    impl StreamStats {
272        pub(super) fn snapshot(&self) -> StatsSnapshot {
273            self.into()
274        }
275
276        pub(super) fn mark_updated(&mut self) {
277            self.last_update = Instant::now();
278        }
279
280        pub(super) fn duration_since_last_update(&self) -> Duration {
281            self.last_update.elapsed()
282        }
283
284        pub(super) fn update_remote_inbound_packets_received(
285            &mut self,
286            rr_ext_seq_num: u32,
287            rr_total_lost: u32,
288        ) {
289            if let Some(initial_ext_seq_num) = self.initial_outbound_ext_seq_num {
290                // Total number of RTP packets received for this SSRC.
291                // At the receiving endpoint, this is calculated as defined in [RFC3550] section 6.4.1.
292                // At the sending endpoint the packetsReceived is estimated by subtracting the
293                // Cumulative Number of Packets Lost from the Extended Highest Sequence Number Received,
294                // both reported in the RTCP Receiver Report, and then subtracting the
295                // initial Extended Sequence Number that was sent to this SSRC in a RTCP Sender Report and then adding one,
296                // to mirror what is discussed in Appendix A.3 in [RFC3550], but for the sender side.
297                // If no RTCP Receiver Report has been received yet, then return 0.
298                self.remote_packets_received =
299                    (rr_ext_seq_num as u64) - (rr_total_lost as u64) - (initial_ext_seq_num as u64)
300                        + 1;
301            }
302        }
303
304        #[inline(always)]
305        pub(super) fn record_sr_ext_seq_num(&mut self, seq_num: u32) {
306            // Only record the initial value
307            if self.initial_outbound_ext_seq_num.is_none() {
308                self.initial_outbound_ext_seq_num = Some(seq_num);
309            }
310        }
311
312        pub(super) fn record_remote_round_trip_time(&mut self, round_trip_time: Option<f64>) {
313            // Store the latest measurement, even if it's None.
314            self.remote_round_trip_time = round_trip_time;
315
316            if let Some(rtt) = round_trip_time {
317                // Only if we have a valid measurement do we update the totals
318                self.remote_total_round_trip_time += rtt;
319                self.remote_round_trip_time_measurements += 1;
320            }
321        }
322
323        pub(super) fn update_remote_fraction_lost(&mut self, fraction_lost: u8) {
324            self.remote_fraction_lost = Some(fraction_lost);
325        }
326
327        pub(super) fn update_remote_jitter(&mut self, jitter: u32) {
328            self.remote_jitter = jitter;
329        }
330
331        pub(super) fn update_remote_total_lost(&mut self, lost: u32) {
332            self.remote_total_lost = lost;
333        }
334    }
335
    /// A point in time snapshot of the stream stats for an outbound RTP stream.
    ///
    /// Created by [`StreamStats::snapshot`]. All fields are private and are read through the
    /// accessor methods on this type.
    #[derive(Debug)]
    pub struct StatsSnapshot {
        /// Sent RTP stats.
        rtp_stats: RTPStats,
        /// Common RTCP stats derived from inbound and outbound RTCP packets.
        rtcp_stats: RTCPStats,

        /// The number of inbound packets received by the remote side for this stream.
        remote_packets_received: u64,

        /// The number of lost packets reported by the remote for this stream.
        remote_total_lost: u32,

        /// The estimated remote jitter for this stream in timestamp units.
        remote_jitter: u32,

        /// The most recent remote round trip time in milliseconds. [`None`] if no measurement is
        /// available.
        remote_round_trip_time: Option<f64>,

        /// The cumulative total round trip times reported in ms.
        remote_total_round_trip_time: f64,

        /// The total number of measurements of the remote round trip time.
        remote_round_trip_time_measurements: u64,

        /// The fraction of packets lost reported for this stream, as a floating point value
        /// scaled from the raw 8-bit field. [`None`] if it hasn't been reported yet.
        /// Calculated as defined in [RFC3550](https://www.rfc-editor.org/rfc/rfc3550) section 6.4.1 and Appendix A.3.
        remote_fraction_lost: Option<f64>,
    }
368
369    impl StatsSnapshot {
370        pub fn packets_sent(&self) -> u64 {
371            self.rtp_stats.packets
372        }
373
374        pub fn payload_bytes_sent(&self) -> u64 {
375            self.rtp_stats.payload_bytes
376        }
377
378        pub fn header_bytes_sent(&self) -> u64 {
379            self.rtp_stats.header_bytes
380        }
381
382        pub fn last_packet_sent_timestamp(&self) -> Option<SystemTime> {
383            self.rtp_stats.last_packet_timestamp
384        }
385
386        pub fn nacks_received(&self) -> u64 {
387            self.rtcp_stats.nack_count
388        }
389
390        pub fn firs_received(&self) -> u64 {
391            self.rtcp_stats.fir_count
392        }
393
394        pub fn plis_received(&self) -> u64 {
395            self.rtcp_stats.pli_count
396        }
397
398        /// Packets received on the remote side.
399        pub fn remote_packets_received(&self) -> u64 {
400            self.remote_packets_received
401        }
402
403        /// The number of lost packets reported by the remote for this tream.
404        pub fn remote_total_lost(&self) -> u32 {
405            self.remote_total_lost
406        }
407
408        /// The estimated remote jitter for this stream in timestamp units.
409        pub fn remote_jitter(&self) -> u32 {
410            self.remote_jitter
411        }
412
413        /// The latest RTT in ms if enough data is available to measure it.
414        pub fn remote_round_trip_time(&self) -> Option<f64> {
415            self.remote_round_trip_time
416        }
417
418        /// Total RTT in ms.
419        pub fn remote_total_round_trip_time(&self) -> f64 {
420            self.remote_total_round_trip_time
421        }
422
423        /// The number of RTT measurements so far.
424        pub fn remote_round_trip_time_measurements(&self) -> u64 {
425            self.remote_round_trip_time_measurements
426        }
427
428        /// The latest fraction lost value from the remote or None if it hasn't been reported yet.
429        pub fn remote_fraction_lost(&self) -> Option<f64> {
430            self.remote_fraction_lost
431        }
432    }
433
434    impl From<&StreamStats> for StatsSnapshot {
435        fn from(stream_stats: &StreamStats) -> Self {
436            Self {
437                rtp_stats: stream_stats.rtp_stats.clone(),
438                rtcp_stats: stream_stats.rtcp_stats.clone(),
439                remote_packets_received: stream_stats.remote_packets_received,
440                remote_total_lost: stream_stats.remote_total_lost,
441                remote_jitter: stream_stats.remote_jitter,
442                remote_round_trip_time: stream_stats.remote_round_trip_time,
443                remote_total_round_trip_time: stream_stats.remote_total_round_trip_time,
444                remote_round_trip_time_measurements: stream_stats
445                    .remote_round_trip_time_measurements,
446                remote_fraction_lost: stream_stats
447                    .remote_fraction_lost
448                    .map(|fraction| (fraction as f64) / (u8::MAX as f64)),
449            }
450        }
451    }
452}
453
/// Holds the per-stream stats for all known inbound and outbound streams.
///
/// Both maps are keyed by the stream's SSRC. Entries are created on demand and removed again
/// by `remove_stale_entries` once they have not been updated for a while.
#[derive(Default, Debug)]
struct StatsContainer {
    /// Stats for inbound streams, keyed by SSRC.
    inbound_stats: HashMap<u32, inbound::StreamStats>,
    /// Stats for outbound streams, keyed by SSRC.
    outbound_stats: HashMap<u32, outbound::StreamStats>,
}
459
460impl StatsContainer {
461    fn get_or_create_inbound_stream_stats(&mut self, ssrc: u32) -> &mut inbound::StreamStats {
462        self.inbound_stats.entry(ssrc).or_default()
463    }
464
465    fn get_or_create_outbound_stream_stats(&mut self, ssrc: u32) -> &mut outbound::StreamStats {
466        self.outbound_stats.entry(ssrc).or_default()
467    }
468
469    fn get_inbound_stats(&self, ssrc: u32) -> Option<&inbound::StreamStats> {
470        self.inbound_stats.get(&ssrc)
471    }
472
473    fn get_outbound_stats(&self, ssrc: u32) -> Option<&outbound::StreamStats> {
474        self.outbound_stats.get(&ssrc)
475    }
476
477    fn remove_stale_entries(&mut self) {
478        const MAX_AGE: Duration = Duration::from_secs(60);
479
480        self.inbound_stats
481            .retain(|_, s| s.duration_since_last_update() < MAX_AGE);
482        self.outbound_stats
483            .retain(|_, s| s.duration_since_last_update() < MAX_AGE);
484    }
485}
486
/// Records stats about a given RTP stream.
///
/// The same type is used for both directions, so every counter means "sent" for an outbound
/// stream and "received" for an inbound one.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct RTPStats {
    /// Packets sent or received
    packets: u64,

    /// Payload bytes sent or received
    payload_bytes: u64,

    /// Header bytes sent or received
    header_bytes: u64,

    /// The wall clock time at which the last packet was sent or received. [`None`] until the
    /// first update.
    last_packet_timestamp: Option<SystemTime>,
}

impl RTPStats {
    /// Adds the given amounts to the running totals and stores `now` as the time of the last
    /// packet.
    fn update(&mut self, header_bytes: u64, payload_bytes: u64, packets: u64, now: SystemTime) {
        let Self {
            packets: total_packets,
            payload_bytes: total_payload_bytes,
            header_bytes: total_header_bytes,
            last_packet_timestamp,
        } = self;

        *total_header_bytes += header_bytes;
        *total_payload_bytes += payload_bytes;
        *total_packets += packets;
        *last_packet_timestamp = Some(now);
    }

    /// Total header bytes sent or received.
    pub fn header_bytes(&self) -> u64 {
        let Self { header_bytes, .. } = self;
        *header_bytes
    }

    /// Total payload bytes sent or received.
    pub fn payload_bytes(&self) -> u64 {
        let Self { payload_bytes, .. } = self;
        *payload_bytes
    }

    /// Total packets sent or received.
    pub fn packets(&self) -> u64 {
        let Self { packets, .. } = self;
        *packets
    }

    /// Wall clock time of the last packet, or [`None`] if no packet has been recorded.
    pub fn last_packet_timestamp(&self) -> Option<SystemTime> {
        let Self {
            last_packet_timestamp,
            ..
        } = self;
        *last_packet_timestamp
    }
}
528
/// Records RTCP feedback counters for a given RTP stream.
///
/// The same type is used for both directions, so every counter means "sent" for an inbound
/// stream and "received" for an outbound one.
#[derive(Debug, Default, Clone)]
pub struct RTCPStats {
    /// The number of FIRs sent or received
    fir_count: u64,

    /// The number of PLIs sent or received
    pli_count: u64,

    /// The number of NACKs sent or received
    nack_count: u64,
}

impl RTCPStats {
    /// Adds the given amounts to the running counters.
    ///
    /// A [`None`] argument leaves the corresponding counter untouched.
    fn update(&mut self, fir_count: Option<u64>, pli_count: Option<u64>, nack_count: Option<u64>) {
        if let Some(fir_count) = fir_count {
            self.fir_count += fir_count;
        }

        if let Some(pli_count) = pli_count {
            self.pli_count += pli_count;
        }

        if let Some(nack_count) = nack_count {
            self.nack_count += nack_count;
        }
    }

    /// The number of FIRs sent or received.
    pub fn fir_count(&self) -> u64 {
        self.fir_count
    }

    /// The number of PLIs sent or received.
    pub fn pli_count(&self) -> u64 {
        self.pli_count
    }

    /// The number of NACKs sent or received.
    pub fn nack_count(&self) -> u64 {
        self.nack_count
    }
}
569
#[cfg(test)]
mod test {
    use super::*;

    /// Compiles only when `T` is both `Send` and `Sync`; calling it is the whole assertion.
    fn assert_send_sync<T: Send + Sync>() {}

    /// A fresh `RTPStats` starts at zero and `update` adds to each counter.
    #[test]
    fn test_rtp_stats() {
        let mut stats = RTPStats::default();
        assert_eq!(stats.header_bytes(), 0);
        assert_eq!(stats.payload_bytes(), 0);
        assert_eq!(stats.packets(), 0);

        let now = SystemTime::now();
        stats.update(24, 960, 1, now);

        assert_eq!(stats.header_bytes(), 24);
        assert_eq!(stats.payload_bytes(), 960);
        assert_eq!(stats.packets(), 1);
    }

    /// A fresh `RTCPStats` starts at zero and `update` adds to each counter.
    #[test]
    fn test_rtcp_stats() {
        let mut stats = RTCPStats::default();
        assert_eq!(stats.fir_count(), 0);
        assert_eq!(stats.pli_count(), 0);
        assert_eq!(stats.nack_count(), 0);

        stats.update(Some(1), Some(2), Some(3));

        assert_eq!(stats.fir_count(), 1);
        assert_eq!(stats.pli_count(), 2);
        assert_eq!(stats.nack_count(), 3);
    }

    #[test]
    fn test_rtp_stats_send_sync() {
        assert_send_sync::<RTPStats>();
    }

    #[test]
    fn test_rtcp_stats_send_sync() {
        assert_send_sync::<RTCPStats>();
    }
}