use std::collections::VecDeque;
pub(crate) struct ClientResponderQueue<T> {
responders: VecDeque<(u64, T)>,
}
impl<T> ClientResponderQueue<T> {
pub(crate) fn new() -> Self {
Self {
responders: VecDeque::new(),
}
}
pub(crate) fn with_capacity(capacity: usize) -> Self {
Self {
responders: VecDeque::with_capacity(capacity),
}
}
pub(crate) fn push(&mut self, index: u64, responder: T) {
#[cfg(debug_assertions)]
if let Some((last_index, _)) = self.responders.back() {
debug_assert!(
index > *last_index,
"Responder indices must be monotonically increasing: tried to push {} after {}",
index,
last_index
);
}
self.responders.push_back((index, responder));
}
#[allow(dead_code)]
pub(crate) fn extend(&mut self, iter: impl IntoIterator<Item = (u64, T)>) {
for (index, responder) in iter {
self.push(index, responder);
}
}
pub(crate) fn drain_upto(&mut self, last_index: u64) -> Vec<(u64, T)> {
let end_pos = self.responders.partition_point(|(index, _)| *index <= last_index);
self.responders.drain(0..end_pos).collect()
}
pub(crate) fn drain_from(&mut self, from_index: u64) -> impl Iterator<Item = (u64, T)> + '_ {
let start_pos =
self.responders.binary_search_by_key(&from_index, |(index, _)| *index).unwrap_or_else(|pos| pos);
self.responders.drain(start_pos..)
}
#[allow(dead_code)]
pub(crate) fn first_index(&self) -> Option<u64> {
self.responders.front().map(|(index, _)| *index)
}
#[allow(dead_code)]
pub(crate) fn len(&self) -> usize {
self.responders.len()
}
#[allow(dead_code)]
pub(crate) fn is_empty(&self) -> bool {
self.responders.is_empty()
}
}
impl<T> Default for ClientResponderQueue<T> {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
type TestQueue = ClientResponderQueue<u64>;
#[test]
fn test_new() {
let queue = TestQueue::new();
assert!(queue.is_empty());
assert_eq!(queue.first_index(), None);
}
#[test]
fn test_push() {
let mut queue = TestQueue::new();
queue.push(10, 100);
queue.push(20, 200);
queue.push(30, 300);
assert_eq!(queue.len(), 3);
assert_eq!(queue.first_index(), Some(10));
assert_eq!(queue.drain_upto(30), vec![(10, 100), (20, 200), (30, 300)]);
}
#[test]
fn test_extend() {
let mut queue = TestQueue::new();
queue.extend(vec![(10, 100), (20, 200), (30, 300)]);
assert_eq!(queue.drain_upto(30), vec![(10, 100), (20, 200), (30, 300)]);
queue.push(40, 400);
queue.extend(vec![(50, 500), (60, 600)]);
assert_eq!(queue.drain_upto(60), vec![(40, 400), (50, 500), (60, 600)]);
}
#[test]
fn test_drain_upto() {
let mut queue = TestQueue::new();
queue.push(10, 100);
queue.push(20, 200);
queue.push(30, 300);
queue.push(40, 400);
assert_eq!(queue.drain_upto(25), vec![(10, 100), (20, 200)]);
assert_eq!(queue.first_index(), Some(30));
queue.push(60, 600);
assert_eq!(queue.drain_upto(50), vec![(30, 300), (40, 400)]);
assert_eq!(queue.first_index(), Some(60));
assert_eq!(queue.drain_upto(100), vec![(60, 600)]);
assert!(queue.is_empty());
assert_eq!(queue.drain_upto(100), vec![]);
}
#[test]
#[cfg(debug_assertions)]
#[should_panic(expected = "Responder indices must be monotonically increasing")]
fn test_push_panics_non_monotonic() {
let mut queue = TestQueue::new();
queue.push(20, 200);
queue.push(10, 100);
}
#[test]
#[cfg(debug_assertions)]
#[should_panic(expected = "Responder indices must be monotonically increasing")]
fn test_push_panics_duplicate() {
let mut queue = TestQueue::new();
queue.push(20, 200);
queue.push(20, 200);
}
#[test]
fn test_large_scale() {
let mut queue = TestQueue::new();
for i in 0..1000 {
queue.push(i * 10, i * 10);
}
assert_eq!(queue.drain_upto(4990).len(), 500);
assert_eq!(queue.drain_upto(9990).len(), 500);
assert!(queue.is_empty());
}
#[test]
fn test_drain_from() {
let mut queue = TestQueue::new();
queue.push(10, 100);
queue.push(20, 200);
queue.push(30, 300);
queue.push(40, 400);
let drained: Vec<_> = queue.drain_from(25).collect();
assert_eq!(drained, vec![(30, 300), (40, 400)]);
assert_eq!(queue.first_index(), Some(10));
let drained: Vec<_> = queue.drain_from(10).collect();
assert_eq!(drained, vec![(10, 100), (20, 200)]);
assert!(queue.is_empty());
let drained: Vec<_> = queue.drain_from(50).collect();
assert_eq!(drained, vec![]);
queue.extend(vec![(10, 100), (20, 200), (30, 300)]);
let mut result = Vec::new();
for (log_index, value) in queue.drain_from(20) {
result.push((log_index, value));
if log_index == 20 {
break;
}
}
assert_eq!(result, vec![(20, 200)]);
assert_eq!(queue.first_index(), Some(10));
}
}