// id_effect/coordination/queue.rs
//! MPMC queue primitives — bounded, unbounded, dropping, and sliding backpressure.
//!
//! Flume backs bounded/unbounded/dropping; sliding uses an internal `VecDeque` with
//! fixed capacity (oldest dropped on overflow).

use std::collections::VecDeque;
use std::fmt;
use std::sync::Arc;

use tokio::sync::watch;
use tokio::sync::{Mutex, Notify};

use crate::Chunk;
use crate::kernel::{Effect, box_future, succeed};
15
/// Error returned when receiving from a [`Queue`] that has no senders and no buffered values.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum QueueError {
  /// Every sender is gone and nothing remains buffered.
  Disconnected,
}
22
23impl fmt::Display for QueueError {
24  fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
25    match self {
26      QueueError::Disconnected => write!(f, "queue is disconnected"),
27    }
28  }
29}
30
// Marker impl: `source` defaults to `None`; `Display`/`Debug` come from the impls above.
impl std::error::Error for QueueError {}
32
/// Backpressure strategy selected for a flume-backed queue at construction time.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum FlumeKind {
  /// Fixed capacity; `offer` reports failure when full.
  Bounded,
  /// No capacity limit; `offer` never reports "full".
  Unbounded,
  /// Fixed capacity; the **incoming** element is discarded when full.
  Dropping,
}
39
/// Shared state behind the flume-backed queue variants (bounded/unbounded/dropping).
struct FlumeShared<A: Send> {
  /// Producer side; taken (set to `None`) by `shutdown` to disconnect producers.
  tx: Mutex<Option<flume::Sender<A>>>,
  /// Consumer side; cloned out of the lock before awaiting so the mutex is not
  /// held across an await point. `None` is treated as disconnected.
  rx: Mutex<Option<flume::Receiver<A>>>,
  /// Backpressure behavior selected at construction.
  kind: FlumeKind,
  /// Latched to `true` on shutdown; `watch` retains the value for late subscribers.
  shutdown: watch::Sender<bool>,
}
46
47impl<A: Send + 'static> FlumeShared<A> {
48  fn new(
49    kind: FlumeKind,
50    tx: flume::Sender<A>,
51    rx: flume::Receiver<A>,
52    shutdown: watch::Sender<bool>,
53  ) -> Self {
54    Self {
55      tx: Mutex::new(Some(tx)),
56      rx: Mutex::new(Some(rx)),
57      kind,
58      shutdown,
59    }
60  }
61
62  async fn offer(&self, value: A) -> bool {
63    let mut guard = self.tx.lock().await;
64    let Some(tx) = guard.as_mut() else {
65      return false;
66    };
67    match self.kind {
68      FlumeKind::Bounded => match tx.try_send(value) {
69        Ok(()) => true,
70        Err(flume::TrySendError::Full(_)) => false,
71        Err(flume::TrySendError::Disconnected(_)) => false,
72      },
73      FlumeKind::Dropping => match tx.try_send(value) {
74        Ok(()) => true,
75        Err(flume::TrySendError::Full(v)) => {
76          drop(v);
77          false
78        }
79        Err(flume::TrySendError::Disconnected(_)) => false,
80      },
81      FlumeKind::Unbounded => match tx.try_send(value) {
82        Ok(()) => true,
83        Err(flume::TrySendError::Full(v)) => {
84          drop(v);
85          false
86        }
87        Err(flume::TrySendError::Disconnected(_)) => false,
88      },
89    }
90  }
91
92  async fn recv(&self) -> Result<A, QueueError> {
93    let rx = {
94      let guard = self.rx.lock().await;
95      guard.as_ref().map(flume::Receiver::clone)
96    };
97    let Some(rx) = rx else {
98      return Err(QueueError::Disconnected);
99    };
100    match rx.recv_async().await {
101      Ok(v) => Ok(v),
102      Err(_) => Err(QueueError::Disconnected),
103    }
104  }
105
106  fn try_recv(&self) -> Result<Option<A>, QueueError> {
107    let guard = self.rx.blocking_lock();
108    let Some(rx) = guard.as_ref() else {
109      return Err(QueueError::Disconnected);
110    };
111    match rx.try_recv() {
112      Ok(v) => Ok(Some(v)),
113      Err(flume::TryRecvError::Empty) => Ok(None),
114      Err(flume::TryRecvError::Disconnected) => Err(QueueError::Disconnected),
115    }
116  }
117
118  /// Like [`FlumeShared::offer`], but returns `Err(value)` when a **bounded** queue is full so
119  /// callers (e.g. [`Queue::offer_all`]) can retain unstored elements.
120  async fn offer_or_retain(&self, value: A) -> Result<(), A> {
121    let mut guard = self.tx.lock().await;
122    let Some(tx) = guard.as_mut() else {
123      return Ok(());
124    };
125    match self.kind {
126      FlumeKind::Bounded => match tx.try_send(value) {
127        Ok(()) => Ok(()),
128        Err(flume::TrySendError::Full(v)) => Err(v),
129        Err(flume::TrySendError::Disconnected(v)) => {
130          drop(v);
131          Ok(())
132        }
133      },
134      FlumeKind::Unbounded | FlumeKind::Dropping => match tx.try_send(value) {
135        Ok(()) => Ok(()),
136        Err(flume::TrySendError::Full(v)) => {
137          drop(v);
138          Ok(())
139        }
140        Err(flume::TrySendError::Disconnected(v)) => {
141          drop(v);
142          Ok(())
143        }
144      },
145    }
146  }
147
148  fn len(&self) -> usize {
149    let guard = self.rx.blocking_lock();
150    guard.as_ref().map(flume::Receiver::len).unwrap_or(0)
151  }
152
153  fn is_empty(&self) -> bool {
154    self.len() == 0
155  }
156
157  fn is_full(&self) -> bool {
158    let tx_guard = self.tx.blocking_lock();
159    let Some(tx) = tx_guard.as_ref() else {
160      return true;
161    };
162    tx.is_full()
163  }
164
165  async fn shutdown(&self) {
166    let mut guard = self.tx.lock().await;
167    guard.take();
168    // Use send_replace so the value is stored even when no watch receivers exist.
169    self.shutdown.send_replace(true);
170  }
171
172  fn is_shutdown(&self) -> bool {
173    *self.shutdown.borrow()
174  }
175
176  async fn await_shutdown(&self) {
177    if *self.shutdown.borrow() {
178      return;
179    }
180    let mut rx = self.shutdown.subscribe();
181    let _ = rx.changed().await;
182  }
183}
184
/// Mutable interior of a sliding queue: the buffer plus an open/closed flag.
struct SlidingState<A> {
  /// Buffered elements, oldest at the front.
  deque: VecDeque<A>,
  /// `false` once `shutdown` has run; producers are then rejected.
  open: bool,
}
189
/// Shared state behind [`Queue::sliding`]: a fixed-capacity `VecDeque` that
/// evicts the oldest element on overflow.
struct SlidingShared<A: Send> {
  state: Mutex<SlidingState<A>>,
  /// Maximum buffered elements; clamped to at least 1 in `new`.
  capacity: usize,
  /// Signalled after a successful push so blocked receivers re-check the deque.
  not_empty: Notify,
  /// Latched to `true` on shutdown; `watch` retains the value for late subscribers.
  shutdown: watch::Sender<bool>,
}
196
197impl<A: Send + 'static> SlidingShared<A> {
198  fn new(capacity: usize) -> Self {
199    Self {
200      state: Mutex::new(SlidingState {
201        deque: VecDeque::new(),
202        open: true,
203      }),
204      capacity: capacity.max(1),
205      not_empty: Notify::new(),
206      shutdown: watch::channel(false).0,
207    }
208  }
209
210  async fn offer(&self, value: A) -> bool {
211    self.offer_or_retain(value).await.is_ok()
212  }
213
214  /// Same as [`SlidingShared::offer`], but returns `Err(value)` when the queue is closed so
215  /// [`Queue::offer_all`] can retain unstored elements.
216  async fn offer_or_retain(&self, value: A) -> Result<(), A> {
217    let mut g = self.state.lock().await;
218    if !g.open {
219      return Err(value);
220    }
221    g.deque.push_back(value);
222    while g.deque.len() > self.capacity {
223      g.deque.pop_front();
224    }
225    drop(g);
226    self.not_empty.notify_waiters();
227    Ok(())
228  }
229
230  async fn recv(&self) -> Result<A, QueueError> {
231    loop {
232      let maybe = {
233        let mut g = self.state.lock().await;
234        if let Some(v) = g.deque.pop_front() {
235          Some(v)
236        } else if !g.open {
237          return Err(QueueError::Disconnected);
238        } else {
239          None
240        }
241      };
242      if let Some(v) = maybe {
243        return Ok(v);
244      }
245      self.not_empty.notified().await;
246    }
247  }
248
249  fn try_recv(&self) -> Result<Option<A>, QueueError> {
250    let mut g = self.state.blocking_lock();
251    if let Some(v) = g.deque.pop_front() {
252      return Ok(Some(v));
253    }
254    if !g.open {
255      return Err(QueueError::Disconnected);
256    }
257    Ok(None)
258  }
259
260  async fn len(&self) -> usize {
261    self.state.lock().await.deque.len()
262  }
263
264  async fn is_empty(&self) -> bool {
265    self.state.lock().await.deque.is_empty()
266  }
267
268  async fn is_full(&self) -> bool {
269    self.state.lock().await.deque.len() >= self.capacity
270  }
271
272  async fn shutdown(&self) {
273    let mut g = self.state.lock().await;
274    g.open = false;
275    drop(g);
276    // Use send_replace so the value is stored even when no watch receivers exist.
277    self.shutdown.send_replace(true);
278    self.not_empty.notify_waiters();
279  }
280
281  fn is_shutdown(&self) -> bool {
282    *self.shutdown.borrow()
283  }
284
285  async fn await_shutdown(&self) {
286    if *self.shutdown.borrow() {
287      return;
288    }
289    let mut rx = self.shutdown.subscribe();
290    let _ = rx.changed().await;
291  }
292}
293
/// Internal representation: flume-backed (bounded/unbounded/dropping) or the
/// hand-rolled sliding buffer.
enum QueueRepr<A: Send + 'static> {
  Flume(Arc<FlumeShared<A>>),
  Sliding(Arc<SlidingShared<A>>),
}
298
/// Cloneable multi-producer / multi-consumer queue with several backpressure modes.
///
/// Clones share the same underlying buffer. Construct via [`Queue::bounded`],
/// [`Queue::unbounded`], [`Queue::dropping`], or [`Queue::sliding`].
pub struct Queue<A: Send + 'static> {
  // Shared representation; `Clone` bumps this Arc.
  repr: Arc<QueueRepr<A>>,
}
303
304impl<A: Send + 'static> Clone for Queue<A> {
305  fn clone(&self) -> Self {
306    Self {
307      repr: Arc::clone(&self.repr),
308    }
309  }
310}
311
312impl<A: Send + 'static> Queue<A> {
313  fn from_flume(kind: FlumeKind, tx: flume::Sender<A>, rx: flume::Receiver<A>) -> Self {
314    let shutdown = watch::channel(false).0;
315    Self {
316      repr: Arc::new(QueueRepr::Flume(Arc::new(FlumeShared::new(
317        kind, tx, rx, shutdown,
318      )))),
319    }
320  }
321
322  fn from_sliding(inner: Arc<SlidingShared<A>>) -> Self {
323    Self {
324      repr: Arc::new(QueueRepr::Sliding(inner)),
325    }
326  }
327
328  /// Bounded queue: [`Queue::offer`] returns `false` when full (no blocking).
329  pub fn bounded(capacity: usize) -> Effect<Queue<A>, (), ()> {
330    let cap = capacity.max(1);
331    let (tx, rx) = flume::bounded(cap);
332    succeed(Self::from_flume(FlumeKind::Bounded, tx, rx))
333  }
334
335  /// Unbounded queue.
336  pub fn unbounded() -> Effect<Queue<A>, (), ()> {
337    let (tx, rx) = flume::unbounded();
338    succeed(Self::from_flume(FlumeKind::Unbounded, tx, rx))
339  }
340
341  /// Bounded queue that drops the **incoming** element when full.
342  pub fn dropping(capacity: usize) -> Effect<Queue<A>, (), ()> {
343    let cap = capacity.max(1);
344    let (tx, rx) = flume::bounded(cap);
345    succeed(Self::from_flume(FlumeKind::Dropping, tx, rx))
346  }
347
348  /// Fixed-capacity queue that drops the **oldest** element when full.
349  pub fn sliding(capacity: usize) -> Effect<Queue<A>, (), ()> {
350    let inner = Arc::new(SlidingShared::new(capacity));
351    succeed(Self::from_sliding(inner))
352  }
353
354  /// Try to enqueue one value. `false` means the value was not stored (full, dropping, or shut down).
355  pub fn offer(&self, value: A) -> Effect<bool, (), ()> {
356    let repr = Arc::clone(&self.repr);
357    Effect::new_async(move |_r| {
358      box_future(async move {
359        match &*repr {
360          QueueRepr::Flume(f) => Ok(f.offer(value).await),
361          QueueRepr::Sliding(s) => Ok(s.offer(value).await),
362        }
363      })
364    })
365  }
366
367  /// Enqueue as many values as possible; returns those that could not be stored (in order).
368  pub fn offer_all<I>(&self, iter: I) -> Effect<Vec<A>, (), ()>
369  where
370    I: IntoIterator<Item = A> + 'static,
371    I::IntoIter: Send + 'static,
372  {
373    let repr = Arc::clone(&self.repr);
374    Effect::new_async(move |_r| {
375      box_future(async move {
376        let mut left = Vec::new();
377        for v in iter {
378          match &*repr {
379            QueueRepr::Flume(f) => match f.offer_or_retain(v).await {
380              Ok(()) => {}
381              Err(v) => left.push(v),
382            },
383            QueueRepr::Sliding(s) => match s.offer_or_retain(v).await {
384              Ok(()) => {}
385              Err(v) => left.push(v),
386            },
387          }
388        }
389        Ok(left)
390      })
391    })
392  }
393
394  /// Block until a value is available or the queue disconnects.
395  pub fn take(&self) -> Effect<A, QueueError, ()> {
396    let repr = Arc::clone(&self.repr);
397    Effect::new_async(move |_r| {
398      box_future(async move {
399        match &*repr {
400          QueueRepr::Flume(f) => f.recv().await,
401          QueueRepr::Sliding(s) => s.recv().await,
402        }
403      })
404    })
405  }
406
407  /// Wait for at least one element, then drain all currently available elements.
408  pub fn take_all(&self) -> Effect<Chunk<A>, QueueError, ()> {
409    let repr = Arc::clone(&self.repr);
410    Effect::new_async(move |_r| {
411      box_future(async move {
412        let first = match &*repr {
413          QueueRepr::Flume(f) => f.recv().await?,
414          QueueRepr::Sliding(s) => s.recv().await?,
415        };
416        let mut out = vec![first];
417        loop {
418          match match &*repr {
419            QueueRepr::Flume(f) => f.try_recv(),
420            QueueRepr::Sliding(s) => s.try_recv(),
421          } {
422            Ok(None) => break,
423            Ok(Some(v)) => out.push(v),
424            Err(e) => return Err(e),
425          }
426        }
427        Ok(Chunk::from_vec(out))
428      })
429    })
430  }
431
432  /// After the first element arrives, take at most `n` elements total (including the first).
433  pub fn take_up_to(&self, n: usize) -> Effect<Chunk<A>, QueueError, ()> {
434    let repr = Arc::clone(&self.repr);
435    Effect::new_async(move |_r| {
436      box_future(async move {
437        if n == 0 {
438          return Ok(Chunk::empty());
439        }
440        let first = match &*repr {
441          QueueRepr::Flume(f) => f.recv().await?,
442          QueueRepr::Sliding(s) => s.recv().await?,
443        };
444        let mut out = vec![first];
445        while out.len() < n {
446          match match &*repr {
447            QueueRepr::Flume(f) => f.try_recv(),
448            QueueRepr::Sliding(s) => s.try_recv(),
449          } {
450            Ok(None) => break,
451            Ok(Some(v)) => out.push(v),
452            Err(e) => return Err(e),
453          }
454        }
455        Ok(Chunk::from_vec(out))
456      })
457    })
458  }
459
460  /// Block until `n` separate receives complete (or disconnect).
461  pub fn take_n(&self, n: usize) -> Effect<Chunk<A>, QueueError, ()> {
462    let repr = Arc::clone(&self.repr);
463    Effect::new_async(move |_r| {
464      box_future(async move {
465        if n == 0 {
466          return Ok(Chunk::empty());
467        }
468        let mut out = Vec::with_capacity(n);
469        for _ in 0..n {
470          let v = match &*repr {
471            QueueRepr::Flume(f) => f.recv().await?,
472            QueueRepr::Sliding(s) => s.recv().await?,
473          };
474          out.push(v);
475        }
476        Ok(Chunk::from_vec(out))
477      })
478    })
479  }
480
481  /// Take between `min` and `max` elements inclusive (waits for `min` values unless disconnected).
482  pub fn take_between(&self, min: usize, max: usize) -> Effect<Chunk<A>, QueueError, ()> {
483    let repr = Arc::clone(&self.repr);
484    Effect::new_async(move |_r| {
485      box_future(async move {
486        if min > max {
487          return Ok(Chunk::empty());
488        }
489        if min == 0 && max == 0 {
490          return Ok(Chunk::empty());
491        }
492        let mut out = Vec::new();
493        for _ in 0..min {
494          let v = match &*repr {
495            QueueRepr::Flume(f) => f.recv().await?,
496            QueueRepr::Sliding(s) => s.recv().await?,
497          };
498          out.push(v);
499        }
500        while out.len() < max {
501          match match &*repr {
502            QueueRepr::Flume(f) => f.try_recv(),
503            QueueRepr::Sliding(s) => s.try_recv(),
504          } {
505            Ok(None) => break,
506            Ok(Some(v)) => out.push(v),
507            Err(e) => return Err(e),
508          }
509        }
510        Ok(Chunk::from_vec(out))
511      })
512    })
513  }
514
515  /// Non-blocking receive.
516  pub fn poll(&self) -> Effect<Option<A>, QueueError, ()> {
517    let repr = Arc::clone(&self.repr);
518    Effect::new_async(move |_r| {
519      box_future(async move {
520        tokio::task::yield_now().await;
521        match &*repr {
522          QueueRepr::Flume(f) => f.try_recv(),
523          QueueRepr::Sliding(s) => s.try_recv(),
524        }
525      })
526    })
527  }
528
529  /// Queued element count (approximate for flume).
530  pub fn size(&self) -> Effect<usize, (), ()> {
531    let repr = Arc::clone(&self.repr);
532    Effect::new_async(move |_r| {
533      box_future(async move {
534        match &*repr {
535          QueueRepr::Flume(f) => Ok(f.len()),
536          QueueRepr::Sliding(s) => Ok(s.len().await),
537        }
538      })
539    })
540  }
541
542  /// `true` when there are no queued elements.
543  pub fn is_empty(&self) -> Effect<bool, (), ()> {
544    let repr = Arc::clone(&self.repr);
545    Effect::new_async(move |_r| {
546      box_future(async move {
547        match &*repr {
548          QueueRepr::Flume(f) => Ok(f.is_empty()),
549          QueueRepr::Sliding(s) => Ok(s.is_empty().await),
550        }
551      })
552    })
553  }
554
555  /// `true` when at capacity (bounded / sliding) or shut down (flume side).
556  pub fn is_full(&self) -> Effect<bool, (), ()> {
557    let repr = Arc::clone(&self.repr);
558    Effect::new_async(move |_r| {
559      box_future(async move {
560        match &*repr {
561          QueueRepr::Flume(f) => Ok(f.is_full()),
562          QueueRepr::Sliding(s) => Ok(s.is_full().await),
563        }
564      })
565    })
566  }
567
568  /// Close the queue to producers; buffered values remain available to receivers.
569  pub fn shutdown(&self) -> Effect<(), (), ()> {
570    let repr = Arc::clone(&self.repr);
571    Effect::new_async(move |_r| {
572      box_future(async move {
573        match &*repr {
574          QueueRepr::Flume(f) => f.shutdown().await,
575          QueueRepr::Sliding(s) => s.shutdown().await,
576        }
577        Ok(())
578      })
579    })
580  }
581
582  /// `true` after [`Self::shutdown`] has completed.
583  pub fn is_shutdown(&self) -> Effect<bool, (), ()> {
584    let repr = Arc::clone(&self.repr);
585    Effect::new_async(move |_r| {
586      box_future(async move {
587        match &*repr {
588          QueueRepr::Flume(f) => Ok(f.is_shutdown()),
589          QueueRepr::Sliding(s) => Ok(s.is_shutdown()),
590        }
591      })
592    })
593  }
594
595  /// Completes after [`Self::shutdown`] has been observed.
596  pub fn await_shutdown(&self) -> Effect<(), (), ()> {
597    let repr = Arc::clone(&self.repr);
598    Effect::new_async(move |_r| {
599      box_future(async move {
600        match &*repr {
601          QueueRepr::Flume(f) => f.await_shutdown().await,
602          QueueRepr::Sliding(s) => s.await_shutdown().await,
603        }
604        Ok(())
605      })
606    })
607  }
608}
609
#[cfg(test)]
mod tests {
  use super::*;
  use crate::runtime::run_async;

  // Runs an Effect to completion on the current thread with the given env.
  fn drive<A: 'static, E: 'static, R: 'static>(eff: Effect<A, E, R>, env: R) -> Result<A, E> {
    pollster::block_on(run_async(eff, env))
  }

  #[test]
  fn queue_take_suspends_until_offer() {
    let q = drive(Queue::<u32>::bounded(4), ()).unwrap();
    let q2 = q.clone();
    // Offer from another thread after a delay; `take` must suspend until then.
    let h = std::thread::spawn(move || {
      std::thread::sleep(std::time::Duration::from_millis(20));
      drive(q2.offer(7u32), ()).unwrap();
    });
    let v = drive(q.take(), ()).unwrap();
    h.join().unwrap();
    assert_eq!(v, 7);
  }

  #[test]
  fn queue_bounded_offer_returns_false_when_full() {
    let q = drive(Queue::<u32>::bounded(1), ()).unwrap();
    assert!(drive(q.offer(1u32), ()).unwrap());
    assert!(!drive(q.offer(2u32), ()).unwrap());
    assert_eq!(drive(q.take(), ()).unwrap(), 1);
    assert!(drive(q.offer(3u32), ()).unwrap());
  }

  #[test]
  fn queue_dropping_drops_newest() {
    let q = drive(Queue::<u32>::dropping(1), ()).unwrap();
    assert!(drive(q.offer(1u32), ()).unwrap());
    // Incoming element is the one discarded; the buffered 1 survives.
    assert!(!drive(q.offer(2u32), ()).unwrap());
    assert_eq!(drive(q.size(), ()).unwrap(), 1);
    assert_eq!(drive(q.take(), ()).unwrap(), 1);
  }

  #[test]
  fn queue_sliding_drops_oldest() {
    let q = drive(Queue::<u32>::sliding(2), ()).unwrap();
    assert!(drive(q.offer(1u32), ()).unwrap());
    assert!(drive(q.offer(2u32), ()).unwrap());
    // Third offer evicts the oldest (1); buffer is now [2, 3].
    assert!(drive(q.offer(3u32), ()).unwrap());
    assert_eq!(drive(q.take(), ()).unwrap(), 2);
    assert_eq!(drive(q.take(), ()).unwrap(), 3);
  }

  #[test]
  fn queue_await_shutdown_returns_after_shutdown() {
    let q = drive(Queue::<u32>::unbounded(), ()).unwrap();
    let q2 = q.clone();
    let h = std::thread::spawn(move || {
      std::thread::sleep(std::time::Duration::from_millis(15));
      drive(q2.shutdown(), ()).unwrap();
    });
    drive(q.await_shutdown(), ()).unwrap();
    h.join().unwrap();
    assert!(drive(q.is_shutdown(), ()).unwrap());
  }

  #[test]
  fn queue_offer_all_retains_overflow_bounded() {
    let q = drive(Queue::<u32>::bounded(2), ()).unwrap();
    let left = drive(q.offer_all([1u32, 2, 3, 4]), ()).unwrap();
    // Overflow elements come back in original order.
    assert_eq!(left, vec![3, 4]);
    let chunk = drive(q.take_all(), ()).unwrap();
    assert_eq!(chunk.into_vec(), vec![1, 2]);
  }

  #[test]
  fn queue_take_up_to_and_take_n() {
    let q = drive(Queue::<u32>::bounded(10), ()).unwrap();
    drive(q.offer_all([1u32, 2, 3]), ()).unwrap();
    let c = drive(q.take_up_to(2), ()).unwrap();
    assert_eq!(c.into_vec(), vec![1, 2]);
    drive(q.offer_all([4u32, 5]), ()).unwrap();
    let c2 = drive(q.take_n(2), ()).unwrap();
    assert_eq!(c2.into_vec(), vec![3, 4]);
  }

  #[test]
  fn queue_take_between_min_max_and_edges() {
    let q = drive(Queue::<u32>::unbounded(), ()).unwrap();
    // Degenerate ranges yield empty chunks without blocking.
    assert_eq!(drive(q.take_between(2, 1), ()).unwrap().len(), 0);
    assert_eq!(drive(q.take_between(0, 0), ()).unwrap().len(), 0);
    drive(q.offer_all([10u32, 11, 12]), ()).unwrap();
    let c = drive(q.take_between(2, 3), ()).unwrap();
    assert_eq!(c.len(), 3);
  }

  #[test]
  fn queue_poll_and_is_empty_is_full() {
    let q = drive(Queue::<u32>::bounded(1), ()).unwrap();
    assert_eq!(drive(q.poll(), ()).unwrap(), None);
    assert!(drive(q.is_empty(), ()).unwrap());
    drive(q.offer(7u32), ()).unwrap();
    assert!(drive(q.is_full(), ()).unwrap());
    assert_eq!(drive(q.poll(), ()).unwrap(), Some(7));
  }

  #[test]
  fn queue_sliding_is_full_after_capacity() {
    let q = drive(Queue::<u32>::sliding(2), ()).unwrap();
    drive(q.offer_all([1u32, 2, 3]), ()).unwrap();
    assert!(drive(q.is_full(), ()).unwrap());
  }

  // ── QueueError formatting ─────────────────────────────────────────────────

  #[test]
  fn queue_error_display_and_debug() {
    let e = QueueError::Disconnected;
    assert_eq!(format!("{e}"), "queue is disconnected");
    assert!(format!("{e:?}").contains("Disconnected"));
    // std::error::Error is implemented (source returns None)
    use std::error::Error;
    assert!(e.source().is_none());
  }

  // ── take_n / take_up_to edge: n == 0 ────────────────────────────────────

  #[test]
  fn queue_take_n_zero_returns_empty_chunk() {
    let q = drive(Queue::<u32>::bounded(4), ()).unwrap();
    drive(q.offer(1u32), ()).unwrap();
    let c = drive(q.take_n(0), ()).unwrap();
    assert_eq!(c.len(), 0);
    // Item stays in queue
    assert_eq!(drive(q.size(), ()).unwrap(), 1);
  }

  #[test]
  fn queue_take_up_to_zero_returns_empty_chunk() {
    let q = drive(Queue::<u32>::bounded(4), ()).unwrap();
    drive(q.offer(1u32), ()).unwrap();
    let c = drive(q.take_up_to(0), ()).unwrap();
    assert_eq!(c.len(), 0);
    assert_eq!(drive(q.size(), ()).unwrap(), 1);
  }

  // ── disconnected / shutdown error paths ────────────────────────────────

  #[test]
  fn queue_take_returns_err_when_shut_down_empty() {
    let q = drive(Queue::<u32>::bounded(2), ()).unwrap();
    drive(q.shutdown(), ()).unwrap();
    assert_eq!(drive(q.take(), ()), Err(QueueError::Disconnected));
  }

  #[test]
  fn queue_take_all_returns_err_when_shut_down_empty() {
    let q = drive(Queue::<u32>::unbounded(), ()).unwrap();
    drive(q.shutdown(), ()).unwrap();
    assert_eq!(drive(q.take_all(), ()), Err(QueueError::Disconnected));
  }

  #[test]
  fn queue_take_up_to_returns_err_when_shut_down_empty() {
    let q = drive(Queue::<u32>::bounded(4), ()).unwrap();
    drive(q.shutdown(), ()).unwrap();
    assert_eq!(drive(q.take_up_to(3), ()), Err(QueueError::Disconnected));
  }

  #[test]
  fn queue_take_n_returns_err_when_shut_down_empty() {
    let q = drive(Queue::<u32>::bounded(4), ()).unwrap();
    drive(q.shutdown(), ()).unwrap();
    assert_eq!(drive(q.take_n(2), ()), Err(QueueError::Disconnected));
  }

  #[test]
  fn queue_poll_returns_err_when_shut_down_empty() {
    let q = drive(Queue::<u32>::bounded(4), ()).unwrap();
    drive(q.shutdown(), ()).unwrap();
    assert_eq!(drive(q.poll(), ()), Err(QueueError::Disconnected));
  }

  #[test]
  fn queue_take_between_returns_err_when_shut_down_empty() {
    let q = drive(Queue::<u32>::bounded(4), ()).unwrap();
    drive(q.shutdown(), ()).unwrap();
    assert_eq!(
      drive(q.take_between(1, 3), ()),
      Err(QueueError::Disconnected)
    );
  }

  // ── offer after shutdown ─────────────────────────────────────────────────

  #[test]
  fn queue_offer_after_shutdown_returns_false() {
    let q = drive(Queue::<u32>::bounded(4), ()).unwrap();
    drive(q.shutdown(), ()).unwrap();
    assert!(!drive(q.offer(99u32), ()).unwrap());
  }

  #[test]
  fn queue_offer_all_after_shutdown_silently_drops_items() {
    let q = drive(Queue::<u32>::bounded(4), ()).unwrap();
    drive(q.shutdown(), ()).unwrap();
    // offer_or_retain returns Ok(()) when sender is gone → items are silently dropped
    let retained = drive(q.offer_all([1u32, 2, 3]), ()).unwrap();
    assert!(
      retained.is_empty(),
      "items offered after shutdown are silently dropped, not retained"
    );
  }

  // ── size / is_full / is_empty on all variants ────────────────────────────

  #[test]
  fn queue_unbounded_is_never_full() {
    let q = drive(Queue::<u32>::unbounded(), ()).unwrap();
    for i in 0u32..100 {
      drive(q.offer(i), ()).unwrap();
    }
    assert!(!drive(q.is_full(), ()).unwrap());
    assert_eq!(drive(q.size(), ()).unwrap(), 100);
    assert!(!drive(q.is_empty(), ()).unwrap());
  }

  #[test]
  fn queue_dropping_size_and_fullness() {
    let q = drive(Queue::<u32>::dropping(3), ()).unwrap();
    assert!(drive(q.is_empty(), ()).unwrap());
    drive(q.offer_all([10u32, 20, 30, 40]), ()).unwrap();
    assert_eq!(drive(q.size(), ()).unwrap(), 3);
    assert!(drive(q.is_full(), ()).unwrap());
  }

  #[test]
  fn queue_sliding_size_and_fullness() {
    let q = drive(Queue::<u32>::sliding(3), ()).unwrap();
    assert!(drive(q.is_empty(), ()).unwrap());
    drive(q.offer_all([1u32, 2, 3, 4]), ()).unwrap();
    // Sliding evicts oldest: queue holds [2, 3, 4]
    assert_eq!(drive(q.size(), ()).unwrap(), 3);
    assert!(drive(q.is_full(), ()).unwrap());
    assert!(!drive(q.is_empty(), ()).unwrap());
  }

  // ── offer_all on sliding ─────────────────────────────────────────────────

  #[test]
  fn queue_offer_all_on_sliding_always_accepts() {
    let q = drive(Queue::<u32>::sliding(2), ()).unwrap();
    // offer_all on sliding returns empty vec (never retains)
    let retained = drive(q.offer_all([1u32, 2, 3, 4]), ()).unwrap();
    assert!(retained.is_empty(), "sliding should not retain items");
    // Latest 2 items remain
    let c = drive(q.take_all(), ()).unwrap();
    assert_eq!(c.into_vec(), vec![3, 4]);
  }

  // ── is_shutdown state ────────────────────────────────────────────────────

  #[test]
  fn queue_is_shutdown_before_and_after() {
    let q = drive(Queue::<u32>::bounded(2), ()).unwrap();
    assert!(!drive(q.is_shutdown(), ()).unwrap());
    drive(q.shutdown(), ()).unwrap();
    assert!(drive(q.is_shutdown(), ()).unwrap());
  }

  // ── take drains buffered values before disconnecting ────────────────────

  #[test]
  fn queue_take_drains_buffer_then_errors_after_shutdown() {
    let q = drive(Queue::<u32>::bounded(4), ()).unwrap();
    drive(q.offer(42u32), ()).unwrap();
    drive(q.shutdown(), ()).unwrap();
    // Buffered item is still readable via `take`
    assert_eq!(drive(q.take(), ()).unwrap(), 42);
    // After draining, disconnected
    assert_eq!(drive(q.take(), ()), Err(QueueError::Disconnected));
  }
}
889}