1use std::{collections::VecDeque, ops::Deref};
4
5use lru::LruCache;
6
7use crate::rtp::{IncomingRtpPacket, OrderedRtpPacket};
8
9pub enum ReorderingError {
11 BufferFull(IncomingRtpPacket),
12 DuplicatePacket(IncomingRtpPacket),
13}
14
15impl ReorderingError {
16 #[inline]
19 pub fn is_full(&self) -> bool {
20 matches!(self, Self::BufferFull(_))
21 }
22
23 #[inline]
25 pub fn is_duplicate(&self) -> bool {
26 matches!(self, Self::DuplicatePacket(_))
27 }
28}
29
30pub struct ReorderingBuffer {
44 inner: InternalBuffer,
45}
46
47impl ReorderingBuffer {
48 #[inline]
50 pub fn new(depth: usize) -> Self {
51 Self {
52 inner: InternalBuffer::new(depth),
53 }
54 }
55
56 pub fn estimate_index(&self, sequence_nr: u16) -> u64 {
58 self.inner.estimate_index(sequence_nr)
59 }
60
61 pub fn is_duplicate(&self, index: u64) -> bool {
63 self.inner.is_duplicate(index)
64 }
65
66 #[allow(clippy::result_large_err)]
73 pub fn push(&mut self, packet: IncomingRtpPacket) -> Result<u64, ReorderingError> {
74 self.inner
75 .push(InputPacket::new(packet, 0))
76 .map_err(ReorderingError::from)
77 }
78
79 #[allow(clippy::should_implement_trait)]
85 pub fn next(&mut self) -> Option<OrderedRtpPacket> {
86 self.inner.next().map(OrderedRtpPacket::from)
87 }
88
89 pub fn take(&mut self) -> Option<OrderedRtpPacket> {
96 self.inner.take().map(OrderedRtpPacket::from)
97 }
98
99 #[inline]
101 pub fn is_empty(&self) -> bool {
102 self.inner.is_empty()
103 }
104}
105
106pub struct ReorderingMultiBuffer {
113 input_index_to_ssrc: VecDeque<Option<u32>>,
114 first_input_index: usize,
115 sources: LruCache<u32, InternalBuffer>,
116 output: VecDeque<OutputPacket>,
117 capacity: usize,
118 max_ssrcs: Option<usize>,
119}
120
121impl ReorderingMultiBuffer {
122 pub fn new(depth: usize, max_ssrcs: Option<usize>) -> Self {
134 Self {
135 input_index_to_ssrc: VecDeque::new(),
136 first_input_index: 0,
137 sources: LruCache::unbounded(),
138 output: VecDeque::with_capacity(depth.min(8)),
139 capacity: depth,
140 max_ssrcs,
141 }
142 }
143
144 pub fn estimate_index(&self, ssrc: u32, sequence_nr: u16) -> u64 {
146 self.sources
147 .peek(&ssrc)
148 .map(|source| source.estimate_index(sequence_nr))
149 .unwrap_or(sequence_nr as u64)
150 }
151
152 pub fn is_duplicate(&self, ssrc: u32, index: u64) -> bool {
154 self.sources
155 .peek(&ssrc)
156 .map(|source| source.is_duplicate(index))
157 .unwrap_or(false)
158 }
159
160 #[allow(clippy::result_large_err)]
167 pub fn push(&mut self, packet: IncomingRtpPacket) -> Result<u64, ReorderingError> {
168 if self.input_index_to_ssrc.len() >= self.capacity {
171 return Err(ReorderingError::BufferFull(packet));
172 }
173
174 let ssrc = packet.ssrc();
175
176 let source = self
177 .sources
178 .get_or_insert_mut(ssrc, || InternalBuffer::new(self.capacity));
179
180 let input_index = self
181 .first_input_index
182 .wrapping_add(self.input_index_to_ssrc.len());
183
184 let output_index = source.push(InputPacket::new(packet, input_index))?;
185
186 self.input_index_to_ssrc.push_back(Some(ssrc));
187
188 while let Some(packet) = source.next() {
189 self.output.push_back(packet);
190 }
191
192 if let Some(max_ssrcs) = self.max_ssrcs {
195 while self.sources.len() > max_ssrcs {
196 if let Some((_, mut source)) = self.sources.pop_lru() {
197 while !source.is_empty() {
198 if let Some(packet) = source.take() {
199 self.output.push_back(packet);
200 }
201 }
202 }
203 }
204 }
205
206 Ok(output_index)
207 }
208
209 #[allow(clippy::should_implement_trait)]
215 pub fn next(&mut self) -> Option<OrderedRtpPacket> {
216 let packet = self.output.pop_front()?;
217
218 self.remove_input_index(packet.input_index);
219
220 Some(packet.into())
221 }
222
223 pub fn take(&mut self) -> Option<OrderedRtpPacket> {
233 let packet = if let Some(p) = self.output.pop_front() {
234 p
235 } else {
236 self.poll_oldest_source()?
237 };
238
239 self.remove_input_index(packet.input_index);
240
241 Some(packet.into())
242 }
243
244 #[inline]
246 pub fn is_empty(&self) -> bool {
247 self.input_index_to_ssrc.is_empty()
248 }
249
250 fn remove_input_index(&mut self, input_index: usize) {
262 let offset = input_index.wrapping_sub(self.first_input_index);
263
264 self.input_index_to_ssrc[offset] = None;
265
266 while let Some(None) = self.input_index_to_ssrc.front() {
268 self.input_index_to_ssrc.pop_front();
269
270 self.first_input_index = self.first_input_index.wrapping_add(1);
272 }
273 }
274
275 fn poll_oldest_source(&mut self) -> Option<OutputPacket> {
280 if let Some(ssrc) = self.input_index_to_ssrc.front()? {
281 if let Some(source) = self.sources.peek_mut(ssrc) {
282 if !source.is_empty() {
283 let res = source.take();
284
285 while let Some(packet) = source.next() {
288 self.output.push_back(packet);
289 }
290
291 return res;
292 }
293 }
294 }
295
296 panic!("inconsistent state")
302 }
303}
304
305struct InternalBuffer {
307 start: Option<u64>,
308 window: VecDeque<Option<OutputPacket>>,
309 capacity: usize,
310}
311
312impl InternalBuffer {
313 #[inline]
315 fn new(depth: usize) -> Self {
316 Self {
317 start: None,
318 window: VecDeque::with_capacity(depth.min(8)),
319 capacity: depth,
320 }
321 }
322
323 fn estimate_index(&self, sequence_nr: u16) -> u64 {
325 let start_index = self.start.unwrap_or(sequence_nr as u64);
326 let last_index = start_index.wrapping_add(self.window.len() as u64);
327 let last_seq_nr = last_index as u16;
328 let last_roc = last_index & !0xffff;
329
330 let new_seq_nr = sequence_nr;
331
332 let new_roc = if last_seq_nr < 0x8000 {
333 if new_seq_nr > (last_seq_nr + 0x8000) {
334 last_roc.wrapping_sub(0x10000)
335 } else {
336 last_roc
337 }
338 } else if (last_seq_nr - 0x8000) > new_seq_nr {
339 last_roc.wrapping_add(0x10000)
340 } else {
341 last_roc
342 };
343
344 new_roc | (new_seq_nr as u64)
345 }
346
347 fn is_duplicate(&self, index: u64) -> bool {
349 let start = self.start.unwrap_or(index);
350
351 let offset = index.wrapping_sub(start);
352
353 if offset > (u64::MAX >> 1) {
355 return true;
356 }
357
358 let Ok(offset) = usize::try_from(offset) else {
359 return false;
360 };
361
362 self.window
363 .get(offset)
364 .map(|entry| entry.is_some())
365 .unwrap_or(false)
366 }
367
368 #[allow(clippy::result_large_err)]
375 fn push(&mut self, packet: InputPacket) -> Result<u64, InternalError> {
376 let index = self.estimate_index(packet.sequence_number());
377
378 if self.start.is_none() {
379 self.start = Some(index);
380 }
381
382 let start = self.start.unwrap_or(index);
383
384 let offset = index.wrapping_sub(start);
385
386 if offset > (u64::MAX >> 1) {
388 return Err(InternalError::DuplicatePacket(packet));
389 }
390
391 let Ok(offset) = usize::try_from(offset) else {
392 return Err(InternalError::BufferFull(packet));
393 };
394
395 if offset < self.capacity {
396 while offset >= self.window.len() {
397 self.window.push_back(None);
398 }
399
400 let entry = &mut self.window[offset];
401
402 if entry.is_some() {
403 return Err(InternalError::DuplicatePacket(packet));
404 }
405
406 *entry = Some(OutputPacket::new(packet, index));
407
408 Ok(index)
409 } else {
410 Err(InternalError::BufferFull(packet))
411 }
412 }
413
414 fn next(&mut self) -> Option<OutputPacket> {
420 if let Some(entry) = self.window.front() {
421 if entry.is_some() {
422 return self.take();
423 }
424 }
425
426 None
427 }
428
429 fn take(&mut self) -> Option<OutputPacket> {
436 if let Some(start) = self.start.as_mut() {
437 *start = start.wrapping_add(1);
438 }
439
440 self.window.pop_front()?
441 }
442
443 #[inline]
445 fn is_empty(&self) -> bool {
446 self.window.is_empty()
447 }
448}
449
450struct InputPacket {
454 input_index: usize,
455 packet: IncomingRtpPacket,
456}
457
458impl InputPacket {
459 fn new(packet: IncomingRtpPacket, input_index: usize) -> Self {
461 Self {
462 input_index,
463 packet,
464 }
465 }
466}
467
468impl Deref for InputPacket {
469 type Target = IncomingRtpPacket;
470
471 fn deref(&self) -> &Self::Target {
472 &self.packet
473 }
474}
475
476struct OutputPacket {
481 input_index: usize,
482 output_index: u64,
483 packet: IncomingRtpPacket,
484}
485
486impl OutputPacket {
487 fn new(packet: InputPacket, output_index: u64) -> Self {
489 Self {
490 input_index: packet.input_index,
491 output_index,
492 packet: packet.packet,
493 }
494 }
495}
496
497impl From<OutputPacket> for OrderedRtpPacket {
498 fn from(packet: OutputPacket) -> Self {
499 OrderedRtpPacket::new(packet.packet, packet.output_index)
500 }
501}
502
503enum InternalError {
508 BufferFull(InputPacket),
509 DuplicatePacket(InputPacket),
510}
511
512impl From<InternalError> for ReorderingError {
513 fn from(err: InternalError) -> Self {
514 match err {
515 InternalError::BufferFull(packet) => Self::BufferFull(packet.packet),
516 InternalError::DuplicatePacket(packet) => Self::DuplicatePacket(packet.packet),
517 }
518 }
519}
520
521#[cfg(test)]
522mod tests {
523 use std::time::Instant;
524
525 use super::{ReorderingBuffer, ReorderingError, ReorderingMultiBuffer};
526
527 use crate::rtp::{IncomingRtpPacket, RtpPacket};
528
529 fn make_packet(seq: u16, ssrc: u32) -> IncomingRtpPacket {
530 let packet = RtpPacket::new().with_sequence_number(seq).with_ssrc(ssrc);
531
532 IncomingRtpPacket::new(packet, Instant::now())
533 }
534
535 #[test]
536 fn test_wrapping_index_arithmetic() {
537 let mut buffer = ReorderingBuffer::new(4);
538
539 assert!(matches!(buffer.push(make_packet(0x1000, 1)), Ok(0x1000)));
540
541 assert_eq!(buffer.estimate_index(0x0000), 0x0000_0000_0000_0000);
542 assert_eq!(buffer.estimate_index(0x2000), 0x0000_0000_0000_2000);
543 assert_eq!(buffer.estimate_index(0xf000), 0xffff_ffff_ffff_f000);
544
545 assert!(matches!(
546 buffer.push(make_packet(0xf000, 1)),
547 Err(ReorderingError::DuplicatePacket(_))
548 ));
549 assert!(matches!(
550 buffer.push(make_packet(0x2000, 1)),
551 Err(ReorderingError::BufferFull(_))
552 ));
553
554 buffer = ReorderingBuffer::new(4);
555
556 assert!(matches!(buffer.push(make_packet(0xe000, 1)), Ok(0xe000)));
557
558 assert_eq!(buffer.estimate_index(0xd000), 0x0000_0000_0000_d000);
559 assert_eq!(buffer.estimate_index(0xf000), 0x0000_0000_0000_f000);
560 assert_eq!(buffer.estimate_index(0x1000), 0x0000_0000_0001_1000);
561
562 assert!(matches!(
563 buffer.push(make_packet(0xd000, 1)),
564 Err(ReorderingError::DuplicatePacket(_))
565 ));
566 assert!(matches!(
567 buffer.push(make_packet(0x1000, 1)),
568 Err(ReorderingError::BufferFull(_))
569 ));
570
571 buffer = ReorderingBuffer::new(4);
572
573 buffer.inner.start = Some(u64::MAX);
574
575 assert!(matches!(buffer.push(make_packet(0xffff, 1)), Ok(u64::MAX)));
576 assert!(matches!(buffer.push(make_packet(0x0000, 1)), Ok(0)));
577
578 assert_eq!(buffer.estimate_index(0xf000), 0xffff_ffff_ffff_f000);
579 assert_eq!(buffer.estimate_index(0x1000), 0x0000_0000_0000_1000);
580
581 assert!(matches!(
582 buffer.push(make_packet(0xf000, 1)),
583 Err(ReorderingError::DuplicatePacket(_))
584 ));
585 assert!(matches!(
586 buffer.push(make_packet(0x1000, 1)),
587 Err(ReorderingError::BufferFull(_))
588 ));
589 }
590
591 #[test]
592 fn test_reordering_buffer() {
593 let mut buffer = ReorderingBuffer::new(5);
594
595 assert!(buffer.is_empty());
596
597 assert!(matches!(buffer.push(make_packet(2, 1)), Ok(2)));
598 assert!(matches!(
599 buffer.push(make_packet(0, 1)),
600 Err(ReorderingError::DuplicatePacket(_))
601 ));
602 assert!(matches!(
603 buffer.push(make_packet(1, 1)),
604 Err(ReorderingError::DuplicatePacket(_))
605 ));
606 assert!(matches!(buffer.push(make_packet(4, 1)), Ok(4)));
607 assert!(matches!(buffer.push(make_packet(3, 1)), Ok(3)));
608 assert!(matches!(
609 buffer.push(make_packet(3, 1)),
610 Err(ReorderingError::DuplicatePacket(_))
611 ));
612 assert!(matches!(buffer.push(make_packet(6, 1)), Ok(6)));
613 assert!(matches!(
614 buffer.push(make_packet(7, 1)),
615 Err(ReorderingError::BufferFull(_))
616 ));
617
618 assert!(!buffer.is_empty());
619
620 assert_eq!(buffer.next().unwrap().index(), 2);
621 assert_eq!(buffer.next().unwrap().index(), 3);
622 assert_eq!(buffer.next().unwrap().index(), 4);
623
624 assert!(matches!(buffer.next(), None));
625
626 assert!(!buffer.is_empty());
627
628 assert!(matches!(buffer.take(), None));
629
630 assert!(!buffer.is_empty());
631
632 assert_eq!(buffer.next().unwrap().index(), 6);
633
634 assert!(buffer.is_empty());
635 }
636
637 #[test]
638 fn test_reordering_multi_buffer() {
639 let mut buffer = ReorderingMultiBuffer::new(8, Some(2));
640
641 assert!(buffer.is_empty());
642
643 assert!(matches!(buffer.push(make_packet(2, 1)), Ok(2)));
644 assert!(matches!(
645 buffer.push(make_packet(0, 1)),
646 Err(ReorderingError::DuplicatePacket(_))
647 ));
648 assert!(matches!(
649 buffer.push(make_packet(1, 1)),
650 Err(ReorderingError::DuplicatePacket(_))
651 ));
652 assert!(matches!(buffer.push(make_packet(4, 1)), Ok(4)));
653 assert!(matches!(buffer.push(make_packet(3, 1)), Ok(3)));
654 assert!(matches!(
655 buffer.push(make_packet(3, 1)),
656 Err(ReorderingError::DuplicatePacket(_))
657 ));
658 assert!(matches!(buffer.push(make_packet(6, 1)), Ok(6)));
659 assert!(matches!(
660 buffer.push(make_packet(13, 1)),
661 Err(ReorderingError::BufferFull(_))
662 ));
663
664 assert!(matches!(buffer.push(make_packet(10, 2)), Ok(10)));
665 assert!(matches!(
666 buffer.push(make_packet(9, 2)),
667 Err(ReorderingError::DuplicatePacket(_))
668 ));
669 assert!(matches!(
670 buffer.push(make_packet(8, 2)),
671 Err(ReorderingError::DuplicatePacket(_))
672 ));
673 assert!(matches!(buffer.push(make_packet(12, 2)), Ok(12)));
674 assert!(matches!(buffer.push(make_packet(11, 2)), Ok(11)));
675 assert!(matches!(
676 buffer.push(make_packet(11, 2)),
677 Err(ReorderingError::DuplicatePacket(_))
678 ));
679 assert!(matches!(
680 buffer.push(make_packet(21, 2)),
681 Err(ReorderingError::BufferFull(_))
682 ));
683 assert!(matches!(buffer.push(make_packet(14, 2)), Ok(14)));
684 assert!(matches!(
685 buffer.push(make_packet(15, 2)),
686 Err(ReorderingError::BufferFull(_))
687 ));
688
689 assert!(!buffer.is_empty());
690
691 assert_eq!(buffer.next().unwrap().index(), 2);
692 assert_eq!(buffer.next().unwrap().index(), 3);
693 assert_eq!(buffer.next().unwrap().index(), 4);
694
695 assert_eq!(buffer.next().unwrap().index(), 10);
696 assert_eq!(buffer.next().unwrap().index(), 11);
697 assert_eq!(buffer.next().unwrap().index(), 12);
698
699 assert!(matches!(buffer.next(), None));
700
701 assert!(!buffer.is_empty());
702
703 assert!(matches!(buffer.take(), None));
704
705 assert!(!buffer.is_empty());
706
707 assert_eq!(buffer.next().unwrap().index(), 6);
708
709 assert!(matches!(buffer.next(), None));
710
711 assert!(!buffer.is_empty());
712
713 assert!(matches!(buffer.take(), None));
714
715 assert!(!buffer.is_empty());
716
717 assert_eq!(buffer.next().unwrap().index(), 14);
718
719 assert!(buffer.is_empty());
720 }
721}