use std::collections::VecDeque;
use tokio::sync::mpsc;
/// Backlog capacity (item count) associated with the "dispatch" path.
///
/// NOTE(review): not referenced anywhere in this file; presumably passed as
/// the `cap` argument of `BackpressureQueue::new` for a dispatch channel —
/// confirm at the use site.
pub const DISPATCH_BACKLOG_CAP: usize = 8;
/// Backlog capacity (item count) associated with the "event" path.
///
/// NOTE(review): not referenced anywhere in this file; presumably passed as
/// the `cap` argument of `BackpressureQueue::new` for an event channel —
/// confirm at the use site.
pub const EVENT_BACKLOG_CAP: usize = 32;
/// Reasons a `BackpressureQueue` operation can fail.
///
/// The type is a plain, data-free enum, so it is cheap to copy and compare.
/// It implements [`std::fmt::Display`] and [`std::error::Error`] so it can be
/// propagated with `?` into boxed/dynamic error types and printed for humans.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum BackpressureError {
    /// The backlog already holds as many items as its cap allows, so the new
    /// item was rejected.
    QueueFull,
    /// The channel reported that it is closed when a send was attempted.
    Closed,
}

impl std::fmt::Display for BackpressureError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Exhaustive match on purpose: adding a variant must force a message.
        let message = match self {
            Self::QueueFull => "backpressure backlog is full",
            Self::Closed => "channel is closed",
        };
        f.write_str(message)
    }
}

impl std::error::Error for BackpressureError {}
/// A bounded overflow buffer for items that could not be sent immediately.
///
/// Items are kept in arrival order; `cap` limits how many may be buffered at
/// once. `Debug` is derived (for `T: Debug`) so the queue can be logged and so
/// types that own one can derive `Debug` themselves.
#[derive(Debug)]
pub(crate) struct BackpressureQueue<T> {
    /// Items waiting to be sent, oldest at the front.
    backlog: VecDeque<T>,
    /// Maximum number of items `backlog` is allowed to hold.
    cap: usize,
}
impl<T> BackpressureQueue<T> {
#[must_use]
pub fn new(cap: usize) -> Self {
Self {
backlog: VecDeque::new(),
cap,
}
}
pub fn enqueue_or_send(
&mut self,
tx: &mpsc::Sender<T>,
item: T,
) -> Result<(), BackpressureError> {
if !self.backlog.is_empty() {
if self.backlog.len() >= self.cap {
return Err(BackpressureError::QueueFull);
}
self.backlog.push_back(item);
return Ok(());
}
match tx.try_send(item) {
Ok(()) => Ok(()),
Err(mpsc::error::TrySendError::Full(item)) => {
if self.backlog.len() >= self.cap {
return Err(BackpressureError::QueueFull);
}
self.backlog.push_back(item);
Ok(())
}
Err(mpsc::error::TrySendError::Closed(_)) => Err(BackpressureError::Closed),
}
}
pub fn try_drain_one(&mut self, tx: &mpsc::Sender<T>) -> Result<bool, BackpressureError> {
let Some(front) = self.backlog.pop_front() else {
return Ok(false);
};
match tx.try_send(front) {
Ok(()) => Ok(true),
Err(mpsc::error::TrySendError::Full(item)) => {
self.backlog.push_front(item);
Ok(false)
}
Err(mpsc::error::TrySendError::Closed(_)) => Err(BackpressureError::Closed),
}
}
#[must_use]
pub fn has_pending(&self) -> bool {
!self.backlog.is_empty()
}
#[must_use]
#[allow(dead_code)]
pub fn len(&self) -> usize {
self.backlog.len()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// With room in the channel and nothing buffered, the item bypasses the backlog.
    #[tokio::test]
    async fn empty_queue_sends_directly() {
        let (sender, mut receiver) = mpsc::channel::<u32>(4);
        let mut queue = BackpressureQueue::new(8);

        queue.enqueue_or_send(&sender, 42).expect("send");

        assert!(!queue.has_pending());
        let received = receiver.recv().await;
        assert_eq!(received, Some(42));
    }

    /// Items beyond the channel's capacity land in the backlog and drain later in order.
    #[tokio::test]
    async fn full_channel_spills_to_backlog() {
        let (sender, mut receiver) = mpsc::channel::<u32>(2);
        let mut queue = BackpressureQueue::new(8);

        queue.enqueue_or_send(&sender, 1).expect("send 1");
        queue.enqueue_or_send(&sender, 2).expect("send 2");
        queue.enqueue_or_send(&sender, 3).expect("spill 3");
        queue.enqueue_or_send(&sender, 4).expect("spill 4");
        assert_eq!(queue.len(), 2);

        // Free both channel slots so the backlog can move forward.
        assert_eq!(receiver.recv().await, Some(1));
        assert_eq!(receiver.recv().await, Some(2));

        let moved_third = queue.try_drain_one(&sender).expect("drain 3");
        assert!(moved_third);
        let moved_fourth = queue.try_drain_one(&sender).expect("drain 4");
        assert!(moved_fourth);
        assert!(!queue.has_pending());

        assert_eq!(receiver.recv().await, Some(3));
        assert_eq!(receiver.recv().await, Some(4));
    }

    /// Once the backlog holds `cap` items, further enqueues are rejected.
    #[tokio::test]
    async fn cap_exceeded_returns_queue_full() {
        // The receiver is kept alive (but never read) so the channel stays full.
        let (sender, _receiver) = mpsc::channel::<u32>(1);
        let mut queue = BackpressureQueue::new(2);

        queue.enqueue_or_send(&sender, 1).expect("send 1");
        queue.enqueue_or_send(&sender, 2).expect("spill 2");
        queue.enqueue_or_send(&sender, 3).expect("spill 3");

        let outcome = queue.enqueue_or_send(&sender, 4);
        let err = outcome.expect_err("at cap, must error");
        assert!(matches!(err, BackpressureError::QueueFull));
    }

    /// Sending into a channel whose receiver is gone reports `Closed`.
    #[tokio::test]
    async fn closed_channel_returns_closed_error() {
        let (sender, receiver) = mpsc::channel::<u32>(1);
        drop(receiver);
        let mut queue = BackpressureQueue::new(8);

        let outcome = queue.enqueue_or_send(&sender, 42);
        let err = outcome.expect_err("closed channel");
        assert!(matches!(err, BackpressureError::Closed));
    }

    /// A new item never overtakes buffered ones, even when the channel has room again.
    #[tokio::test]
    async fn order_preserved_with_backlog() {
        let (sender, mut receiver) = mpsc::channel::<u32>(4);
        let mut queue = BackpressureQueue::new(8);

        queue.enqueue_or_send(&sender, 1).expect("send 1");
        queue.enqueue_or_send(&sender, 2).expect("send 2");
        queue.enqueue_or_send(&sender, 3).expect("send 3");
        queue.enqueue_or_send(&sender, 4).expect("send 4");
        queue.enqueue_or_send(&sender, 5).expect("spill 5");

        for expected in [1, 2, 3, 4] {
            assert_eq!(receiver.recv().await, Some(expected));
        }

        // The channel is empty now, but 5 is still buffered, so 6 must queue behind it.
        queue.enqueue_or_send(&sender, 6).expect("spill 6");
        assert_eq!(queue.len(), 2);

        let moved_fifth = queue.try_drain_one(&sender).expect("drain 5");
        assert!(moved_fifth);
        let moved_sixth = queue.try_drain_one(&sender).expect("drain 6");
        assert!(moved_sixth);

        assert_eq!(receiver.recv().await, Some(5));
        assert_eq!(receiver.recv().await, Some(6));
    }

    /// Draining against a still-full channel moves nothing and keeps the item buffered.
    #[tokio::test]
    async fn drain_when_channel_still_full_returns_false() {
        let (sender, _receiver) = mpsc::channel::<u32>(1);
        let mut queue = BackpressureQueue::new(8);

        queue.enqueue_or_send(&sender, 1).expect("send 1");
        queue.enqueue_or_send(&sender, 2).expect("spill 2");

        let moved = queue.try_drain_one(&sender).expect("drain attempt");
        assert!(!moved);
        assert_eq!(queue.len(), 1);
    }

    /// A zero-capacity backlog rejects the first item the channel cannot take.
    #[tokio::test]
    async fn cap_zero_immediately_overflows_on_full_channel() {
        let (sender, _receiver) = mpsc::channel::<u32>(1);
        let mut queue = BackpressureQueue::new(0);

        queue
            .enqueue_or_send(&sender, 1)
            .expect("first send fits in channel");

        let outcome = queue.enqueue_or_send(&sender, 2);
        let err = outcome.expect_err("cap=0 must overflow");
        assert!(matches!(err, BackpressureError::QueueFull));
    }

    /// Draining with nothing buffered is a successful no-op.
    #[tokio::test]
    async fn drain_on_empty_backlog_is_noop() {
        let (sender, _receiver) = mpsc::channel::<u32>(4);
        let mut queue = BackpressureQueue::<u32>::new(8);

        let moved = queue
            .try_drain_one(&sender)
            .expect("drain on empty backlog");
        assert!(!moved);
        assert!(!queue.has_pending());
    }

    /// Draining after the receiver is dropped surfaces `Closed`.
    #[tokio::test]
    async fn drain_propagates_closed_channel() {
        let (sender, receiver) = mpsc::channel::<u32>(1);
        let mut queue = BackpressureQueue::new(8);

        queue.enqueue_or_send(&sender, 1).expect("send 1");
        queue.enqueue_or_send(&sender, 2).expect("spill 2");
        drop(receiver);

        let outcome = queue.try_drain_one(&sender);
        let err = outcome.expect_err("closed channel");
        assert!(matches!(err, BackpressureError::Closed));
    }

    /// `has_pending` reflects only the backlog, not items already in the channel.
    #[tokio::test]
    async fn has_pending_tracks_backlog_state() {
        let (sender, mut receiver) = mpsc::channel::<u32>(1);
        let mut queue = BackpressureQueue::new(8);
        assert!(!queue.has_pending());

        queue.enqueue_or_send(&sender, 1).expect("send 1");
        assert!(
            !queue.has_pending(),
            "in-channel send doesn't add to backlog"
        );

        queue.enqueue_or_send(&sender, 2).expect("spill 2");
        assert!(queue.has_pending(), "spilled item is pending");

        // Make room in the channel; the received value itself is irrelevant here.
        let _ = receiver.recv().await;

        let moved = queue.try_drain_one(&sender).expect("drain");
        assert!(moved);
        assert!(!queue.has_pending(), "backlog drained");
    }
}