use std::collections::VecDeque;
use std::fmt;
use std::sync::Arc;
use tokio::sync::watch;
use tokio::sync::{Mutex, Notify};
use crate::Chunk;
use crate::kernel::{Effect, box_future, succeed};
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum QueueError {
Disconnected,
}
impl fmt::Display for QueueError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
QueueError::Disconnected => write!(f, "queue is disconnected"),
}
}
}
impl std::error::Error for QueueError {}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum FlumeKind {
Bounded,
Unbounded,
Dropping,
}
struct FlumeShared<A: Send> {
tx: Mutex<Option<flume::Sender<A>>>,
rx: Mutex<Option<flume::Receiver<A>>>,
kind: FlumeKind,
shutdown: watch::Sender<bool>,
}
impl<A: Send + 'static> FlumeShared<A> {
fn new(
kind: FlumeKind,
tx: flume::Sender<A>,
rx: flume::Receiver<A>,
shutdown: watch::Sender<bool>,
) -> Self {
Self {
tx: Mutex::new(Some(tx)),
rx: Mutex::new(Some(rx)),
kind,
shutdown,
}
}
async fn offer(&self, value: A) -> bool {
let mut guard = self.tx.lock().await;
let Some(tx) = guard.as_mut() else {
return false;
};
match self.kind {
FlumeKind::Bounded => match tx.try_send(value) {
Ok(()) => true,
Err(flume::TrySendError::Full(_)) => false,
Err(flume::TrySendError::Disconnected(_)) => false,
},
FlumeKind::Dropping => match tx.try_send(value) {
Ok(()) => true,
Err(flume::TrySendError::Full(v)) => {
drop(v);
false
}
Err(flume::TrySendError::Disconnected(_)) => false,
},
FlumeKind::Unbounded => match tx.try_send(value) {
Ok(()) => true,
Err(flume::TrySendError::Full(v)) => {
drop(v);
false
}
Err(flume::TrySendError::Disconnected(_)) => false,
},
}
}
async fn recv(&self) -> Result<A, QueueError> {
let rx = {
let guard = self.rx.lock().await;
guard.as_ref().map(flume::Receiver::clone)
};
let Some(rx) = rx else {
return Err(QueueError::Disconnected);
};
match rx.recv_async().await {
Ok(v) => Ok(v),
Err(_) => Err(QueueError::Disconnected),
}
}
fn try_recv(&self) -> Result<Option<A>, QueueError> {
let guard = self.rx.blocking_lock();
let Some(rx) = guard.as_ref() else {
return Err(QueueError::Disconnected);
};
match rx.try_recv() {
Ok(v) => Ok(Some(v)),
Err(flume::TryRecvError::Empty) => Ok(None),
Err(flume::TryRecvError::Disconnected) => Err(QueueError::Disconnected),
}
}
async fn offer_or_retain(&self, value: A) -> Result<(), A> {
let mut guard = self.tx.lock().await;
let Some(tx) = guard.as_mut() else {
return Ok(());
};
match self.kind {
FlumeKind::Bounded => match tx.try_send(value) {
Ok(()) => Ok(()),
Err(flume::TrySendError::Full(v)) => Err(v),
Err(flume::TrySendError::Disconnected(v)) => {
drop(v);
Ok(())
}
},
FlumeKind::Unbounded | FlumeKind::Dropping => match tx.try_send(value) {
Ok(()) => Ok(()),
Err(flume::TrySendError::Full(v)) => {
drop(v);
Ok(())
}
Err(flume::TrySendError::Disconnected(v)) => {
drop(v);
Ok(())
}
},
}
}
fn len(&self) -> usize {
let guard = self.rx.blocking_lock();
guard.as_ref().map(flume::Receiver::len).unwrap_or(0)
}
fn is_empty(&self) -> bool {
self.len() == 0
}
fn is_full(&self) -> bool {
let tx_guard = self.tx.blocking_lock();
let Some(tx) = tx_guard.as_ref() else {
return true;
};
tx.is_full()
}
async fn shutdown(&self) {
let mut guard = self.tx.lock().await;
guard.take();
self.shutdown.send_replace(true);
}
fn is_shutdown(&self) -> bool {
*self.shutdown.borrow()
}
async fn await_shutdown(&self) {
if *self.shutdown.borrow() {
return;
}
let mut rx = self.shutdown.subscribe();
let _ = rx.changed().await;
}
}
struct SlidingState<A> {
deque: VecDeque<A>,
open: bool,
}
struct SlidingShared<A: Send> {
state: Mutex<SlidingState<A>>,
capacity: usize,
not_empty: Notify,
shutdown: watch::Sender<bool>,
}
impl<A: Send + 'static> SlidingShared<A> {
fn new(capacity: usize) -> Self {
Self {
state: Mutex::new(SlidingState {
deque: VecDeque::new(),
open: true,
}),
capacity: capacity.max(1),
not_empty: Notify::new(),
shutdown: watch::channel(false).0,
}
}
async fn offer(&self, value: A) -> bool {
self.offer_or_retain(value).await.is_ok()
}
async fn offer_or_retain(&self, value: A) -> Result<(), A> {
let mut g = self.state.lock().await;
if !g.open {
return Err(value);
}
g.deque.push_back(value);
while g.deque.len() > self.capacity {
g.deque.pop_front();
}
drop(g);
self.not_empty.notify_waiters();
Ok(())
}
async fn recv(&self) -> Result<A, QueueError> {
loop {
let maybe = {
let mut g = self.state.lock().await;
if let Some(v) = g.deque.pop_front() {
Some(v)
} else if !g.open {
return Err(QueueError::Disconnected);
} else {
None
}
};
if let Some(v) = maybe {
return Ok(v);
}
self.not_empty.notified().await;
}
}
fn try_recv(&self) -> Result<Option<A>, QueueError> {
let mut g = self.state.blocking_lock();
if let Some(v) = g.deque.pop_front() {
return Ok(Some(v));
}
if !g.open {
return Err(QueueError::Disconnected);
}
Ok(None)
}
async fn len(&self) -> usize {
self.state.lock().await.deque.len()
}
async fn is_empty(&self) -> bool {
self.state.lock().await.deque.is_empty()
}
async fn is_full(&self) -> bool {
self.state.lock().await.deque.len() >= self.capacity
}
async fn shutdown(&self) {
let mut g = self.state.lock().await;
g.open = false;
drop(g);
self.shutdown.send_replace(true);
self.not_empty.notify_waiters();
}
fn is_shutdown(&self) -> bool {
*self.shutdown.borrow()
}
async fn await_shutdown(&self) {
if *self.shutdown.borrow() {
return;
}
let mut rx = self.shutdown.subscribe();
let _ = rx.changed().await;
}
}
enum QueueRepr<A: Send + 'static> {
Flume(Arc<FlumeShared<A>>),
Sliding(Arc<SlidingShared<A>>),
}
pub struct Queue<A: Send + 'static> {
repr: Arc<QueueRepr<A>>,
}
impl<A: Send + 'static> Clone for Queue<A> {
fn clone(&self) -> Self {
Self {
repr: Arc::clone(&self.repr),
}
}
}
impl<A: Send + 'static> Queue<A> {
fn from_flume(kind: FlumeKind, tx: flume::Sender<A>, rx: flume::Receiver<A>) -> Self {
let shutdown = watch::channel(false).0;
Self {
repr: Arc::new(QueueRepr::Flume(Arc::new(FlumeShared::new(
kind, tx, rx, shutdown,
)))),
}
}
fn from_sliding(inner: Arc<SlidingShared<A>>) -> Self {
Self {
repr: Arc::new(QueueRepr::Sliding(inner)),
}
}
pub fn bounded(capacity: usize) -> Effect<Queue<A>, (), ()> {
let cap = capacity.max(1);
let (tx, rx) = flume::bounded(cap);
succeed(Self::from_flume(FlumeKind::Bounded, tx, rx))
}
pub fn unbounded() -> Effect<Queue<A>, (), ()> {
let (tx, rx) = flume::unbounded();
succeed(Self::from_flume(FlumeKind::Unbounded, tx, rx))
}
pub fn dropping(capacity: usize) -> Effect<Queue<A>, (), ()> {
let cap = capacity.max(1);
let (tx, rx) = flume::bounded(cap);
succeed(Self::from_flume(FlumeKind::Dropping, tx, rx))
}
pub fn sliding(capacity: usize) -> Effect<Queue<A>, (), ()> {
let inner = Arc::new(SlidingShared::new(capacity));
succeed(Self::from_sliding(inner))
}
pub fn offer(&self, value: A) -> Effect<bool, (), ()> {
let repr = Arc::clone(&self.repr);
Effect::new_async(move |_r| {
box_future(async move {
match &*repr {
QueueRepr::Flume(f) => Ok(f.offer(value).await),
QueueRepr::Sliding(s) => Ok(s.offer(value).await),
}
})
})
}
pub fn offer_all<I>(&self, iter: I) -> Effect<Vec<A>, (), ()>
where
I: IntoIterator<Item = A> + 'static,
I::IntoIter: Send + 'static,
{
let repr = Arc::clone(&self.repr);
Effect::new_async(move |_r| {
box_future(async move {
let mut left = Vec::new();
for v in iter {
match &*repr {
QueueRepr::Flume(f) => match f.offer_or_retain(v).await {
Ok(()) => {}
Err(v) => left.push(v),
},
QueueRepr::Sliding(s) => match s.offer_or_retain(v).await {
Ok(()) => {}
Err(v) => left.push(v),
},
}
}
Ok(left)
})
})
}
pub fn take(&self) -> Effect<A, QueueError, ()> {
let repr = Arc::clone(&self.repr);
Effect::new_async(move |_r| {
box_future(async move {
match &*repr {
QueueRepr::Flume(f) => f.recv().await,
QueueRepr::Sliding(s) => s.recv().await,
}
})
})
}
pub fn take_all(&self) -> Effect<Chunk<A>, QueueError, ()> {
let repr = Arc::clone(&self.repr);
Effect::new_async(move |_r| {
box_future(async move {
let first = match &*repr {
QueueRepr::Flume(f) => f.recv().await?,
QueueRepr::Sliding(s) => s.recv().await?,
};
let mut out = vec![first];
loop {
match match &*repr {
QueueRepr::Flume(f) => f.try_recv(),
QueueRepr::Sliding(s) => s.try_recv(),
} {
Ok(None) => break,
Ok(Some(v)) => out.push(v),
Err(e) => return Err(e),
}
}
Ok(Chunk::from_vec(out))
})
})
}
pub fn take_up_to(&self, n: usize) -> Effect<Chunk<A>, QueueError, ()> {
let repr = Arc::clone(&self.repr);
Effect::new_async(move |_r| {
box_future(async move {
if n == 0 {
return Ok(Chunk::empty());
}
let first = match &*repr {
QueueRepr::Flume(f) => f.recv().await?,
QueueRepr::Sliding(s) => s.recv().await?,
};
let mut out = vec![first];
while out.len() < n {
match match &*repr {
QueueRepr::Flume(f) => f.try_recv(),
QueueRepr::Sliding(s) => s.try_recv(),
} {
Ok(None) => break,
Ok(Some(v)) => out.push(v),
Err(e) => return Err(e),
}
}
Ok(Chunk::from_vec(out))
})
})
}
pub fn take_n(&self, n: usize) -> Effect<Chunk<A>, QueueError, ()> {
let repr = Arc::clone(&self.repr);
Effect::new_async(move |_r| {
box_future(async move {
if n == 0 {
return Ok(Chunk::empty());
}
let mut out = Vec::with_capacity(n);
for _ in 0..n {
let v = match &*repr {
QueueRepr::Flume(f) => f.recv().await?,
QueueRepr::Sliding(s) => s.recv().await?,
};
out.push(v);
}
Ok(Chunk::from_vec(out))
})
})
}
pub fn take_between(&self, min: usize, max: usize) -> Effect<Chunk<A>, QueueError, ()> {
let repr = Arc::clone(&self.repr);
Effect::new_async(move |_r| {
box_future(async move {
if min > max {
return Ok(Chunk::empty());
}
if min == 0 && max == 0 {
return Ok(Chunk::empty());
}
let mut out = Vec::new();
for _ in 0..min {
let v = match &*repr {
QueueRepr::Flume(f) => f.recv().await?,
QueueRepr::Sliding(s) => s.recv().await?,
};
out.push(v);
}
while out.len() < max {
match match &*repr {
QueueRepr::Flume(f) => f.try_recv(),
QueueRepr::Sliding(s) => s.try_recv(),
} {
Ok(None) => break,
Ok(Some(v)) => out.push(v),
Err(e) => return Err(e),
}
}
Ok(Chunk::from_vec(out))
})
})
}
pub fn poll(&self) -> Effect<Option<A>, QueueError, ()> {
let repr = Arc::clone(&self.repr);
Effect::new_async(move |_r| {
box_future(async move {
tokio::task::yield_now().await;
match &*repr {
QueueRepr::Flume(f) => f.try_recv(),
QueueRepr::Sliding(s) => s.try_recv(),
}
})
})
}
pub fn size(&self) -> Effect<usize, (), ()> {
let repr = Arc::clone(&self.repr);
Effect::new_async(move |_r| {
box_future(async move {
match &*repr {
QueueRepr::Flume(f) => Ok(f.len()),
QueueRepr::Sliding(s) => Ok(s.len().await),
}
})
})
}
pub fn is_empty(&self) -> Effect<bool, (), ()> {
let repr = Arc::clone(&self.repr);
Effect::new_async(move |_r| {
box_future(async move {
match &*repr {
QueueRepr::Flume(f) => Ok(f.is_empty()),
QueueRepr::Sliding(s) => Ok(s.is_empty().await),
}
})
})
}
pub fn is_full(&self) -> Effect<bool, (), ()> {
let repr = Arc::clone(&self.repr);
Effect::new_async(move |_r| {
box_future(async move {
match &*repr {
QueueRepr::Flume(f) => Ok(f.is_full()),
QueueRepr::Sliding(s) => Ok(s.is_full().await),
}
})
})
}
pub fn shutdown(&self) -> Effect<(), (), ()> {
let repr = Arc::clone(&self.repr);
Effect::new_async(move |_r| {
box_future(async move {
match &*repr {
QueueRepr::Flume(f) => f.shutdown().await,
QueueRepr::Sliding(s) => s.shutdown().await,
}
Ok(())
})
})
}
pub fn is_shutdown(&self) -> Effect<bool, (), ()> {
let repr = Arc::clone(&self.repr);
Effect::new_async(move |_r| {
box_future(async move {
match &*repr {
QueueRepr::Flume(f) => Ok(f.is_shutdown()),
QueueRepr::Sliding(s) => Ok(s.is_shutdown()),
}
})
})
}
pub fn await_shutdown(&self) -> Effect<(), (), ()> {
let repr = Arc::clone(&self.repr);
Effect::new_async(move |_r| {
box_future(async move {
match &*repr {
QueueRepr::Flume(f) => f.await_shutdown().await,
QueueRepr::Sliding(s) => s.await_shutdown().await,
}
Ok(())
})
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::runtime::run_async;
fn drive<A: 'static, E: 'static, R: 'static>(eff: Effect<A, E, R>, env: R) -> Result<A, E> {
pollster::block_on(run_async(eff, env))
}
#[test]
fn queue_take_suspends_until_offer() {
let q = drive(Queue::<u32>::bounded(4), ()).unwrap();
let q2 = q.clone();
let h = std::thread::spawn(move || {
std::thread::sleep(std::time::Duration::from_millis(20));
drive(q2.offer(7u32), ()).unwrap();
});
let v = drive(q.take(), ()).unwrap();
h.join().unwrap();
assert_eq!(v, 7);
}
#[test]
fn queue_bounded_offer_returns_false_when_full() {
let q = drive(Queue::<u32>::bounded(1), ()).unwrap();
assert!(drive(q.offer(1u32), ()).unwrap());
assert!(!drive(q.offer(2u32), ()).unwrap());
assert_eq!(drive(q.take(), ()).unwrap(), 1);
assert!(drive(q.offer(3u32), ()).unwrap());
}
#[test]
fn queue_dropping_drops_newest() {
let q = drive(Queue::<u32>::dropping(1), ()).unwrap();
assert!(drive(q.offer(1u32), ()).unwrap());
assert!(!drive(q.offer(2u32), ()).unwrap());
assert_eq!(drive(q.size(), ()).unwrap(), 1);
assert_eq!(drive(q.take(), ()).unwrap(), 1);
}
#[test]
fn queue_sliding_drops_oldest() {
let q = drive(Queue::<u32>::sliding(2), ()).unwrap();
assert!(drive(q.offer(1u32), ()).unwrap());
assert!(drive(q.offer(2u32), ()).unwrap());
assert!(drive(q.offer(3u32), ()).unwrap());
assert_eq!(drive(q.take(), ()).unwrap(), 2);
assert_eq!(drive(q.take(), ()).unwrap(), 3);
}
#[test]
fn queue_await_shutdown_returns_after_shutdown() {
let q = drive(Queue::<u32>::unbounded(), ()).unwrap();
let q2 = q.clone();
let h = std::thread::spawn(move || {
std::thread::sleep(std::time::Duration::from_millis(15));
drive(q2.shutdown(), ()).unwrap();
});
drive(q.await_shutdown(), ()).unwrap();
h.join().unwrap();
assert!(drive(q.is_shutdown(), ()).unwrap());
}
#[test]
fn queue_offer_all_retains_overflow_bounded() {
let q = drive(Queue::<u32>::bounded(2), ()).unwrap();
let left = drive(q.offer_all([1u32, 2, 3, 4]), ()).unwrap();
assert_eq!(left, vec![3, 4]);
let chunk = drive(q.take_all(), ()).unwrap();
assert_eq!(chunk.into_vec(), vec![1, 2]);
}
#[test]
fn queue_take_up_to_and_take_n() {
let q = drive(Queue::<u32>::bounded(10), ()).unwrap();
drive(q.offer_all([1u32, 2, 3]), ()).unwrap();
let c = drive(q.take_up_to(2), ()).unwrap();
assert_eq!(c.into_vec(), vec![1, 2]);
drive(q.offer_all([4u32, 5]), ()).unwrap();
let c2 = drive(q.take_n(2), ()).unwrap();
assert_eq!(c2.into_vec(), vec![3, 4]);
}
#[test]
fn queue_take_between_min_max_and_edges() {
let q = drive(Queue::<u32>::unbounded(), ()).unwrap();
assert_eq!(drive(q.take_between(2, 1), ()).unwrap().len(), 0);
assert_eq!(drive(q.take_between(0, 0), ()).unwrap().len(), 0);
drive(q.offer_all([10u32, 11, 12]), ()).unwrap();
let c = drive(q.take_between(2, 3), ()).unwrap();
assert_eq!(c.len(), 3);
}
#[test]
fn queue_poll_and_is_empty_is_full() {
let q = drive(Queue::<u32>::bounded(1), ()).unwrap();
assert_eq!(drive(q.poll(), ()).unwrap(), None);
assert!(drive(q.is_empty(), ()).unwrap());
drive(q.offer(7u32), ()).unwrap();
assert!(drive(q.is_full(), ()).unwrap());
assert_eq!(drive(q.poll(), ()).unwrap(), Some(7));
}
#[test]
fn queue_sliding_is_full_after_capacity() {
let q = drive(Queue::<u32>::sliding(2), ()).unwrap();
drive(q.offer_all([1u32, 2, 3]), ()).unwrap();
assert!(drive(q.is_full(), ()).unwrap());
}
#[test]
fn queue_error_display_and_debug() {
let e = QueueError::Disconnected;
assert_eq!(format!("{e}"), "queue is disconnected");
assert!(format!("{e:?}").contains("Disconnected"));
use std::error::Error;
assert!(e.source().is_none());
}
#[test]
fn queue_take_n_zero_returns_empty_chunk() {
let q = drive(Queue::<u32>::bounded(4), ()).unwrap();
drive(q.offer(1u32), ()).unwrap();
let c = drive(q.take_n(0), ()).unwrap();
assert_eq!(c.len(), 0);
assert_eq!(drive(q.size(), ()).unwrap(), 1);
}
#[test]
fn queue_take_up_to_zero_returns_empty_chunk() {
let q = drive(Queue::<u32>::bounded(4), ()).unwrap();
drive(q.offer(1u32), ()).unwrap();
let c = drive(q.take_up_to(0), ()).unwrap();
assert_eq!(c.len(), 0);
assert_eq!(drive(q.size(), ()).unwrap(), 1);
}
#[test]
fn queue_take_returns_err_when_shut_down_empty() {
let q = drive(Queue::<u32>::bounded(2), ()).unwrap();
drive(q.shutdown(), ()).unwrap();
assert_eq!(drive(q.take(), ()), Err(QueueError::Disconnected));
}
#[test]
fn queue_take_all_returns_err_when_shut_down_empty() {
let q = drive(Queue::<u32>::unbounded(), ()).unwrap();
drive(q.shutdown(), ()).unwrap();
assert_eq!(drive(q.take_all(), ()), Err(QueueError::Disconnected));
}
#[test]
fn queue_take_up_to_returns_err_when_shut_down_empty() {
let q = drive(Queue::<u32>::bounded(4), ()).unwrap();
drive(q.shutdown(), ()).unwrap();
assert_eq!(drive(q.take_up_to(3), ()), Err(QueueError::Disconnected));
}
#[test]
fn queue_take_n_returns_err_when_shut_down_empty() {
let q = drive(Queue::<u32>::bounded(4), ()).unwrap();
drive(q.shutdown(), ()).unwrap();
assert_eq!(drive(q.take_n(2), ()), Err(QueueError::Disconnected));
}
#[test]
fn queue_poll_returns_err_when_shut_down_empty() {
let q = drive(Queue::<u32>::bounded(4), ()).unwrap();
drive(q.shutdown(), ()).unwrap();
assert_eq!(drive(q.poll(), ()), Err(QueueError::Disconnected));
}
#[test]
fn queue_take_between_returns_err_when_shut_down_empty() {
let q = drive(Queue::<u32>::bounded(4), ()).unwrap();
drive(q.shutdown(), ()).unwrap();
assert_eq!(
drive(q.take_between(1, 3), ()),
Err(QueueError::Disconnected)
);
}
#[test]
fn queue_offer_after_shutdown_returns_false() {
let q = drive(Queue::<u32>::bounded(4), ()).unwrap();
drive(q.shutdown(), ()).unwrap();
assert!(!drive(q.offer(99u32), ()).unwrap());
}
#[test]
fn queue_offer_all_after_shutdown_silently_drops_items() {
let q = drive(Queue::<u32>::bounded(4), ()).unwrap();
drive(q.shutdown(), ()).unwrap();
let retained = drive(q.offer_all([1u32, 2, 3]), ()).unwrap();
assert!(
retained.is_empty(),
"items offered after shutdown are silently dropped, not retained"
);
}
#[test]
fn queue_unbounded_is_never_full() {
let q = drive(Queue::<u32>::unbounded(), ()).unwrap();
for i in 0u32..100 {
drive(q.offer(i), ()).unwrap();
}
assert!(!drive(q.is_full(), ()).unwrap());
assert_eq!(drive(q.size(), ()).unwrap(), 100);
assert!(!drive(q.is_empty(), ()).unwrap());
}
#[test]
fn queue_dropping_size_and_fullness() {
let q = drive(Queue::<u32>::dropping(3), ()).unwrap();
assert!(drive(q.is_empty(), ()).unwrap());
drive(q.offer_all([10u32, 20, 30, 40]), ()).unwrap();
assert_eq!(drive(q.size(), ()).unwrap(), 3);
assert!(drive(q.is_full(), ()).unwrap());
}
#[test]
fn queue_sliding_size_and_fullness() {
let q = drive(Queue::<u32>::sliding(3), ()).unwrap();
assert!(drive(q.is_empty(), ()).unwrap());
drive(q.offer_all([1u32, 2, 3, 4]), ()).unwrap();
assert_eq!(drive(q.size(), ()).unwrap(), 3);
assert!(drive(q.is_full(), ()).unwrap());
assert!(!drive(q.is_empty(), ()).unwrap());
}
#[test]
fn queue_offer_all_on_sliding_always_accepts() {
let q = drive(Queue::<u32>::sliding(2), ()).unwrap();
let retained = drive(q.offer_all([1u32, 2, 3, 4]), ()).unwrap();
assert!(retained.is_empty(), "sliding should not retain items");
let c = drive(q.take_all(), ()).unwrap();
assert_eq!(c.into_vec(), vec![3, 4]);
}
#[test]
fn queue_is_shutdown_before_and_after() {
let q = drive(Queue::<u32>::bounded(2), ()).unwrap();
assert!(!drive(q.is_shutdown(), ()).unwrap());
drive(q.shutdown(), ()).unwrap();
assert!(drive(q.is_shutdown(), ()).unwrap());
}
#[test]
fn queue_take_drains_buffer_then_errors_after_shutdown() {
let q = drive(Queue::<u32>::bounded(4), ()).unwrap();
drive(q.offer(42u32), ()).unwrap();
drive(q.shutdown(), ()).unwrap();
assert_eq!(drive(q.take(), ()).unwrap(), 42);
assert_eq!(drive(q.take(), ()), Err(QueueError::Disconnected));
}
}