1use std::mem;
2use std::time::{Duration, Instant};
3
4use crate::rtp_::SeqNo;
5
6use super::time::{TimeDelta, Timestamp};
7use super::AckedPacket;
8
9const BURST_TIME_INTERVAL: Duration = Duration::from_millis(5);
10const SEND_TIME_GROUP_LENGTH: Duration = Duration::from_millis(5);
11const MAX_BURST_DURATION: Duration = Duration::from_millis(100);
12
13#[derive(Debug, Default)]
14pub struct ArrivalGroup {
15 first: Option<(SeqNo, Instant, Instant)>,
16 last_seq_no: Option<SeqNo>,
17 last_local_send_time: Option<Instant>,
18 last_remote_recv_time: Option<Instant>,
19 size: usize,
20}
21
22impl ArrivalGroup {
23 fn add_packet(&mut self, packet: &AckedPacket) -> bool {
27 match self.belongs_to_group(packet) {
28 Belongs::NewGroup => return true,
29 Belongs::Skipped => return false,
30 Belongs::Yes => {}
31 }
32
33 if self.first.is_none() {
34 self.first = Some((
35 packet.seq_no,
36 packet.local_send_time,
37 packet.remote_recv_time,
38 ));
39 }
40
41 self.last_remote_recv_time = self
42 .last_remote_recv_time
43 .max(Some(packet.remote_recv_time));
44 self.last_local_send_time = self.last_local_send_time.max(Some(packet.local_send_time));
45 self.size += 1;
46 self.last_seq_no = self.last_seq_no.max(Some(packet.seq_no));
47
48 false
49 }
50
51 fn belongs_to_group(&self, packet: &AckedPacket) -> Belongs {
52 let Some((_, first_local_send_time, first_remote_recv_time)) = self.first else {
53 return Belongs::Yes;
55 };
56
57 let Some(first_send_delta) = packet
58 .local_send_time
59 .checked_duration_since(first_local_send_time)
60 else {
61 return Belongs::Skipped;
63 };
64
65 let send_time_delta = Timestamp::from(packet.local_send_time) - self.local_send_time();
66 if send_time_delta == TimeDelta::ZERO {
67 return Belongs::Yes;
68 }
69 let arrival_time_delta = Timestamp::from(packet.remote_recv_time) - self.remote_recv_time();
70
71 let propagation_delta = arrival_time_delta - send_time_delta;
72 if propagation_delta < TimeDelta::ZERO
73 && arrival_time_delta <= BURST_TIME_INTERVAL
74 && packet.remote_recv_time - first_remote_recv_time < MAX_BURST_DURATION
75 {
76 Belongs::Yes
77 } else if first_send_delta > SEND_TIME_GROUP_LENGTH {
78 Belongs::NewGroup
79 } else {
80 Belongs::Yes
81 }
82 }
83
84 fn departure_delta(&self, other: &Self) -> TimeDelta {
86 Timestamp::from(other.local_send_time()) - self.local_send_time()
87 }
88
89 fn arrival_delta(&self, other: &Self) -> TimeDelta {
91 Timestamp::from(other.remote_recv_time()) - self.remote_recv_time()
92 }
93
94 fn local_send_time(&self) -> Instant {
98 self.last_local_send_time
99 .expect("local_send_time to only be called on non-empty groups")
100 }
101
102 fn remote_recv_time(&self) -> Instant {
106 self.last_remote_recv_time
107 .expect("remote_recv_time to only be called on non-empty groups")
108 }
109}
110
111#[derive(Debug, Clone, Copy, PartialEq, Eq)]
113enum Belongs {
114 Yes,
116 NewGroup,
118 Skipped,
120}
121
122impl Belongs {
123 #[cfg(test)]
124 fn new_group(&self) -> bool {
125 matches!(self, Self::NewGroup)
126 }
127}
128
129#[derive(Debug, Default)]
130pub struct ArrivalGroupAccumulator {
131 previous_group: Option<ArrivalGroup>,
132 current_group: ArrivalGroup,
133}
134
135impl ArrivalGroupAccumulator {
136 pub(super) fn accumulate_packet(
141 &mut self,
142 packet: &AckedPacket,
143 ) -> Option<InterGroupDelayDelta> {
144 let need_new_group = self.current_group.add_packet(packet);
145
146 if !need_new_group {
147 return None;
148 }
149
150 let arrival_delta = self.arrival_delta();
152 let send_delta = self.send_delta();
153 let last_remote_recv_time = self.current_group.remote_recv_time();
154
155 let current_group = mem::take(&mut self.current_group);
156 self.previous_group = Some(current_group);
157
158 self.current_group.add_packet(packet);
159
160 Some(InterGroupDelayDelta {
161 send_delta: send_delta?,
162 arrival_delta: arrival_delta?,
163 last_remote_recv_time,
164 })
165 }
166
167 fn arrival_delta(&self) -> Option<TimeDelta> {
168 self.previous_group
169 .as_ref()
170 .map(|prev| prev.arrival_delta(&self.current_group))
171 }
172
173 fn send_delta(&self) -> Option<TimeDelta> {
174 self.previous_group
175 .as_ref()
176 .map(|prev| prev.departure_delta(&self.current_group))
177 }
178}
179
180#[derive(Debug, Clone, Copy, PartialEq, Eq)]
182pub(super) struct InterGroupDelayDelta {
183 pub(super) send_delta: TimeDelta,
186 pub(super) arrival_delta: TimeDelta,
188 pub(super) last_remote_recv_time: Instant,
190}
191
192#[cfg(test)]
193mod test {
194 use std::time::{Duration, Instant};
195
196 use crate::rtp_::DataSize;
197
198 use super::{AckedPacket, ArrivalGroup, ArrivalGroupAccumulator, Belongs, TimeDelta};
199
200 #[test]
201 fn test_arrival_group_all_packets_belong_to_empty_group() {
202 let now = Instant::now();
203 let group = ArrivalGroup::default();
204
205 assert_eq!(
206 group.belongs_to_group(&AckedPacket {
207 seq_no: 1.into(),
208 size: DataSize::ZERO,
209 local_send_time: now,
210 remote_recv_time: now + duration_us(10),
211 local_recv_time: now + duration_us(12),
212 }),
213 Belongs::Yes,
214 "Any packet should belong to an empty arrival group"
215 );
216 }
217
218 #[test]
219 fn test_arrival_group_all_packets_sent_within_burst_interval_belong() {
220 let now = Instant::now();
221 #[allow(clippy::vec_init_then_push)]
222 let packets = {
223 let mut packets = vec![];
224
225 packets.push(AckedPacket {
226 seq_no: 0.into(),
227 size: DataSize::ZERO,
228 local_send_time: now,
229 remote_recv_time: now + duration_us(150),
230 local_recv_time: now + duration_us(200),
231 });
232
233 packets.push(AckedPacket {
234 seq_no: 1.into(),
235 size: DataSize::ZERO,
236 local_send_time: now + duration_us(50),
237 remote_recv_time: now + duration_us(225),
238 local_recv_time: now + duration_us(275),
239 });
240
241 packets.push(AckedPacket {
242 seq_no: 2.into(),
243 size: DataSize::ZERO,
244 local_send_time: now + duration_us(1005),
245 remote_recv_time: now + duration_us(1140),
246 local_recv_time: now + duration_us(1190),
247 });
248
249 packets.push(AckedPacket {
250 seq_no: 3.into(),
251 size: DataSize::ZERO,
252 local_send_time: now + duration_us(4995),
253 remote_recv_time: now + duration_us(5001),
254 local_recv_time: now + duration_us(5051),
255 });
256
257 packets.push(AckedPacket {
259 seq_no: 4.into(),
260 size: DataSize::ZERO,
261 local_send_time: now + duration_us(5700),
262 remote_recv_time: now + duration_us(6000),
263 local_recv_time: now + duration_us(5750),
264 });
265
266 packets
267 };
268
269 let mut group = ArrivalGroup::default();
270
271 for p in packets {
272 let need_new_group = group.belongs_to_group(&p).new_group();
273 if !need_new_group {
274 group.add_packet(&p);
275 }
276 }
277
278 assert_eq!(group.size, 4, "Expected group to contain 4 packets");
279 }
280
281 #[test]
282 fn test_arrival_group_out_order_arrival_ignored() {
283 let now = Instant::now();
284 #[allow(clippy::vec_init_then_push)]
285 let packets = {
286 let mut packets = vec![];
287
288 packets.push(AckedPacket {
289 seq_no: 0.into(),
290 size: DataSize::ZERO,
291 local_send_time: now,
292 remote_recv_time: now + duration_us(150),
293 local_recv_time: now + duration_us(200),
294 });
295
296 packets.push(AckedPacket {
297 seq_no: 1.into(),
298 size: DataSize::ZERO,
299 local_send_time: now + duration_us(50),
300 remote_recv_time: now + duration_us(225),
301 local_recv_time: now + duration_us(275),
302 });
303
304 packets.push(AckedPacket {
305 seq_no: 2.into(),
306 size: DataSize::ZERO,
307 local_send_time: now + duration_us(1005),
308 remote_recv_time: now + duration_us(1140),
309 local_recv_time: now + duration_us(1190),
310 });
311
312 packets.push(AckedPacket {
313 seq_no: 3.into(),
314 size: DataSize::ZERO,
315 local_send_time: now + duration_us(4995),
316 remote_recv_time: now + duration_us(5001),
317 local_recv_time: now + duration_us(5051),
318 });
319
320 packets.push(AckedPacket {
322 seq_no: 4.into(),
323 size: DataSize::ZERO,
324 local_send_time: now - duration_us(100),
325 remote_recv_time: now + duration_us(5000),
326 local_recv_time: now + duration_us(5050),
327 });
328
329 packets.push(AckedPacket {
331 seq_no: 5.into(),
332 size: DataSize::ZERO,
333 local_send_time: now + duration_us(5700),
334 remote_recv_time: now + duration_us(6000),
335 local_recv_time: now + duration_us(6050),
336 });
337
338 packets
339 };
340
341 let mut group = ArrivalGroup::default();
342
343 for p in packets {
344 let need_new_group = group.belongs_to_group(&p).new_group();
345 if !need_new_group {
346 group.add_packet(&p);
347 }
348 }
349
350 assert_eq!(group.size, 4, "Expected group to contain 4 packets");
351 }
352
353 #[test]
354 fn test_arrival_group_arrival_membership() {
355 let now = Instant::now();
356 #[allow(clippy::vec_init_then_push)]
357 let packets = {
358 let mut packets = vec![];
359
360 packets.push(AckedPacket {
361 seq_no: 0.into(),
362 size: DataSize::ZERO,
363 local_send_time: now,
364 remote_recv_time: now + duration_us(150),
365 local_recv_time: now + duration_us(200),
366 });
367
368 packets.push(AckedPacket {
369 seq_no: 1.into(),
370 size: DataSize::ZERO,
371 local_send_time: now + duration_us(50),
372 remote_recv_time: now + duration_us(225),
373 local_recv_time: now + duration_us(275),
374 });
375
376 packets.push(AckedPacket {
377 seq_no: 2.into(),
378 size: DataSize::ZERO,
379 local_send_time: now + duration_us(5152),
380 remote_recv_time: now + duration_us(5224),
382 local_recv_time: now + duration_us(5274),
383 });
384
385 packets.push(AckedPacket {
387 seq_no: 3.into(),
388 size: DataSize::ZERO,
389 local_send_time: now + duration_us(5700),
390 remote_recv_time: now + duration_us(6000),
391 local_recv_time: now + duration_us(6050),
392 });
393
394 packets
395 };
396
397 let mut group = ArrivalGroup::default();
398
399 for p in packets {
400 let need_new_group = group.belongs_to_group(&p).new_group();
401 if !need_new_group {
402 group.add_packet(&p);
403 }
404 }
405
406 assert_eq!(group.size, 3, "Expected group to contain 4 packets");
407 }
408
409 #[test]
410 fn group_reorder() {
411 let data = vec![
412 ((Duration::from_millis(0), Duration::from_millis(0)), None),
413 ((Duration::from_millis(60), Duration::from_millis(5)), None),
414 ((Duration::from_millis(40), Duration::from_millis(10)), None),
415 (
416 (Duration::from_millis(70), Duration::from_millis(20)),
417 Some((TimeDelta::from_millis(-20), TimeDelta::from_millis(5))),
418 ),
419 ];
420
421 let now = Instant::now();
422 let mut aga = ArrivalGroupAccumulator::default();
423
424 for ((local_send_time, remote_recv_time), deltas) in data {
425 let group_delta = aga.accumulate_packet(&AckedPacket {
426 seq_no: Default::default(),
427 size: Default::default(),
428 local_send_time: now + local_send_time,
429 remote_recv_time: now + remote_recv_time,
430 local_recv_time: Instant::now(), });
432
433 assert_eq!(group_delta.map(|d| (d.send_delta, d.arrival_delta)), deltas);
434 }
435 }
436
437 fn duration_us(us: u64) -> Duration {
438 Duration::from_micros(us)
439 }
440}