str0m/packet/bwe/
arrival_group.rs

1use std::mem;
2use std::time::{Duration, Instant};
3
4use crate::rtp_::SeqNo;
5
6use super::time::{TimeDelta, Timestamp};
7use super::AckedPacket;
8
9const BURST_TIME_INTERVAL: Duration = Duration::from_millis(5);
10const SEND_TIME_GROUP_LENGTH: Duration = Duration::from_millis(5);
11const MAX_BURST_DURATION: Duration = Duration::from_millis(100);
12
13#[derive(Debug, Default)]
14pub struct ArrivalGroup {
15    first: Option<(SeqNo, Instant, Instant)>,
16    last_seq_no: Option<SeqNo>,
17    last_local_send_time: Option<Instant>,
18    last_remote_recv_time: Option<Instant>,
19    size: usize,
20}
21
22impl ArrivalGroup {
23    /// Maybe add a packet to the group.
24    ///
25    /// Returns [`true`] if a new group needs to be created and [`false`] otherwise.
26    fn add_packet(&mut self, packet: &AckedPacket) -> bool {
27        match self.belongs_to_group(packet) {
28            Belongs::NewGroup => return true,
29            Belongs::Skipped => return false,
30            Belongs::Yes => {}
31        }
32
33        if self.first.is_none() {
34            self.first = Some((
35                packet.seq_no,
36                packet.local_send_time,
37                packet.remote_recv_time,
38            ));
39        }
40
41        self.last_remote_recv_time = self
42            .last_remote_recv_time
43            .max(Some(packet.remote_recv_time));
44        self.last_local_send_time = self.last_local_send_time.max(Some(packet.local_send_time));
45        self.size += 1;
46        self.last_seq_no = self.last_seq_no.max(Some(packet.seq_no));
47
48        false
49    }
50
51    fn belongs_to_group(&self, packet: &AckedPacket) -> Belongs {
52        let Some((_, first_local_send_time, first_remote_recv_time)) = self.first else {
53            // Start of the group
54            return Belongs::Yes;
55        };
56
57        let Some(first_send_delta) = packet
58            .local_send_time
59            .checked_duration_since(first_local_send_time)
60        else {
61            // Out of order
62            return Belongs::Skipped;
63        };
64
65        let send_time_delta = Timestamp::from(packet.local_send_time) - self.local_send_time();
66        if send_time_delta == TimeDelta::ZERO {
67            return Belongs::Yes;
68        }
69        let arrival_time_delta = Timestamp::from(packet.remote_recv_time) - self.remote_recv_time();
70
71        let propagation_delta = arrival_time_delta - send_time_delta;
72        if propagation_delta < TimeDelta::ZERO
73            && arrival_time_delta <= BURST_TIME_INTERVAL
74            && packet.remote_recv_time - first_remote_recv_time < MAX_BURST_DURATION
75        {
76            Belongs::Yes
77        } else if first_send_delta > SEND_TIME_GROUP_LENGTH {
78            Belongs::NewGroup
79        } else {
80            Belongs::Yes
81        }
82    }
83
84    /// Calculate the send time delta between self and a subsequent group.
85    fn departure_delta(&self, other: &Self) -> TimeDelta {
86        Timestamp::from(other.local_send_time()) - self.local_send_time()
87    }
88
89    /// Calculate the remote receive time delta between self and a subsequent group.
90    fn arrival_delta(&self, other: &Self) -> TimeDelta {
91        Timestamp::from(other.remote_recv_time()) - self.remote_recv_time()
92    }
93
94    /// The local send time i.e. departure time, for the group.
95    ///
96    /// Panics if the group doesn't have at least one packet.
97    fn local_send_time(&self) -> Instant {
98        self.last_local_send_time
99            .expect("local_send_time to only be called on non-empty groups")
100    }
101
102    /// The remote receive time i.e. arrival time, for the group.
103    ///
104    /// Panics if the group doesn't have at least one packet.
105    fn remote_recv_time(&self) -> Instant {
106        self.last_remote_recv_time
107            .expect("remote_recv_time to only be called on non-empty groups")
108    }
109}
110
111/// Whether a given packet is belongs to a group or not.
112#[derive(Debug, Clone, Copy, PartialEq, Eq)]
113enum Belongs {
114    /// The packet is belongs to the group.
115    Yes,
116    /// The packet is does not belong to the group, a new group should be created.
117    NewGroup,
118    /// The packet was skipped and a decision wasn't made.
119    Skipped,
120}
121
122impl Belongs {
123    #[cfg(test)]
124    fn new_group(&self) -> bool {
125        matches!(self, Self::NewGroup)
126    }
127}
128
129#[derive(Debug, Default)]
130pub struct ArrivalGroupAccumulator {
131    previous_group: Option<ArrivalGroup>,
132    current_group: ArrivalGroup,
133}
134
135impl ArrivalGroupAccumulator {
136    ///
137    /// Accumulate a packet.
138    ///
139    /// If adding this packet produced a new delay delta it is returned.
140    pub(super) fn accumulate_packet(
141        &mut self,
142        packet: &AckedPacket,
143    ) -> Option<InterGroupDelayDelta> {
144        let need_new_group = self.current_group.add_packet(packet);
145
146        if !need_new_group {
147            return None;
148        }
149
150        // Variation between previous group and current.
151        let arrival_delta = self.arrival_delta();
152        let send_delta = self.send_delta();
153        let last_remote_recv_time = self.current_group.remote_recv_time();
154
155        let current_group = mem::take(&mut self.current_group);
156        self.previous_group = Some(current_group);
157
158        self.current_group.add_packet(packet);
159
160        Some(InterGroupDelayDelta {
161            send_delta: send_delta?,
162            arrival_delta: arrival_delta?,
163            last_remote_recv_time,
164        })
165    }
166
167    fn arrival_delta(&self) -> Option<TimeDelta> {
168        self.previous_group
169            .as_ref()
170            .map(|prev| prev.arrival_delta(&self.current_group))
171    }
172
173    fn send_delta(&self) -> Option<TimeDelta> {
174        self.previous_group
175            .as_ref()
176            .map(|prev| prev.departure_delta(&self.current_group))
177    }
178}
179
180/// The calculate delay delta between two groups of packets.
181#[derive(Debug, Clone, Copy, PartialEq, Eq)]
182pub(super) struct InterGroupDelayDelta {
183    /// The delta between the send times of the two groups i.e. delta between the last packet sent
184    /// in each group.
185    pub(super) send_delta: TimeDelta,
186    /// The delta between the remote arrival times of the two groups.
187    pub(super) arrival_delta: TimeDelta,
188    /// The reported receive time for the last packet in the first arrival group.
189    pub(super) last_remote_recv_time: Instant,
190}
191
192#[cfg(test)]
193mod test {
194    use std::time::{Duration, Instant};
195
196    use crate::rtp_::DataSize;
197
198    use super::{AckedPacket, ArrivalGroup, ArrivalGroupAccumulator, Belongs, TimeDelta};
199
200    #[test]
201    fn test_arrival_group_all_packets_belong_to_empty_group() {
202        let now = Instant::now();
203        let group = ArrivalGroup::default();
204
205        assert_eq!(
206            group.belongs_to_group(&AckedPacket {
207                seq_no: 1.into(),
208                size: DataSize::ZERO,
209                local_send_time: now,
210                remote_recv_time: now + duration_us(10),
211                local_recv_time: now + duration_us(12),
212            }),
213            Belongs::Yes,
214            "Any packet should belong to an empty arrival group"
215        );
216    }
217
218    #[test]
219    fn test_arrival_group_all_packets_sent_within_burst_interval_belong() {
220        let now = Instant::now();
221        #[allow(clippy::vec_init_then_push)]
222        let packets = {
223            let mut packets = vec![];
224
225            packets.push(AckedPacket {
226                seq_no: 0.into(),
227                size: DataSize::ZERO,
228                local_send_time: now,
229                remote_recv_time: now + duration_us(150),
230                local_recv_time: now + duration_us(200),
231            });
232
233            packets.push(AckedPacket {
234                seq_no: 1.into(),
235                size: DataSize::ZERO,
236                local_send_time: now + duration_us(50),
237                remote_recv_time: now + duration_us(225),
238                local_recv_time: now + duration_us(275),
239            });
240
241            packets.push(AckedPacket {
242                seq_no: 2.into(),
243                size: DataSize::ZERO,
244                local_send_time: now + duration_us(1005),
245                remote_recv_time: now + duration_us(1140),
246                local_recv_time: now + duration_us(1190),
247            });
248
249            packets.push(AckedPacket {
250                seq_no: 3.into(),
251                size: DataSize::ZERO,
252                local_send_time: now + duration_us(4995),
253                remote_recv_time: now + duration_us(5001),
254                local_recv_time: now + duration_us(5051),
255            });
256
257            // Should not belong
258            packets.push(AckedPacket {
259                seq_no: 4.into(),
260                size: DataSize::ZERO,
261                local_send_time: now + duration_us(5700),
262                remote_recv_time: now + duration_us(6000),
263                local_recv_time: now + duration_us(5750),
264            });
265
266            packets
267        };
268
269        let mut group = ArrivalGroup::default();
270
271        for p in packets {
272            let need_new_group = group.belongs_to_group(&p).new_group();
273            if !need_new_group {
274                group.add_packet(&p);
275            }
276        }
277
278        assert_eq!(group.size, 4, "Expected group to contain 4 packets");
279    }
280
281    #[test]
282    fn test_arrival_group_out_order_arrival_ignored() {
283        let now = Instant::now();
284        #[allow(clippy::vec_init_then_push)]
285        let packets = {
286            let mut packets = vec![];
287
288            packets.push(AckedPacket {
289                seq_no: 0.into(),
290                size: DataSize::ZERO,
291                local_send_time: now,
292                remote_recv_time: now + duration_us(150),
293                local_recv_time: now + duration_us(200),
294            });
295
296            packets.push(AckedPacket {
297                seq_no: 1.into(),
298                size: DataSize::ZERO,
299                local_send_time: now + duration_us(50),
300                remote_recv_time: now + duration_us(225),
301                local_recv_time: now + duration_us(275),
302            });
303
304            packets.push(AckedPacket {
305                seq_no: 2.into(),
306                size: DataSize::ZERO,
307                local_send_time: now + duration_us(1005),
308                remote_recv_time: now + duration_us(1140),
309                local_recv_time: now + duration_us(1190),
310            });
311
312            packets.push(AckedPacket {
313                seq_no: 3.into(),
314                size: DataSize::ZERO,
315                local_send_time: now + duration_us(4995),
316                remote_recv_time: now + duration_us(5001),
317                local_recv_time: now + duration_us(5051),
318            });
319
320            // Should be skipped
321            packets.push(AckedPacket {
322                seq_no: 4.into(),
323                size: DataSize::ZERO,
324                local_send_time: now - duration_us(100),
325                remote_recv_time: now + duration_us(5000),
326                local_recv_time: now + duration_us(5050),
327            });
328
329            // Should not belong
330            packets.push(AckedPacket {
331                seq_no: 5.into(),
332                size: DataSize::ZERO,
333                local_send_time: now + duration_us(5700),
334                remote_recv_time: now + duration_us(6000),
335                local_recv_time: now + duration_us(6050),
336            });
337
338            packets
339        };
340
341        let mut group = ArrivalGroup::default();
342
343        for p in packets {
344            let need_new_group = group.belongs_to_group(&p).new_group();
345            if !need_new_group {
346                group.add_packet(&p);
347            }
348        }
349
350        assert_eq!(group.size, 4, "Expected group to contain 4 packets");
351    }
352
353    #[test]
354    fn test_arrival_group_arrival_membership() {
355        let now = Instant::now();
356        #[allow(clippy::vec_init_then_push)]
357        let packets = {
358            let mut packets = vec![];
359
360            packets.push(AckedPacket {
361                seq_no: 0.into(),
362                size: DataSize::ZERO,
363                local_send_time: now,
364                remote_recv_time: now + duration_us(150),
365                local_recv_time: now + duration_us(200),
366            });
367
368            packets.push(AckedPacket {
369                seq_no: 1.into(),
370                size: DataSize::ZERO,
371                local_send_time: now + duration_us(50),
372                remote_recv_time: now + duration_us(225),
373                local_recv_time: now + duration_us(275),
374            });
375
376            packets.push(AckedPacket {
377                seq_no: 2.into(),
378                size: DataSize::ZERO,
379                local_send_time: now + duration_us(5152),
380                // Just less than 5ms inter arrival delta
381                remote_recv_time: now + duration_us(5224),
382                local_recv_time: now + duration_us(5274),
383            });
384
385            // Should not belong
386            packets.push(AckedPacket {
387                seq_no: 3.into(),
388                size: DataSize::ZERO,
389                local_send_time: now + duration_us(5700),
390                remote_recv_time: now + duration_us(6000),
391                local_recv_time: now + duration_us(6050),
392            });
393
394            packets
395        };
396
397        let mut group = ArrivalGroup::default();
398
399        for p in packets {
400            let need_new_group = group.belongs_to_group(&p).new_group();
401            if !need_new_group {
402                group.add_packet(&p);
403            }
404        }
405
406        assert_eq!(group.size, 3, "Expected group to contain 4 packets");
407    }
408
409    #[test]
410    fn group_reorder() {
411        let data = vec![
412            ((Duration::from_millis(0), Duration::from_millis(0)), None),
413            ((Duration::from_millis(60), Duration::from_millis(5)), None),
414            ((Duration::from_millis(40), Duration::from_millis(10)), None),
415            (
416                (Duration::from_millis(70), Duration::from_millis(20)),
417                Some((TimeDelta::from_millis(-20), TimeDelta::from_millis(5))),
418            ),
419        ];
420
421        let now = Instant::now();
422        let mut aga = ArrivalGroupAccumulator::default();
423
424        for ((local_send_time, remote_recv_time), deltas) in data {
425            let group_delta = aga.accumulate_packet(&AckedPacket {
426                seq_no: Default::default(),
427                size: Default::default(),
428                local_send_time: now + local_send_time,
429                remote_recv_time: now + remote_recv_time,
430                local_recv_time: Instant::now(), // does not matter
431            });
432
433            assert_eq!(group_delta.map(|d| (d.send_delta, d.arrival_delta)), deltas);
434        }
435    }
436
437    fn duration_us(us: u64) -> Duration {
438        Duration::from_micros(us)
439    }
440}