1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
//! Statistics events.
use std::collections::{HashMap, VecDeque};
use std::net::SocketAddr;
use std::time::{Duration, Instant};
use crate::Bitrate;
use crate::rtp::SeqNo;
use crate::rtp_::{Mid, Rid};
use crate::{io::Protocol, rtp_::MidRid};
/// Drives periodic statistics reporting.
///
/// Decides when a stats pass is due and queues the [`StatsEvent`]s built from
/// each [`StatsSnapshot`] until they are polled.
pub(crate) struct Stats {
/// Reference instant the next timeout is measured from: the first `now` seen by
/// [`Stats::wants_timeout`], then the timestamp of each handled snapshot.
/// `None` until `wants_timeout` has been called once.
last_now: Option<Instant>,
/// Events waiting to be handed out by [`Stats::poll_output`], oldest first.
events: VecDeque<StatsEvent>,
/// Minimum time between the reference instant and the next stats pass.
interval: Duration,
}
/// Collection of counters for one stats pass.
///
/// [`Stats::do_handle_timeout`] copies the peer-level fields into a [`PeerStats`]
/// event and drains the per-stream maps into media events.
pub(crate) struct StatsSnapshot {
/// Reported as [`PeerStats::peer_bytes_tx`].
pub peer_tx: u64,
/// Reported as [`PeerStats::peer_bytes_rx`].
pub peer_rx: u64,
/// Reported as [`PeerStats::bytes_tx`].
pub tx: u64,
/// Reported as [`PeerStats::bytes_rx`].
pub rx: u64,
/// Reported as [`PeerStats::egress_loss_fraction`].
pub egress_loss_fraction: Option<f32>,
/// Reported as [`PeerStats::ingress_loss_fraction`].
pub ingress_loss_fraction: Option<f32>,
/// Reported as [`PeerStats::rtt`].
pub rtt: Option<Duration>,
/// Ingress stats keyed by `MidRid`; each value becomes one `StatsEvent::MediaIngress`.
pub ingress: HashMap<MidRid, MediaIngressStats>,
/// Egress stats keyed by `MidRid`; each value becomes one `StatsEvent::MediaEgress`.
pub egress: HashMap<MidRid, MediaEgressStats>,
/// Reported as [`PeerStats::bwe_tx`].
pub bwe_tx: Option<Bitrate>,
/// Reported as [`PeerStats::selected_candidate_pair`].
pub selected_candidate_pair: Option<CandidatePairStats>,
/// Instant given to [`StatsSnapshot::new`]; used as the `PeerStats` timestamp and as
/// the reference instant for the next stats pass.
timestamp: Instant,
}
impl StatsSnapshot {
pub(crate) fn new(timestamp: Instant) -> StatsSnapshot {
StatsSnapshot {
peer_rx: 0,
peer_tx: 0,
tx: 0,
rx: 0,
egress_loss_fraction: None,
ingress_loss_fraction: None,
ingress: HashMap::new(),
egress: HashMap::new(),
rtt: None,
bwe_tx: None,
selected_candidate_pair: None,
timestamp,
}
}
}
// Output events
/// Events queued by [`Stats::do_handle_timeout`] and handed out one at a time by
/// [`Stats::poll_output`].
#[derive(Debug, Clone)]
pub(crate) enum StatsEvent {
/// Peer-level statistics; one is queued per handled timeout.
Peer(PeerStats),
/// Outgoing media statistics; one is queued per entry in `StatsSnapshot::egress`.
MediaEgress(MediaEgressStats),
/// Incoming media statistics; one is queued per entry in `StatsSnapshot::ingress`.
MediaIngress(MediaIngressStats),
}
/// Peer statistics in [`Event::PeerStats`][crate::Event::PeerStats].
///
/// This event is generated roughly every second.
#[derive(Debug, Clone)]
pub struct PeerStats {
/// Total bytes received.
pub peer_bytes_rx: u64,
/// Total bytes transmitted.
pub peer_bytes_tx: u64,
/// Total bytes received, only counting media traffic (rtp payload).
pub bytes_rx: u64,
/// Total bytes transmitted, only counting media traffic (rtp payload).
pub bytes_tx: u64,
/// Timestamp when this event was generated.
pub timestamp: Instant,
/// The last egress bandwidth estimate from the BWE subsystem, if enabled.
pub bwe_tx: Option<Bitrate>,
/// The egress loss over the last second.
pub egress_loss_fraction: Option<f32>,
/// The ingress loss since the last stats event.
pub ingress_loss_fraction: Option<f32>,
/// The most recent RTT since the last stats event.
pub rtt: Option<Duration>,
/// The selected ICE candidate pair, if any.
pub selected_candidate_pair: Option<CandidatePairStats>,
}
/// ICE candidate pair statistics.
///
/// Reported in [`PeerStats::selected_candidate_pair`].
#[derive(Debug, Clone)]
pub struct CandidatePairStats {
/// The selected protocol.
pub protocol: Protocol,
/// The local candidate.
pub local: CandidateStats,
/// The remote candidate.
pub remote: CandidateStats,
}
/// ICE candidate statistics.
///
/// Describes one side (local or remote) of a [`CandidatePairStats`].
#[derive(Debug, Clone)]
pub struct CandidateStats {
/// The socket address of the candidate.
pub addr: SocketAddr,
}
/// Outgoing media statistics in [`Event::MediaEgressStats`][crate::Event::MediaEgressStats].
///
/// Note: when simulcast is disabled, `rid` is `None`.
#[derive(Debug, Clone)]
pub struct MediaEgressStats {
/// The identifier of the media these stats are for.
pub mid: Mid,
/// The Rid identifier in case of simulcast.
pub rid: Option<Rid>,
/// Total bytes sent, including retransmissions.
///
/// Spec equivalent to [`RTCSentRtpStreamStats.bytesSent`][1].
///
/// [1]: https://www.w3.org/TR/webrtc-stats/#dom-rtcsentrtpstreamstats-bytessent
pub bytes: u64,
/// Total number of RTP packets sent, including retransmissions.
///
/// Spec equivalent of [`RTCSentRtpStreamStats.packetsSent`][1].
///
/// [1]: https://www.w3.org/TR/webrtc-stats/#dom-rtcsentrtpstreamstats-packetssent
pub packets: u64,
/// Number of FIRs received.
pub firs: u64,
/// Number of PLIs received.
pub plis: u64,
/// Number of NACKs received.
pub nacks: u64,
/// Round-trip-time extracted from the last RTCP receiver report.
pub rtt: Option<Duration>,
/// Fraction of packets lost averaged from the RTCP receiver reports received.
/// `None` if no reports have been received since the last event.
pub loss: Option<f32>,
/// Timestamp when this event was generated.
pub timestamp: Instant,
/// Stats provided by the remote peer via ReceiverReports, if any.
pub remote: Option<RemoteIngressStats>,
}
/// Stats as reported by the remote side (via RTCP ReceiverReports).
///
/// Carried in [`MediaEgressStats::remote`].
#[derive(Debug, Clone)]
pub struct RemoteIngressStats {
/// The jitter as calculated by the remote side.
pub jitter: u32,
/// The maximum extended sequence number received.
pub maximum_sequence_number: SeqNo,
/// The cumulative number of packets lost.
pub packets_lost: u64,
}
/// Incoming media statistics in [`Event::MediaIngressStats`][crate::Event::MediaIngressStats].
///
/// Note: when simulcast is disabled, `rid` is `None`.
#[derive(Debug, Clone)]
pub struct MediaIngressStats {
/// The identifier of the media these stats are for.
pub mid: Mid,
/// The Rid identifier in case of simulcast.
pub rid: Option<Rid>,
/// Total bytes received, including retransmissions.
pub bytes: u64,
/// Total number of RTP packets received, including retransmissions.
pub packets: u64,
/// Number of FIRs sent.
pub firs: u64,
/// Number of PLIs sent.
pub plis: u64,
/// Number of NACKs sent.
pub nacks: u64,
/// Round-trip-time extracted from the last RTCP XR DLRR report block.
pub rtt: Option<Duration>,
/// Fraction of packets lost extracted from the last RTCP receiver report.
pub loss: Option<f32>,
/// Timestamp when this event was generated.
pub timestamp: Instant,
/// Stats provided by the remote peer via SenderReports, if any.
pub remote: Option<RemoteEgressStats>,
}
impl MediaIngressStats {
    /// Folds `other` into `self`.
    ///
    /// The counters (`bytes`, `packets`, `firs`, `plis`, `nacks`) are summed. `rtt`,
    /// `loss` and `timestamp` come from whichever side has the later `timestamp`;
    /// `other` wins unless `self` is strictly newer. Remote stats are summed when both
    /// sides carry them, otherwise whichever side has them is kept.
    ///
    /// # Panics
    ///
    /// Panics if `self` and `other` do not have the same `(mid, rid)` pair.
    pub(crate) fn merge_by_mid_rid(&mut self, other: &Self) {
        assert!(
            self.mid == other.mid,
            "Cannot merge MediaIngressStats for different mids"
        );
        assert!(
            self.rid == other.rid,
            "Cannot merge MediaIngressStats for different rids"
        );

        // Pick the side that supplies the point-in-time values.
        let newest: &Self = if self.timestamp > other.timestamp {
            &*self
        } else {
            other
        };
        let (rtt, loss, timestamp) = (newest.rtt, newest.loss, newest.timestamp);

        // Accumulate the running totals.
        let bytes = self.bytes + other.bytes;
        let packets = self.packets + other.packets;
        let firs = self.firs + other.firs;
        let plis = self.plis + other.plis;
        let nacks = self.nacks + other.nacks;

        let remote = match (self.remote.as_ref(), other.remote.as_ref()) {
            (Some(ours), Some(theirs)) => Some(RemoteEgressStats {
                bytes: ours.bytes + theirs.bytes,
                packets: ours.packets + theirs.packets,
            }),
            // At most one side has remote stats: keep it, if any.
            (ours, theirs) => ours.or(theirs).cloned(),
        };

        // Write back only once every value has been computed; `mid` and `rid` are
        // known to be equal and stay as they are.
        self.bytes = bytes;
        self.packets = packets;
        self.firs = firs;
        self.plis = plis;
        self.nacks = nacks;
        self.rtt = rtt;
        self.loss = loss;
        self.timestamp = timestamp;
        self.remote = remote;
    }
}
/// Stats as reported by the remote side (via RTCP SenderReports).
///
/// Carried in [`MediaIngressStats::remote`].
#[derive(Debug, Clone)]
pub struct RemoteEgressStats {
/// Total bytes sent, including retransmissions.
pub bytes: u64,
/// Total number of RTP packets sent, including retransmissions.
pub packets: u64,
}
impl Stats {
    /// Creates a new stats instance that produces events at most once per `interval`.
    ///
    /// No reference time is recorded here. The first call to [`Stats::wants_timeout`]
    /// stores the `now` it is given and returns `false`, so the first stats pass is due
    /// one `interval` after that call.
    pub fn new(interval: Duration) -> Stats {
        Stats {
            // Unknown until the first `wants_timeout` call tells us what `now` is.
            last_now: None,
            events: VecDeque::new(),
            interval,
        }
    }

    /// Returns `true` if a stats pass is due at `now`.
    ///
    /// The caller can use this to compute the snapshot only if needed, before calling
    /// [`Stats::do_handle_timeout`].
    ///
    /// The very first call only records `now` as the reference time and returns `false`.
    /// If the reference time plus the interval cannot be represented as an `Instant`,
    /// the deadline can never be reached and this returns `false`.
    pub fn wants_timeout(&mut self, now: Instant) -> bool {
        let Some(last_now) = self.last_now else {
            // Learn our first ever `now`.
            self.last_now = Some(now);
            return false;
        };

        // `Instant + Duration` panics on overflow, so use the checked form.
        matches!(
            last_now.checked_add(self.interval),
            Some(deadline) if now >= deadline
        )
    }

    /// Handles the timeout: queues the output events and advances the reference time.
    ///
    /// One [`StatsEvent::Peer`] is queued first, followed by one event per entry of
    /// `snapshot.ingress` and then one per entry of `snapshot.egress`. Both maps are
    /// left empty. The reference time becomes the snapshot's timestamp.
    pub fn do_handle_timeout(&mut self, snapshot: &mut StatsSnapshot) {
        let peer = PeerStats {
            peer_bytes_rx: snapshot.peer_rx,
            peer_bytes_tx: snapshot.peer_tx,
            bytes_rx: snapshot.rx,
            bytes_tx: snapshot.tx,
            timestamp: snapshot.timestamp,
            bwe_tx: snapshot.bwe_tx,
            egress_loss_fraction: snapshot.egress_loss_fraction,
            ingress_loss_fraction: snapshot.ingress_loss_fraction,
            rtt: snapshot.rtt,
            selected_candidate_pair: snapshot.selected_candidate_pair.clone(),
        };
        self.events.push_back(StatsEvent::Peer(peer));

        self.events.extend(
            snapshot
                .ingress
                .drain()
                .map(|(_, stats)| StatsEvent::MediaIngress(stats)),
        );
        self.events.extend(
            snapshot
                .egress
                .drain()
                .map(|(_, stats)| StatsEvent::MediaEgress(stats)),
        );

        self.last_now = Some(snapshot.timestamp);
    }

    /// Poll for the next time to call [`Stats::wants_timeout`] and
    /// [`Stats::do_handle_timeout`].
    ///
    /// Returns `None` while no reference time has been recorded yet, or when the
    /// reference time plus the interval cannot be represented as an `Instant`.
    pub fn poll_timeout(&self) -> Option<Instant> {
        self.last_now?.checked_add(self.interval)
    }

    /// Returns the next queued event, oldest first, or `None` if the queue is empty.
    pub fn poll_output(&mut self) -> Option<StatsEvent> {
        self.events.pop_front()
    }
}