#![cfg_attr(not(test), deny(clippy::unwrap_used, clippy::expect_used))]
use crossbeam_channel::{Receiver, Sender, bounded};
use std::collections::BTreeMap;
use tracing::{debug, warn};
/// A payload tagged with the sequence number that fixes its place in the output order.
#[derive(Clone, Debug)]
pub struct SequencedRecord<T> {
    /// Position of this record in the intended output order.
    pub sequence_id: u64,
    /// The payload carried alongside the sequence number.
    pub data: T,
}

impl<T> SequencedRecord<T> {
    /// Pairs `data` with the given `sequence_id`.
    #[must_use]
    #[inline]
    pub fn new(sequence_id: u64, data: T) -> Self {
        SequencedRecord { sequence_id, data }
    }
}
/// Re-sequencing front end for a bounded channel of [`SequencedRecord`]s.
///
/// Records may be sent in any order; the receive methods hand them out in ascending
/// `sequence_id` order, parking records that arrive early in `reorder_buffer`.
#[derive(Debug)]
pub struct SequenceRing<T> {
/// Receiving half of the bounded channel created in `new`.
receiver: Receiver<SequencedRecord<T>>,
/// Sending half of the same channel, kept so `sender()` can hand out clones.
sender: Sender<SequencedRecord<T>>,
/// Records that arrived ahead of `next_sequence_id`, keyed by their sequence id.
reorder_buffer: BTreeMap<u64, T>,
/// Sequence id of the next record to emit; initialised to 1 by `new`.
next_sequence_id: u64,
/// Reorder-buffer size above which a warning is logged (not enforced as a hard limit).
max_window_size: usize,
/// Capacity the channel was created with; reported through `stats()`.
channel_capacity: usize,
}
impl<T> SequenceRing<T> {
    /// Creates a ring backed by a bounded channel of `channel_capacity` records.
    ///
    /// `max_window_size` is the reorder-buffer size above which a warning is logged; the
    /// buffer is not capped at that size. Sequence numbering starts at 1.
    #[inline]
    #[must_use]
    pub fn new(channel_capacity: usize, max_window_size: usize) -> Self {
        let (sender, receiver) = bounded(channel_capacity);
        Self {
            receiver,
            sender,
            reorder_buffer: BTreeMap::new(),
            next_sequence_id: 1,
            max_window_size,
            channel_capacity,
        }
    }

    /// Returns a new handle for sending records into the ring.
    ///
    /// Every call clones the sender stored inside the ring.
    #[inline]
    #[must_use]
    pub fn sender(&self) -> Sender<SequencedRecord<T>> {
        self.sender.clone()
    }

    /// Blocks until the record with the next expected sequence id is available and returns it.
    ///
    /// Records that arrive early are parked in the reorder buffer; records whose id is below
    /// the expected one are logged and dropped. If the channel reports disconnection, any
    /// buffered records are handed out in ascending id order (skipping gaps) and
    /// `Ok(None)` is returned once nothing is left.
    ///
    /// NOTE(review): the ring stores its own `Sender`, so as far as this file shows the
    /// channel cannot become disconnected while the ring is alive. In that case this call
    /// blocks on an empty channel instead of returning `Ok(None)`, and it also blocks
    /// indefinitely if the expected sequence id never arrives.
    ///
    /// # Errors
    ///
    /// The current implementation never returns `Err`; the `Result` is kept for interface
    /// stability.
    #[inline]
    #[must_use = "Handle the Result or propagate the error"]
    pub fn recv_ordered(&mut self) -> Result<Option<T>, crossbeam_channel::RecvError> {
        loop {
            if let Some(record) = self.reorder_buffer.remove(&self.next_sequence_id) {
                debug!(
                    "Emitting record {} from reorder buffer",
                    self.next_sequence_id
                );
                self.next_sequence_id += 1;
                return Ok(Some(record));
            }

            let SequencedRecord { sequence_id, data } = match self.receiver.recv() {
                Ok(incoming) => incoming,
                Err(_) => {
                    // Disconnected: nothing new can arrive, so flush what is buffered.
                    let drained = self.pop_buffered_after_disconnect();
                    if drained.is_some() {
                        debug!("Emitting remaining buffered record during shutdown");
                    } else {
                        debug!("Channel closed, no more records");
                    }
                    return Ok(drained);
                }
            };

            match sequence_id.cmp(&self.next_sequence_id) {
                std::cmp::Ordering::Equal => {
                    self.next_sequence_id += 1;
                    debug!("Emitting record {} directly", sequence_id);
                    return Ok(Some(data));
                }
                std::cmp::Ordering::Greater => {
                    debug!(
                        "Buffering out-of-order record {} (expecting {})",
                        sequence_id, self.next_sequence_id
                    );
                    self.reorder_buffer.insert(sequence_id, data);
                    self.warn_if_window_exceeded();
                }
                std::cmp::Ordering::Less => {
                    warn!(
                        "Received past record {} (expecting {}), ignoring",
                        sequence_id, self.next_sequence_id
                    );
                }
            }
        }
    }

    /// Non-blocking variant of [`recv_ordered`](Self::recv_ordered).
    ///
    /// Returns `Ok(Some(_))` when the next expected record is already buffered or can be
    /// pulled from the channel without blocking. Early records encountered on the way are
    /// buffered and stale ones are logged and dropped; polling continues until either the
    /// expected record shows up or the channel has nothing more queued, so `Empty` is only
    /// reported when the channel really is empty.
    ///
    /// # Errors
    ///
    /// * `TryRecvError::Empty` - nothing deliverable is queued right now.
    /// * `TryRecvError::Disconnected` - the channel is disconnected and the reorder buffer
    ///   has been fully drained. Buffered records are handed out first, in ascending id
    ///   order, matching the shutdown behaviour of `recv_ordered`.
    #[inline]
    #[must_use = "Handle the Result or propagate the error"]
    pub fn try_recv_ordered(&mut self) -> Result<Option<T>, crossbeam_channel::TryRecvError> {
        // Records buffered below only ever have ids greater than the expected one, so a
        // single look-up before polling the channel is sufficient.
        if let Some(record) = self.reorder_buffer.remove(&self.next_sequence_id) {
            self.next_sequence_id += 1;
            return Ok(Some(record));
        }

        loop {
            let SequencedRecord { sequence_id, data } = match self.receiver.try_recv() {
                Ok(incoming) => incoming,
                Err(crossbeam_channel::TryRecvError::Disconnected) => {
                    return match self.pop_buffered_after_disconnect() {
                        Some(record) => Ok(Some(record)),
                        None => Err(crossbeam_channel::TryRecvError::Disconnected),
                    };
                }
                Err(empty) => return Err(empty),
            };

            match sequence_id.cmp(&self.next_sequence_id) {
                std::cmp::Ordering::Equal => {
                    self.next_sequence_id += 1;
                    return Ok(Some(data));
                }
                std::cmp::Ordering::Greater => {
                    self.reorder_buffer.insert(sequence_id, data);
                    self.warn_if_window_exceeded();
                }
                std::cmp::Ordering::Less => {
                    warn!("Received past record {}, ignoring", sequence_id);
                }
            }
        }
    }

    /// Returns a snapshot of the ring's counters and configuration.
    #[inline]
    #[must_use]
    pub fn stats(&self) -> SequenceRingStats {
        SequenceRingStats {
            next_sequence_id: self.next_sequence_id,
            reorder_buffer_size: self.reorder_buffer.len(),
            max_window_size: self.max_window_size,
            channel_capacity: self.channel_capacity,
        }
    }

    /// Logs a warning when the reorder buffer has grown past `max_window_size`.
    fn warn_if_window_exceeded(&self) {
        if self.reorder_buffer.len() > self.max_window_size {
            warn!(
                "Reorder buffer size ({}) exceeds maximum ({}), potential memory issue",
                self.reorder_buffer.len(),
                self.max_window_size
            );
        }
    }

    /// Removes the lowest-id buffered record and moves `next_sequence_id` just past it.
    ///
    /// Used once the channel is disconnected: gaps can no longer be filled, so the expected
    /// id jumps forward to keep `stats()` in step with what has been emitted.
    fn pop_buffered_after_disconnect(&mut self) -> Option<T> {
        let (sequence_id, record) = self.reorder_buffer.pop_first()?;
        self.next_sequence_id = sequence_id.saturating_add(1);
        Some(record)
    }
}
/// Point-in-time snapshot of a `SequenceRing`'s counters and configuration.
#[derive(Debug, Clone)]
pub struct SequenceRingStats {
    /// Sequence id the ring expects to emit next.
    pub next_sequence_id: u64,
    /// Number of records currently held in the reorder buffer.
    pub reorder_buffer_size: usize,
    /// Reorder-buffer size above which the ring logs a warning.
    pub max_window_size: usize,
    /// Capacity of the ring's bounded channel.
    pub channel_capacity: usize,
}
// Unit tests for `SequencedRecord`, `SequenceRing` and `SequenceRingStats`.
// Unless noted otherwise, every record is queued before the first receive call, so the
// blocking `recv_ordered` calls below never wait on an empty channel.
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
use super::*;
use crossbeam_channel::TryRecvError;
// --- SequencedRecord basics ---
#[test]
fn sequenced_record_creation() {
let record = SequencedRecord::new(42, "test data");
assert_eq!(record.sequence_id, 42);
assert_eq!(record.data, "test data");
}
#[test]
fn sequenced_record_clone() {
let record = SequencedRecord::new(1, vec![10, 20, 30]);
let cloned = record.clone();
assert_eq!(cloned.sequence_id, 1);
assert_eq!(cloned.data, vec![10, 20, 30]);
}
#[test]
fn sequenced_record_debug_format() {
let record = SequencedRecord::new(7, "hello");
let debug = format!("{record:?}");
assert!(debug.contains('7'));
assert!(debug.contains("hello"));
}
// --- Blocking receive: ordering ---
// Records sent as 2, 1, 3 must come out as 1, 2, 3.
#[test]
fn recv_ordered_emits_input_order() {
let mut ring = SequenceRing::new(10, 5);
let sender = ring.sender();
sender.send(SequencedRecord::new(2, "second")).unwrap();
sender.send(SequencedRecord::new(1, "first")).unwrap();
sender.send(SequencedRecord::new(3, "third")).unwrap();
assert_eq!(ring.recv_ordered().unwrap(), Some("first"));
assert_eq!(ring.recv_ordered().unwrap(), Some("second"));
assert_eq!(ring.recv_ordered().unwrap(), Some("third"));
}
#[test]
fn recv_ordered_already_in_order() {
let mut ring = SequenceRing::new(10, 5);
let sender = ring.sender();
sender.send(SequencedRecord::new(1, "a")).unwrap();
sender.send(SequencedRecord::new(2, "b")).unwrap();
sender.send(SequencedRecord::new(3, "c")).unwrap();
assert_eq!(ring.recv_ordered().unwrap(), Some("a"));
assert_eq!(ring.recv_ordered().unwrap(), Some("b"));
assert_eq!(ring.recv_ordered().unwrap(), Some("c"));
}
#[test]
fn recv_ordered_reverse_order() {
let mut ring = SequenceRing::new(10, 5);
let sender = ring.sender();
sender.send(SequencedRecord::new(3, "c")).unwrap();
sender.send(SequencedRecord::new(2, "b")).unwrap();
sender.send(SequencedRecord::new(1, "a")).unwrap();
assert_eq!(ring.recv_ordered().unwrap(), Some("a"));
assert_eq!(ring.recv_ordered().unwrap(), Some("b"));
assert_eq!(ring.recv_ordered().unwrap(), Some("c"));
}
// NOTE(review): despite its name, this test never observes `None`. The first `ring` is
// created and a cloned sender dropped without any assertion, and `ring2` is only checked
// for delivery of the single queued record. The ring keeps its own `Sender` field, so
// dropping the cloned handle does not disconnect the channel; a further `recv_ordered`
// call here would presumably block rather than return `Ok(None)` - confirm and rework.
#[test]
fn recv_ordered_returns_none_on_channel_close() {
let ring = SequenceRing::<&str>::new(10, 5);
drop(ring.sender());
let mut ring2 = SequenceRing::new(10, 5);
let sender = ring2.sender();
sender.send(SequencedRecord::new(1, "only")).unwrap();
drop(sender);
assert_eq!(ring2.recv_ordered().unwrap(), Some("only"));
}
// --- Non-blocking receive ---
#[test]
fn try_recv_empty_reports_empty() {
let mut ring = SequenceRing::<&str>::new(10, 5);
assert!(matches!(ring.try_recv_ordered(), Err(TryRecvError::Empty)));
}
#[test]
fn try_recv_returns_in_order_record() {
let mut ring = SequenceRing::new(10, 5);
let sender = ring.sender();
sender.send(SequencedRecord::new(1, "first")).unwrap();
assert_eq!(ring.try_recv_ordered().unwrap(), Some("first"));
}
// A record that arrives early is parked in the reorder buffer and released once its
// predecessor has been delivered.
#[test]
fn try_recv_buffers_future_record() {
let mut ring = SequenceRing::new(10, 5);
let sender = ring.sender();
sender.send(SequencedRecord::new(2, "second")).unwrap();
assert!(matches!(ring.try_recv_ordered(), Err(TryRecvError::Empty)));
assert_eq!(ring.stats().reorder_buffer_size, 1);
sender.send(SequencedRecord::new(1, "first")).unwrap();
assert_eq!(ring.try_recv_ordered().unwrap(), Some("first"));
assert_eq!(ring.try_recv_ordered().unwrap(), Some("second"));
}
// A record whose id has already been emitted is dropped instead of delivered again.
#[test]
fn try_recv_ignores_past_record() {
let mut ring = SequenceRing::new(10, 5);
let sender = ring.sender();
sender.send(SequencedRecord::new(1, "first")).unwrap();
assert_eq!(ring.try_recv_ordered().unwrap(), Some("first"));
sender.send(SequencedRecord::new(1, "duplicate")).unwrap();
assert!(matches!(ring.try_recv_ordered(), Err(TryRecvError::Empty)));
}
// --- Channel capacity ---
// With a capacity of 2 and nothing consumed, the third non-blocking send must fail.
#[test]
fn sender_respects_channel_capacity() {
let ring = SequenceRing::new(2, 1);
let sender = ring.sender();
sender.try_send(SequencedRecord::new(1, "first")).unwrap();
sender.try_send(SequencedRecord::new(2, "second")).unwrap();
let full = sender.try_send(SequencedRecord::new(3, "third"));
assert!(full.is_err());
}
// --- Stats ---
#[test]
fn stats_reflect_progress() {
let mut ring = SequenceRing::new(10, 5);
let sender = ring.sender();
for i in 1..=5 {
sender
.send(SequencedRecord::new(i, format!("record_{i}")))
.unwrap();
}
// Nothing has been received yet, so the counters are still at their initial values.
let before = ring.stats();
assert_eq!(before.next_sequence_id, 1);
assert_eq!(before.reorder_buffer_size, 0);
assert_eq!(before.channel_capacity, 10);
assert_eq!(before.max_window_size, 5);
for _ in 1..=5 {
assert!(ring.recv_ordered().unwrap().is_some());
}
let after = ring.stats();
assert_eq!(after.next_sequence_id, 6);
assert_eq!(after.reorder_buffer_size, 0);
}
#[test]
fn stats_initial_state() {
let ring = SequenceRing::<String>::new(100, 50);
let stats = ring.stats();
assert_eq!(stats.next_sequence_id, 1);
assert_eq!(stats.reorder_buffer_size, 0);
assert_eq!(stats.max_window_size, 50);
assert_eq!(stats.channel_capacity, 100);
}
// Ids 3 and 5 are both ahead of the expected id 1, so after polling they sit in the
// reorder buffer and nothing has been emitted.
#[test]
fn stats_shows_buffered_count() {
let mut ring = SequenceRing::new(10, 5);
let sender = ring.sender();
sender.send(SequencedRecord::new(3, "three")).unwrap();
sender.send(SequencedRecord::new(5, "five")).unwrap();
assert!(matches!(ring.try_recv_ordered(), Err(TryRecvError::Empty)));
assert!(matches!(ring.try_recv_ordered(), Err(TryRecvError::Empty)));
let stats = ring.stats();
assert_eq!(stats.reorder_buffer_size, 2);
assert_eq!(stats.next_sequence_id, 1);
}
// --- Multiple senders and larger reorder windows ---
#[test]
fn multiple_senders_work() {
let mut ring = SequenceRing::new(10, 5);
let sender1 = ring.sender();
let sender2 = ring.sender();
sender1.send(SequencedRecord::new(1, "from_s1")).unwrap();
sender2.send(SequencedRecord::new(2, "from_s2")).unwrap();
assert_eq!(ring.recv_ordered().unwrap(), Some("from_s1"));
assert_eq!(ring.recv_ordered().unwrap(), Some("from_s2"));
}
#[test]
fn large_gap_reordering() {
let mut ring = SequenceRing::new(20, 20);
let sender = ring.sender();
for i in (1..=10).rev() {
sender.send(SequencedRecord::new(i, i)).unwrap();
}
for expected in 1..=10u64 {
assert_eq!(ring.recv_ordered().unwrap(), Some(expected));
}
}
#[test]
fn sequence_ring_stats_clone_and_debug() {
let ring = SequenceRing::<()>::new(8, 4);
let stats = ring.stats();
let cloned = stats.clone();
assert_eq!(cloned.channel_capacity, 8);
let debug = format!("{stats:?}");
assert!(debug.contains("next_sequence_id"));
}
// --- Small capacities ---
#[test]
fn single_element_send_recv() {
let mut ring = SequenceRing::new(1, 1);
let sender = ring.sender();
sender.send(SequencedRecord::new(1, 42)).unwrap();
assert_eq!(ring.recv_ordered().unwrap(), Some(42));
}
#[test]
fn capacity_power_of_two_boundary_2() {
let mut ring = SequenceRing::new(2, 2);
let sender = ring.sender();
sender.send(SequencedRecord::new(2, "b")).unwrap();
sender.send(SequencedRecord::new(1, "a")).unwrap();
assert_eq!(ring.recv_ordered().unwrap(), Some("a"));
assert_eq!(ring.recv_ordered().unwrap(), Some("b"));
}
#[test]
fn capacity_power_of_two_boundary_4() {
let mut ring = SequenceRing::new(4, 4);
let sender = ring.sender();
for i in (1..=4).rev() {
sender.send(SequencedRecord::new(i, i)).unwrap();
}
for expected in 1..=4u64 {
assert_eq!(ring.recv_ordered().unwrap(), Some(expected));
}
}
// Sequence ids keep counting past the channel capacity once earlier records are consumed.
#[test]
fn sequence_ids_larger_than_capacity() {
let mut ring = SequenceRing::new(4, 4);
let sender = ring.sender();
sender.send(SequencedRecord::new(1, "a")).unwrap();
sender.send(SequencedRecord::new(2, "b")).unwrap();
sender.send(SequencedRecord::new(3, "c")).unwrap();
sender.send(SequencedRecord::new(4, "d")).unwrap();
for expected in ["a", "b", "c", "d"] {
assert_eq!(ring.recv_ordered().unwrap(), Some(expected));
}
sender.send(SequencedRecord::new(5, "e")).unwrap();
sender.send(SequencedRecord::new(6, "f")).unwrap();
assert_eq!(ring.recv_ordered().unwrap(), Some("e"));
assert_eq!(ring.recv_ordered().unwrap(), Some("f"));
assert_eq!(ring.stats().next_sequence_id, 7);
}
// Two threads send the odd and even ids respectively; both are joined before receiving,
// so all ten records are queued by the time the ordered reads start.
#[test]
fn concurrent_senders_from_threads() {
use std::thread;
let mut ring = SequenceRing::new(100, 50);
let sender1 = ring.sender();
let sender2 = ring.sender();
let h1 = thread::spawn(move || {
for i in (1..=10).step_by(2) {
sender1.send(SequencedRecord::new(i, i)).unwrap();
}
});
let h2 = thread::spawn(move || {
for i in (2..=10).step_by(2) {
sender2.send(SequencedRecord::new(i, i)).unwrap();
}
});
h1.join().unwrap();
h2.join().unwrap();
for expected in 1..=10u64 {
assert_eq!(ring.recv_ordered().unwrap(), Some(expected));
}
}
#[test]
fn empty_ring_try_recv_is_empty() {
let mut ring = SequenceRing::<i32>::new(8, 4);
assert!(matches!(ring.try_recv_ordered(), Err(TryRecvError::Empty)));
assert_eq!(ring.stats().next_sequence_id, 1);
assert_eq!(ring.stats().reorder_buffer_size, 0);
}
// --- Behaviour after the external sender handle is dropped ---
// All three records are queued before the handle is dropped, so they are delivered through
// the normal reordering path.
#[test]
fn channel_close_drains_buffered_records() {
let mut ring = SequenceRing::new(10, 10);
let sender = ring.sender();
sender.send(SequencedRecord::new(3, "c")).unwrap();
sender.send(SequencedRecord::new(2, "b")).unwrap();
sender.send(SequencedRecord::new(1, "a")).unwrap();
drop(sender);
assert_eq!(ring.recv_ordered().unwrap(), Some("a"));
assert_eq!(ring.recv_ordered().unwrap(), Some("b"));
assert_eq!(ring.recv_ordered().unwrap(), Some("c"));
}
// NOTE(review): only delivery of the queued record is asserted; `TryRecvError::Disconnected`
// is never checked, and the ring still holds its own `Sender` after `drop(sender)`.
#[test]
fn try_recv_disconnected_after_drain() {
let mut ring = SequenceRing::new(10, 5);
let sender = ring.sender();
sender.send(SequencedRecord::new(1, "only")).unwrap();
drop(sender);
assert_eq!(ring.try_recv_ordered().unwrap(), Some("only"));
}
// Alternates sends and polls: record 3 arrives before 2 and must wait for it.
#[test]
fn interleaved_try_recv_and_send() {
let mut ring = SequenceRing::new(10, 5);
let sender = ring.sender();
sender.send(SequencedRecord::new(1, "a")).unwrap();
assert_eq!(ring.try_recv_ordered().unwrap(), Some("a"));
sender.send(SequencedRecord::new(3, "c")).unwrap();
assert!(matches!(ring.try_recv_ordered(), Err(TryRecvError::Empty)));
sender.send(SequencedRecord::new(2, "b")).unwrap();
assert_eq!(ring.try_recv_ordered().unwrap(), Some("b"));
assert_eq!(ring.try_recv_ordered().unwrap(), Some("c"));
}
#[test]
fn sequenced_record_with_complex_data() {
let record = SequencedRecord::new(99, vec![vec![1, 2], vec![3, 4]]);
assert_eq!(record.sequence_id, 99);
assert_eq!(record.data.len(), 2);
assert_eq!(record.data[0], vec![1, 2]);
}
#[test]
fn stats_after_partial_drain() {
let mut ring = SequenceRing::new(10, 10);
let sender = ring.sender();
sender.send(SequencedRecord::new(1, 1)).unwrap();
sender.send(SequencedRecord::new(2, 2)).unwrap();
sender.send(SequencedRecord::new(3, 3)).unwrap();
assert_eq!(ring.recv_ordered().unwrap(), Some(1));
let stats = ring.stats();
assert_eq!(stats.next_sequence_id, 2);
}
#[test]
fn large_window_reorder() {
let mut ring = SequenceRing::new(64, 64);
let sender = ring.sender();
for i in (1..=32).rev() {
sender.send(SequencedRecord::new(i, i)).unwrap();
}
for expected in 1..=32u64 {
assert_eq!(ring.recv_ordered().unwrap(), Some(expected));
}
}
}