use std::collections::VecDeque;
use crate::unified_pipeline::MemoryEstimate;
/// Buffers items tagged with monotonically increasing sequence numbers and
/// releases them strictly in order, starting from `next_seq`.
///
/// Each item carries a caller-reported heap size (in bytes) so that total
/// buffered memory can be tracked without inspecting `T`.
#[derive(Debug)]
pub struct ReorderBuffer<T> {
    /// Slot `i` holds the item for sequence `next_seq + i`; `None` marks a gap.
    buffer: VecDeque<Option<(T, usize)>>,
    /// The next sequence number to be popped (the base of `buffer`).
    next_seq: u64,
    /// Number of occupied (`Some`) slots — may be less than `buffer.len()`.
    count: usize,
    /// Sum of the caller-reported heap sizes of all buffered items.
    heap_bytes: u64,
    /// True iff the front slot is occupied, i.e. a pop would succeed.
    can_pop: bool,
}
impl<T> ReorderBuffer<T> {
    /// Creates an empty buffer that expects sequence number 0 first.
    #[must_use]
    pub fn new() -> Self {
        Self { buffer: VecDeque::new(), next_seq: 0, count: 0, heap_bytes: 0, can_pop: false }
    }

    /// Inserts `item` at sequence number `seq` with no heap-size accounting.
    pub fn insert(&mut self, seq: u64, item: T) {
        self.insert_with_size(seq, item, 0);
    }

    /// Inserts `item` at sequence number `seq`, charging `heap_size` bytes to
    /// the buffer's memory accounting (see [`Self::heap_bytes`]).
    ///
    /// # Panics
    /// In debug builds, panics if `seq` is below the next expected sequence
    /// number or if `seq` was already inserted. Release builds do not check
    /// either condition; violating them corrupts internal accounting.
    #[allow(clippy::cast_possible_truncation)]
    pub fn insert_with_size(&mut self, seq: u64, item: T, heap_size: usize) {
        debug_assert!(
            seq >= self.next_seq,
            "Sequence number {seq} is before base {}",
            self.next_seq
        );
        // Slot 0 always corresponds to `next_seq`.
        let index = (seq - self.next_seq) as usize;
        // Grow in one step (single reservation) rather than pushing `None`
        // slot-by-slot; only ever grows since len <= index here.
        if self.buffer.len() <= index {
            self.buffer.resize_with(index + 1, || None);
        }
        debug_assert!(self.buffer[index].is_none(), "Duplicate sequence number: {seq}");
        self.buffer[index] = Some((item, heap_size));
        self.count += 1;
        self.heap_bytes += heap_size as u64;
        if index == 0 {
            // The newly inserted item is exactly the one we were waiting for.
            self.can_pop = true;
        }
    }

    /// Pops the next in-order item, or `None` if it has not arrived yet.
    #[must_use]
    pub fn try_pop_next(&mut self) -> Option<T> {
        self.try_pop_next_with_size().map(|(item, _size)| item)
    }

    /// Pops the next in-order item together with its recorded heap size, or
    /// `None` if the next expected sequence number has not arrived yet.
    #[must_use]
    pub fn try_pop_next_with_size(&mut self) -> Option<(T, usize)> {
        if !self.can_pop {
            return None;
        }
        // `can_pop` is only set when the front slot exists and is occupied,
        // so both expects state invariants rather than recoverable errors.
        let (item, size) = self
            .buffer
            .pop_front()
            .expect("buffer must be non-empty when can_pop is true")
            .expect("front slot must be Some when can_pop is true");
        self.next_seq += 1;
        self.count -= 1;
        self.heap_bytes = self.heap_bytes.saturating_sub(size as u64);
        self.can_pop = self.buffer.front().is_some_and(Option::is_some);
        Some((item, size))
    }

    /// Returns an iterator that pops consecutive ready items, stopping at the
    /// first gap. Items not yielded remain buffered.
    pub fn drain_ready(&mut self) -> DrainReady<'_, T> {
        DrainReady { buffer: self }
    }

    /// True when no items are buffered (gaps don't count).
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.count == 0
    }

    /// Number of buffered items (occupied slots only).
    #[must_use]
    pub fn len(&self) -> usize {
        self.count
    }

    /// Number of slots in the internal deque, including empty gap slots.
    #[must_use]
    pub fn buffer_len(&self) -> usize {
        self.buffer.len()
    }

    /// The next sequence number that will be released.
    #[must_use]
    pub fn next_seq(&self) -> u64 {
        self.next_seq
    }

    /// True if a call to [`Self::try_pop_next`] would yield an item.
    #[must_use]
    pub fn can_pop(&self) -> bool {
        self.can_pop
    }

    /// Iterates over all buffered items (in slot order), skipping gaps.
    pub fn iter(&self) -> impl Iterator<Item = &T> {
        self.buffer.iter().filter_map(|opt| opt.as_ref().map(|(item, _)| item))
    }

    /// Total caller-reported heap bytes currently buffered.
    #[must_use]
    pub fn heap_bytes(&self) -> u64 {
        self.heap_bytes
    }

    /// Whether an item with sequence number `serial` should be admitted given
    /// `memory_limit` (in bytes, 0 = unlimited).
    ///
    /// Always accepts when: there is no limit, `serial` is exactly the next
    /// needed sequence number (rejecting it would deadlock the pipeline), or
    /// the buffer is currently stalled waiting on a gap. Otherwise admits only
    /// while tracked heap usage is below the limit.
    #[must_use]
    pub fn would_accept(&self, serial: u64, memory_limit: u64) -> bool {
        if memory_limit == 0 {
            return true;
        }
        if serial == self.next_seq {
            return true;
        }
        if !self.can_pop {
            // NOTE(review): stalled buffers accept regardless of memory —
            // presumably to keep upstream producers unblocked; confirm intent.
            return true;
        }
        self.heap_bytes < memory_limit
    }

    /// Recomputes total heap usage from the items themselves, ignoring the
    /// sizes recorded at insertion time.
    #[must_use]
    pub fn total_heap_size(&self) -> usize
    where
        T: MemoryEstimate,
    {
        self.buffer
            .iter()
            .filter_map(|opt| opt.as_ref())
            .map(|(item, _size)| item.estimate_heap_size())
            .sum()
    }

    /// Test-only: rebases the buffer at an arbitrary sequence number.
    #[cfg(test)]
    pub fn set_next_seq(&mut self, seq: u64) {
        self.next_seq = seq;
    }
}
impl<T> Default for ReorderBuffer<T> {
fn default() -> Self {
Self::new()
}
}
/// Draining iterator returned by `ReorderBuffer::drain_ready`: yields
/// consecutive in-order items and stops at the first missing sequence number.
pub struct DrainReady<'a, T> {
    buffer: &'a mut ReorderBuffer<T>,
}
impl<T> Iterator for DrainReady<'_, T> {
type Item = T;
fn next(&mut self) -> Option<Self::Item> {
self.buffer.try_pop_next()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Items inserted in sequence order come back out in the same order.
    #[test]
    fn test_in_order_insertion() {
        let mut rb: ReorderBuffer<i32> = ReorderBuffer::new();
        rb.insert(0, 100);
        rb.insert(1, 200);
        rb.insert(2, 300);
        assert_eq!(rb.try_pop_next(), Some(100));
        assert_eq!(rb.try_pop_next(), Some(200));
        assert_eq!(rb.try_pop_next(), Some(300));
        assert_eq!(rb.try_pop_next(), None);
    }

    /// Items inserted in reverse still pop in sequence order.
    #[test]
    fn test_out_of_order_insertion() {
        let mut rb: ReorderBuffer<i32> = ReorderBuffer::new();
        rb.insert(2, 300);
        rb.insert(1, 200);
        rb.insert(0, 100);
        assert_eq!(rb.try_pop_next(), Some(100));
        assert_eq!(rb.try_pop_next(), Some(200));
        assert_eq!(rb.try_pop_next(), Some(300));
        assert_eq!(rb.try_pop_next(), None);
    }

    /// A missing sequence number blocks everything behind it until it arrives.
    #[test]
    fn test_gap_blocks_progress() {
        let mut rb: ReorderBuffer<i32> = ReorderBuffer::new();
        rb.insert(0, 100);
        rb.insert(2, 300);
        assert_eq!(rb.try_pop_next(), Some(100));
        assert_eq!(rb.try_pop_next(), None);
        rb.insert(1, 200);
        assert_eq!(rb.try_pop_next(), Some(200));
        assert_eq!(rb.try_pop_next(), Some(300));
        assert_eq!(rb.try_pop_next(), None);
    }

    /// drain_ready yields only the contiguous prefix and leaves the rest.
    #[test]
    fn test_drain_ready() {
        let mut rb: ReorderBuffer<i32> = ReorderBuffer::new();
        rb.insert(0, 100);
        rb.insert(1, 200);
        rb.insert(3, 400);
        let ready: Vec<_> = rb.drain_ready().collect();
        assert_eq!(ready, vec![100, 200]);
        assert!(!rb.is_empty());
        assert_eq!(rb.next_seq(), 2);
        rb.insert(2, 300);
        let more: Vec<_> = rb.drain_ready().collect();
        assert_eq!(more, vec![300, 400]);
        assert!(rb.is_empty());
    }

    /// Widely spaced sequence numbers create gaps that fill incrementally.
    #[test]
    fn test_sparse_insertion() {
        let mut rb: ReorderBuffer<i32> = ReorderBuffer::new();
        rb.insert(0, 100);
        rb.insert(5, 600);
        rb.insert(2, 300);
        assert_eq!(rb.try_pop_next(), Some(100));
        assert_eq!(rb.try_pop_next(), None);
        rb.insert(1, 200);
        assert_eq!(rb.try_pop_next(), Some(200));
        assert_eq!(rb.try_pop_next(), Some(300));
        assert_eq!(rb.try_pop_next(), None);
        rb.insert(3, 400);
        rb.insert(4, 500);
        let rest: Vec<_> = rb.drain_ready().collect();
        assert_eq!(rest, vec![400, 500, 600]);
    }

    /// The buffer works when rebased at a large starting sequence number.
    #[test]
    fn test_large_sequence_numbers() {
        let mut rb: ReorderBuffer<i32> = ReorderBuffer::new();
        let start = 1_000_000u64;
        rb.set_next_seq(start);
        rb.insert(start, 100);
        rb.insert(start + 1, 200);
        assert_eq!(rb.try_pop_next(), Some(100));
        assert_eq!(rb.try_pop_next(), Some(200));
    }

    /// heap_bytes tracks the running sum of inserted sizes minus popped sizes.
    #[test]
    fn test_memory_tracking() {
        let mut rb: ReorderBuffer<i32> = ReorderBuffer::new();
        rb.insert_with_size(0, 100, 1000);
        rb.insert_with_size(1, 200, 2000);
        rb.insert_with_size(2, 300, 3000);
        assert_eq!(rb.heap_bytes(), 6000);
        assert_eq!(rb.len(), 3);
        let (val, size) = rb.try_pop_next_with_size().expect("queue should have next element");
        assert_eq!(val, 100);
        assert_eq!(size, 1000);
        assert_eq!(rb.heap_bytes(), 5000);
        let (val, size) = rb.try_pop_next_with_size().expect("queue should have next element");
        assert_eq!(val, 200);
        assert_eq!(size, 2000);
        assert_eq!(rb.heap_bytes(), 3000);
    }

    /// can_pop flips on only when the front (next expected) slot is filled.
    #[test]
    fn test_can_pop_tracking() {
        let mut rb: ReorderBuffer<i32> = ReorderBuffer::new();
        assert!(!rb.can_pop());
        rb.insert(1, 200);
        assert!(!rb.can_pop());
        rb.insert(0, 100);
        assert!(rb.can_pop());
        assert_eq!(rb.try_pop_next(), Some(100));
        assert!(rb.can_pop());
        assert_eq!(rb.try_pop_next(), Some(200));
        assert!(!rb.can_pop());
    }

    /// A zero memory limit disables admission control entirely.
    #[test]
    fn test_would_accept_no_limit() {
        let mut rb: ReorderBuffer<i32> = ReorderBuffer::new();
        rb.insert_with_size(0, 100, 1_000_000_000);
        assert!(rb.would_accept(1, 0));
        assert!(rb.would_accept(999, 0));
    }

    /// The exact next-needed serial is always accepted, even over the limit.
    #[test]
    fn test_would_accept_needed_serial() {
        let mut rb: ReorderBuffer<i32> = ReorderBuffer::new();
        rb.insert_with_size(1, 200, 1_000_000_000);
        assert!(rb.would_accept(0, 100));
    }

    /// A stalled buffer (cannot pop) accepts anything regardless of memory.
    #[test]
    fn test_would_accept_stuck() {
        let mut rb: ReorderBuffer<i32> = ReorderBuffer::new();
        rb.insert_with_size(2, 300, 1_000_000_000);
        assert!(!rb.can_pop());
        assert!(rb.would_accept(3, 100));
        assert!(rb.would_accept(10, 100));
    }

    /// With progress possible, admission is gated on the memory limit.
    #[test]
    fn test_would_accept_over_limit() {
        let mut rb: ReorderBuffer<i32> = ReorderBuffer::new();
        rb.insert_with_size(0, 100, 1000);
        assert!(rb.can_pop());
        assert_eq!(rb.heap_bytes(), 1000);
        assert!(!rb.would_accept(1, 500));
        assert!(rb.would_accept(1, 2000));
    }
}