1#![cfg_attr(not(test), deny(clippy::unwrap_used, clippy::expect_used))]
2use crossbeam_channel::{Receiver, Sender, bounded};
9use std::collections::BTreeMap;
10use tracing::{debug, warn};
11
12#[derive(Debug, Clone)]
31pub struct SequencedRecord<T> {
32 pub sequence_id: u64,
37
38 pub data: T,
42}
43
44impl<T> SequencedRecord<T> {
45 #[inline]
62 #[must_use]
63 pub fn new(sequence_id: u64, data: T) -> Self {
64 Self { sequence_id, data }
65 }
66}
67
68#[derive(Debug)]
111pub struct SequenceRing<T> {
112 receiver: Receiver<SequencedRecord<T>>,
114
115 sender: Sender<SequencedRecord<T>>,
117
118 reorder_buffer: BTreeMap<u64, T>,
122
123 next_sequence_id: u64,
127
128 max_window_size: usize,
132
133 channel_capacity: usize,
135}
136
137impl<T> SequenceRing<T> {
138 #[inline]
162 #[must_use]
163 pub fn new(channel_capacity: usize, max_window_size: usize) -> Self {
164 let (sender, receiver) = bounded(channel_capacity);
165
166 Self {
167 receiver,
168 sender,
169 reorder_buffer: BTreeMap::new(),
170 next_sequence_id: 1,
171 max_window_size,
172 channel_capacity,
173 }
174 }
175
176 #[inline]
201 #[must_use]
202 pub fn sender(&self) -> Sender<SequencedRecord<T>> {
203 self.sender.clone()
204 }
205
206 #[inline]
212 #[must_use = "Handle the Result or propagate the error"]
213 pub fn recv_ordered(&mut self) -> Result<Option<T>, crossbeam_channel::RecvError> {
214 loop {
215 if let Some(record) = self.reorder_buffer.remove(&self.next_sequence_id) {
217 self.next_sequence_id += 1;
218 debug!(
219 "Emitting record {} from reorder buffer",
220 self.next_sequence_id - 1
221 );
222 return Ok(Some(record));
223 }
224
225 if let Ok(sequenced_record) = self.receiver.recv() {
227 let SequencedRecord { sequence_id, data } = sequenced_record;
228
229 match sequence_id.cmp(&self.next_sequence_id) {
230 std::cmp::Ordering::Equal => {
231 self.next_sequence_id += 1;
233 debug!("Emitting record {} directly", sequence_id);
234 return Ok(Some(data));
235 }
236 std::cmp::Ordering::Greater => {
237 debug!(
239 "Buffering out-of-order record {} (expecting {})",
240 sequence_id, self.next_sequence_id
241 );
242 self.reorder_buffer.insert(sequence_id, data);
243
244 if self.reorder_buffer.len() > self.max_window_size {
246 warn!(
247 "Reorder buffer size ({}) exceeds maximum ({}), potential memory issue",
248 self.reorder_buffer.len(),
249 self.max_window_size
250 );
251 }
252 }
253 std::cmp::Ordering::Less => {
254 warn!(
256 "Received past record {} (expecting {}), ignoring",
257 sequence_id, self.next_sequence_id
258 );
259 }
260 }
261 } else {
262 if let Some((_, record)) = self.reorder_buffer.pop_first() {
264 debug!("Emitting remaining buffered record during shutdown");
265 return Ok(Some(record));
266 }
267 debug!("Channel closed, no more records");
268 return Ok(None);
269 }
270 }
271 }
272
273 #[inline]
278 #[must_use = "Handle the Result or propagate the error"]
279 pub fn try_recv_ordered(&mut self) -> Result<Option<T>, crossbeam_channel::TryRecvError> {
280 if let Some(record) = self.reorder_buffer.remove(&self.next_sequence_id) {
282 self.next_sequence_id += 1;
283 return Ok(Some(record));
284 }
285
286 match self.receiver.try_recv() {
288 Ok(sequenced_record) => {
289 let SequencedRecord { sequence_id, data } = sequenced_record;
290
291 match sequence_id.cmp(&self.next_sequence_id) {
292 std::cmp::Ordering::Equal => {
293 self.next_sequence_id += 1;
294 Ok(Some(data))
295 }
296 std::cmp::Ordering::Greater => {
297 self.reorder_buffer.insert(sequence_id, data);
299 Err(crossbeam_channel::TryRecvError::Empty)
300 }
301 std::cmp::Ordering::Less => {
302 warn!("Received past record {}, ignoring", sequence_id);
304 Err(crossbeam_channel::TryRecvError::Empty)
305 }
306 }
307 }
308 Err(e) => Err(e),
309 }
310 }
311
312 #[inline]
337 #[must_use]
338 pub fn stats(&self) -> SequenceRingStats {
339 SequenceRingStats {
340 next_sequence_id: self.next_sequence_id,
341 reorder_buffer_size: self.reorder_buffer.len(),
342 max_window_size: self.max_window_size,
343 channel_capacity: self.channel_capacity,
344 }
345 }
346}
347
348#[derive(Debug, Clone)]
352pub struct SequenceRingStats {
353 pub next_sequence_id: u64,
357
358 pub reorder_buffer_size: usize,
363
364 pub max_window_size: usize,
368
369 pub channel_capacity: usize,
373}
374
375#[cfg(test)]
376#[allow(clippy::expect_used, clippy::unwrap_used)]
377mod tests {
378 use super::*;
379 use crossbeam_channel::TryRecvError;
380
381 #[test]
382 fn sequenced_record_creation() {
383 let record = SequencedRecord::new(42, "test data");
384 assert_eq!(record.sequence_id, 42);
385 assert_eq!(record.data, "test data");
386 }
387
388 #[test]
389 fn sequenced_record_clone() {
390 let record = SequencedRecord::new(1, vec![10, 20, 30]);
391 let cloned = record.clone();
392 assert_eq!(cloned.sequence_id, 1);
393 assert_eq!(cloned.data, vec![10, 20, 30]);
394 }
395
396 #[test]
397 fn sequenced_record_debug_format() {
398 let record = SequencedRecord::new(7, "hello");
399 let debug = format!("{record:?}");
400 assert!(debug.contains('7'));
401 assert!(debug.contains("hello"));
402 }
403
404 #[test]
405 fn recv_ordered_emits_input_order() {
406 let mut ring = SequenceRing::new(10, 5);
407 let sender = ring.sender();
408
409 sender.send(SequencedRecord::new(2, "second")).unwrap();
410 sender.send(SequencedRecord::new(1, "first")).unwrap();
411 sender.send(SequencedRecord::new(3, "third")).unwrap();
412
413 assert_eq!(ring.recv_ordered().unwrap(), Some("first"));
414 assert_eq!(ring.recv_ordered().unwrap(), Some("second"));
415 assert_eq!(ring.recv_ordered().unwrap(), Some("third"));
416 }
417
418 #[test]
419 fn recv_ordered_already_in_order() {
420 let mut ring = SequenceRing::new(10, 5);
421 let sender = ring.sender();
422
423 sender.send(SequencedRecord::new(1, "a")).unwrap();
424 sender.send(SequencedRecord::new(2, "b")).unwrap();
425 sender.send(SequencedRecord::new(3, "c")).unwrap();
426
427 assert_eq!(ring.recv_ordered().unwrap(), Some("a"));
428 assert_eq!(ring.recv_ordered().unwrap(), Some("b"));
429 assert_eq!(ring.recv_ordered().unwrap(), Some("c"));
430 }
431
432 #[test]
433 fn recv_ordered_reverse_order() {
434 let mut ring = SequenceRing::new(10, 5);
435 let sender = ring.sender();
436
437 sender.send(SequencedRecord::new(3, "c")).unwrap();
438 sender.send(SequencedRecord::new(2, "b")).unwrap();
439 sender.send(SequencedRecord::new(1, "a")).unwrap();
440
441 assert_eq!(ring.recv_ordered().unwrap(), Some("a"));
442 assert_eq!(ring.recv_ordered().unwrap(), Some("b"));
443 assert_eq!(ring.recv_ordered().unwrap(), Some("c"));
444 }
445
446 #[test]
447 fn recv_ordered_returns_none_on_channel_close() {
448 let ring = SequenceRing::<&str>::new(10, 5);
449 drop(ring.sender());
451 let mut ring2 = SequenceRing::new(10, 5);
456 let sender = ring2.sender();
457 sender.send(SequencedRecord::new(1, "only")).unwrap();
458 drop(sender);
459 assert_eq!(ring2.recv_ordered().unwrap(), Some("only"));
462 }
463
464 #[test]
465 fn try_recv_empty_reports_empty() {
466 let mut ring = SequenceRing::<&str>::new(10, 5);
467 assert!(matches!(ring.try_recv_ordered(), Err(TryRecvError::Empty)));
468 }
469
470 #[test]
471 fn try_recv_returns_in_order_record() {
472 let mut ring = SequenceRing::new(10, 5);
473 let sender = ring.sender();
474 sender.send(SequencedRecord::new(1, "first")).unwrap();
475 assert_eq!(ring.try_recv_ordered().unwrap(), Some("first"));
476 }
477
478 #[test]
479 fn try_recv_buffers_future_record() {
480 let mut ring = SequenceRing::new(10, 5);
481 let sender = ring.sender();
482
483 sender.send(SequencedRecord::new(2, "second")).unwrap();
485 assert!(matches!(ring.try_recv_ordered(), Err(TryRecvError::Empty)));
487 assert_eq!(ring.stats().reorder_buffer_size, 1);
489
490 sender.send(SequencedRecord::new(1, "first")).unwrap();
492 assert_eq!(ring.try_recv_ordered().unwrap(), Some("first"));
493 assert_eq!(ring.try_recv_ordered().unwrap(), Some("second"));
495 }
496
497 #[test]
498 fn try_recv_ignores_past_record() {
499 let mut ring = SequenceRing::new(10, 5);
500 let sender = ring.sender();
501
502 sender.send(SequencedRecord::new(1, "first")).unwrap();
504 assert_eq!(ring.try_recv_ordered().unwrap(), Some("first"));
505
506 sender.send(SequencedRecord::new(1, "duplicate")).unwrap();
508 assert!(matches!(ring.try_recv_ordered(), Err(TryRecvError::Empty)));
509 }
510
511 #[test]
512 fn sender_respects_channel_capacity() {
513 let ring = SequenceRing::new(2, 1);
514 let sender = ring.sender();
515
516 sender.try_send(SequencedRecord::new(1, "first")).unwrap();
517 sender.try_send(SequencedRecord::new(2, "second")).unwrap();
518
519 let full = sender.try_send(SequencedRecord::new(3, "third"));
520 assert!(full.is_err());
521 }
522
523 #[test]
524 fn stats_reflect_progress() {
525 let mut ring = SequenceRing::new(10, 5);
526 let sender = ring.sender();
527
528 for i in 1..=5 {
529 sender
530 .send(SequencedRecord::new(i, format!("record_{i}")))
531 .unwrap();
532 }
533
534 let before = ring.stats();
535 assert_eq!(before.next_sequence_id, 1);
536 assert_eq!(before.reorder_buffer_size, 0);
537 assert_eq!(before.channel_capacity, 10);
538 assert_eq!(before.max_window_size, 5);
539
540 for _ in 1..=5 {
541 assert!(ring.recv_ordered().unwrap().is_some());
542 }
543
544 let after = ring.stats();
545 assert_eq!(after.next_sequence_id, 6);
546 assert_eq!(after.reorder_buffer_size, 0);
547 }
548
549 #[test]
550 fn stats_initial_state() {
551 let ring = SequenceRing::<String>::new(100, 50);
552 let stats = ring.stats();
553 assert_eq!(stats.next_sequence_id, 1);
554 assert_eq!(stats.reorder_buffer_size, 0);
555 assert_eq!(stats.max_window_size, 50);
556 assert_eq!(stats.channel_capacity, 100);
557 }
558
559 #[test]
560 fn stats_shows_buffered_count() {
561 let mut ring = SequenceRing::new(10, 5);
562 let sender = ring.sender();
563
564 sender.send(SequencedRecord::new(3, "three")).unwrap();
566 sender.send(SequencedRecord::new(5, "five")).unwrap();
567 assert!(matches!(ring.try_recv_ordered(), Err(TryRecvError::Empty)));
569 assert!(matches!(ring.try_recv_ordered(), Err(TryRecvError::Empty)));
570
571 let stats = ring.stats();
572 assert_eq!(stats.reorder_buffer_size, 2);
573 assert_eq!(stats.next_sequence_id, 1);
574 }
575
576 #[test]
577 fn multiple_senders_work() {
578 let mut ring = SequenceRing::new(10, 5);
579 let sender1 = ring.sender();
580 let sender2 = ring.sender();
581
582 sender1.send(SequencedRecord::new(1, "from_s1")).unwrap();
583 sender2.send(SequencedRecord::new(2, "from_s2")).unwrap();
584
585 assert_eq!(ring.recv_ordered().unwrap(), Some("from_s1"));
586 assert_eq!(ring.recv_ordered().unwrap(), Some("from_s2"));
587 }
588
589 #[test]
590 fn large_gap_reordering() {
591 let mut ring = SequenceRing::new(20, 20);
592 let sender = ring.sender();
593
594 for i in (1..=10).rev() {
596 sender.send(SequencedRecord::new(i, i)).unwrap();
597 }
598
599 for expected in 1..=10u64 {
601 assert_eq!(ring.recv_ordered().unwrap(), Some(expected));
602 }
603 }
604
605 #[test]
606 fn sequence_ring_stats_clone_and_debug() {
607 let ring = SequenceRing::<()>::new(8, 4);
608 let stats = ring.stats();
609 let cloned = stats.clone();
610 assert_eq!(cloned.channel_capacity, 8);
611 let debug = format!("{stats:?}");
612 assert!(debug.contains("next_sequence_id"));
613 }
614
615 #[test]
618 fn single_element_send_recv() {
619 let mut ring = SequenceRing::new(1, 1);
620 let sender = ring.sender();
621 sender.send(SequencedRecord::new(1, 42)).unwrap();
622 assert_eq!(ring.recv_ordered().unwrap(), Some(42));
623 }
624
625 #[test]
626 fn capacity_power_of_two_boundary_2() {
627 let mut ring = SequenceRing::new(2, 2);
628 let sender = ring.sender();
629 sender.send(SequencedRecord::new(2, "b")).unwrap();
630 sender.send(SequencedRecord::new(1, "a")).unwrap();
631 assert_eq!(ring.recv_ordered().unwrap(), Some("a"));
632 assert_eq!(ring.recv_ordered().unwrap(), Some("b"));
633 }
634
635 #[test]
636 fn capacity_power_of_two_boundary_4() {
637 let mut ring = SequenceRing::new(4, 4);
638 let sender = ring.sender();
639 for i in (1..=4).rev() {
640 sender.send(SequencedRecord::new(i, i)).unwrap();
641 }
642 for expected in 1..=4u64 {
643 assert_eq!(ring.recv_ordered().unwrap(), Some(expected));
644 }
645 }
646
647 #[test]
648 fn sequence_ids_larger_than_capacity() {
649 let mut ring = SequenceRing::new(4, 4);
650 let sender = ring.sender();
651 sender.send(SequencedRecord::new(1, "a")).unwrap();
653 sender.send(SequencedRecord::new(2, "b")).unwrap();
654 sender.send(SequencedRecord::new(3, "c")).unwrap();
655 sender.send(SequencedRecord::new(4, "d")).unwrap();
656 for expected in ["a", "b", "c", "d"] {
657 assert_eq!(ring.recv_ordered().unwrap(), Some(expected));
658 }
659 sender.send(SequencedRecord::new(5, "e")).unwrap();
661 sender.send(SequencedRecord::new(6, "f")).unwrap();
662 assert_eq!(ring.recv_ordered().unwrap(), Some("e"));
663 assert_eq!(ring.recv_ordered().unwrap(), Some("f"));
664 assert_eq!(ring.stats().next_sequence_id, 7);
665 }
666
667 #[test]
668 fn concurrent_senders_from_threads() {
669 use std::thread;
670
671 let mut ring = SequenceRing::new(100, 50);
672 let sender1 = ring.sender();
673 let sender2 = ring.sender();
674
675 let h1 = thread::spawn(move || {
676 for i in (1..=10).step_by(2) {
677 sender1.send(SequencedRecord::new(i, i)).unwrap();
678 }
679 });
680 let h2 = thread::spawn(move || {
681 for i in (2..=10).step_by(2) {
682 sender2.send(SequencedRecord::new(i, i)).unwrap();
683 }
684 });
685
686 h1.join().unwrap();
687 h2.join().unwrap();
688
689 for expected in 1..=10u64 {
690 assert_eq!(ring.recv_ordered().unwrap(), Some(expected));
691 }
692 }
693
694 #[test]
695 fn empty_ring_try_recv_is_empty() {
696 let mut ring = SequenceRing::<i32>::new(8, 4);
697 assert!(matches!(ring.try_recv_ordered(), Err(TryRecvError::Empty)));
698 assert_eq!(ring.stats().next_sequence_id, 1);
699 assert_eq!(ring.stats().reorder_buffer_size, 0);
700 }
701
702 #[test]
703 fn channel_close_drains_buffered_records() {
704 let mut ring = SequenceRing::new(10, 10);
705 let sender = ring.sender();
706 sender.send(SequencedRecord::new(3, "c")).unwrap();
708 sender.send(SequencedRecord::new(2, "b")).unwrap();
709 sender.send(SequencedRecord::new(1, "a")).unwrap();
710 drop(sender);
711 assert_eq!(ring.recv_ordered().unwrap(), Some("a"));
713 assert_eq!(ring.recv_ordered().unwrap(), Some("b"));
714 assert_eq!(ring.recv_ordered().unwrap(), Some("c"));
715 }
716
717 #[test]
718 fn try_recv_disconnected_after_drain() {
719 let mut ring = SequenceRing::new(10, 5);
720 let sender = ring.sender();
721 sender.send(SequencedRecord::new(1, "only")).unwrap();
722 drop(sender);
723 assert_eq!(ring.try_recv_ordered().unwrap(), Some("only"));
725 }
727
728 #[test]
729 fn interleaved_try_recv_and_send() {
730 let mut ring = SequenceRing::new(10, 5);
731 let sender = ring.sender();
732
733 sender.send(SequencedRecord::new(1, "a")).unwrap();
734 assert_eq!(ring.try_recv_ordered().unwrap(), Some("a"));
735
736 sender.send(SequencedRecord::new(3, "c")).unwrap();
737 assert!(matches!(ring.try_recv_ordered(), Err(TryRecvError::Empty)));
738
739 sender.send(SequencedRecord::new(2, "b")).unwrap();
740 assert_eq!(ring.try_recv_ordered().unwrap(), Some("b"));
741 assert_eq!(ring.try_recv_ordered().unwrap(), Some("c"));
742 }
743
744 #[test]
745 fn sequenced_record_with_complex_data() {
746 let record = SequencedRecord::new(99, vec![vec![1, 2], vec![3, 4]]);
747 assert_eq!(record.sequence_id, 99);
748 assert_eq!(record.data.len(), 2);
749 assert_eq!(record.data[0], vec![1, 2]);
750 }
751
752 #[test]
753 fn stats_after_partial_drain() {
754 let mut ring = SequenceRing::new(10, 10);
755 let sender = ring.sender();
756 sender.send(SequencedRecord::new(1, 1)).unwrap();
757 sender.send(SequencedRecord::new(2, 2)).unwrap();
758 sender.send(SequencedRecord::new(3, 3)).unwrap();
759
760 assert_eq!(ring.recv_ordered().unwrap(), Some(1));
761 let stats = ring.stats();
762 assert_eq!(stats.next_sequence_id, 2);
763 }
764
765 #[test]
766 fn large_window_reorder() {
767 let mut ring = SequenceRing::new(64, 64);
768 let sender = ring.sender();
769
770 for i in (1..=32).rev() {
772 sender.send(SequencedRecord::new(i, i)).unwrap();
773 }
774
775 for expected in 1..=32u64 {
776 assert_eq!(ring.recv_ordered().unwrap(), Some(expected));
777 }
778 }
779}