1use std::collections::VecDeque;
7use std::fmt;
8use std::sync::Arc;
9
10use tokio::sync::watch;
11use tokio::sync::{Mutex, Notify};
12
13use crate::Chunk;
14use crate::kernel::{Effect, box_future, succeed};
15
16#[derive(Debug, Clone, PartialEq, Eq)]
18pub enum QueueError {
19 Disconnected,
21}
22
23impl fmt::Display for QueueError {
24 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
25 match self {
26 QueueError::Disconnected => write!(f, "queue is disconnected"),
27 }
28 }
29}
30
31impl std::error::Error for QueueError {}
32
33#[derive(Clone, Copy, Debug, PartialEq, Eq)]
34enum FlumeKind {
35 Bounded,
36 Unbounded,
37 Dropping,
38}
39
40struct FlumeShared<A: Send> {
41 tx: Mutex<Option<flume::Sender<A>>>,
42 rx: Mutex<Option<flume::Receiver<A>>>,
43 kind: FlumeKind,
44 shutdown: watch::Sender<bool>,
45}
46
47impl<A: Send + 'static> FlumeShared<A> {
48 fn new(
49 kind: FlumeKind,
50 tx: flume::Sender<A>,
51 rx: flume::Receiver<A>,
52 shutdown: watch::Sender<bool>,
53 ) -> Self {
54 Self {
55 tx: Mutex::new(Some(tx)),
56 rx: Mutex::new(Some(rx)),
57 kind,
58 shutdown,
59 }
60 }
61
62 async fn offer(&self, value: A) -> bool {
63 let mut guard = self.tx.lock().await;
64 let Some(tx) = guard.as_mut() else {
65 return false;
66 };
67 match self.kind {
68 FlumeKind::Bounded => match tx.try_send(value) {
69 Ok(()) => true,
70 Err(flume::TrySendError::Full(_)) => false,
71 Err(flume::TrySendError::Disconnected(_)) => false,
72 },
73 FlumeKind::Dropping => match tx.try_send(value) {
74 Ok(()) => true,
75 Err(flume::TrySendError::Full(v)) => {
76 drop(v);
77 false
78 }
79 Err(flume::TrySendError::Disconnected(_)) => false,
80 },
81 FlumeKind::Unbounded => match tx.try_send(value) {
82 Ok(()) => true,
83 Err(flume::TrySendError::Full(v)) => {
84 drop(v);
85 false
86 }
87 Err(flume::TrySendError::Disconnected(_)) => false,
88 },
89 }
90 }
91
92 async fn recv(&self) -> Result<A, QueueError> {
93 let rx = {
94 let guard = self.rx.lock().await;
95 guard.as_ref().map(flume::Receiver::clone)
96 };
97 let Some(rx) = rx else {
98 return Err(QueueError::Disconnected);
99 };
100 match rx.recv_async().await {
101 Ok(v) => Ok(v),
102 Err(_) => Err(QueueError::Disconnected),
103 }
104 }
105
106 fn try_recv(&self) -> Result<Option<A>, QueueError> {
107 let guard = self.rx.blocking_lock();
108 let Some(rx) = guard.as_ref() else {
109 return Err(QueueError::Disconnected);
110 };
111 match rx.try_recv() {
112 Ok(v) => Ok(Some(v)),
113 Err(flume::TryRecvError::Empty) => Ok(None),
114 Err(flume::TryRecvError::Disconnected) => Err(QueueError::Disconnected),
115 }
116 }
117
118 async fn offer_or_retain(&self, value: A) -> Result<(), A> {
121 let mut guard = self.tx.lock().await;
122 let Some(tx) = guard.as_mut() else {
123 return Ok(());
124 };
125 match self.kind {
126 FlumeKind::Bounded => match tx.try_send(value) {
127 Ok(()) => Ok(()),
128 Err(flume::TrySendError::Full(v)) => Err(v),
129 Err(flume::TrySendError::Disconnected(v)) => {
130 drop(v);
131 Ok(())
132 }
133 },
134 FlumeKind::Unbounded | FlumeKind::Dropping => match tx.try_send(value) {
135 Ok(()) => Ok(()),
136 Err(flume::TrySendError::Full(v)) => {
137 drop(v);
138 Ok(())
139 }
140 Err(flume::TrySendError::Disconnected(v)) => {
141 drop(v);
142 Ok(())
143 }
144 },
145 }
146 }
147
148 fn len(&self) -> usize {
149 let guard = self.rx.blocking_lock();
150 guard.as_ref().map(flume::Receiver::len).unwrap_or(0)
151 }
152
153 fn is_empty(&self) -> bool {
154 self.len() == 0
155 }
156
157 fn is_full(&self) -> bool {
158 let tx_guard = self.tx.blocking_lock();
159 let Some(tx) = tx_guard.as_ref() else {
160 return true;
161 };
162 tx.is_full()
163 }
164
165 async fn shutdown(&self) {
166 let mut guard = self.tx.lock().await;
167 guard.take();
168 self.shutdown.send_replace(true);
170 }
171
172 fn is_shutdown(&self) -> bool {
173 *self.shutdown.borrow()
174 }
175
176 async fn await_shutdown(&self) {
177 if *self.shutdown.borrow() {
178 return;
179 }
180 let mut rx = self.shutdown.subscribe();
181 let _ = rx.changed().await;
182 }
183}
184
185struct SlidingState<A> {
186 deque: VecDeque<A>,
187 open: bool,
188}
189
190struct SlidingShared<A: Send> {
191 state: Mutex<SlidingState<A>>,
192 capacity: usize,
193 not_empty: Notify,
194 shutdown: watch::Sender<bool>,
195}
196
197impl<A: Send + 'static> SlidingShared<A> {
198 fn new(capacity: usize) -> Self {
199 Self {
200 state: Mutex::new(SlidingState {
201 deque: VecDeque::new(),
202 open: true,
203 }),
204 capacity: capacity.max(1),
205 not_empty: Notify::new(),
206 shutdown: watch::channel(false).0,
207 }
208 }
209
210 async fn offer(&self, value: A) -> bool {
211 self.offer_or_retain(value).await.is_ok()
212 }
213
214 async fn offer_or_retain(&self, value: A) -> Result<(), A> {
217 let mut g = self.state.lock().await;
218 if !g.open {
219 return Err(value);
220 }
221 g.deque.push_back(value);
222 while g.deque.len() > self.capacity {
223 g.deque.pop_front();
224 }
225 drop(g);
226 self.not_empty.notify_waiters();
227 Ok(())
228 }
229
230 async fn recv(&self) -> Result<A, QueueError> {
231 loop {
232 let maybe = {
233 let mut g = self.state.lock().await;
234 if let Some(v) = g.deque.pop_front() {
235 Some(v)
236 } else if !g.open {
237 return Err(QueueError::Disconnected);
238 } else {
239 None
240 }
241 };
242 if let Some(v) = maybe {
243 return Ok(v);
244 }
245 self.not_empty.notified().await;
246 }
247 }
248
249 fn try_recv(&self) -> Result<Option<A>, QueueError> {
250 let mut g = self.state.blocking_lock();
251 if let Some(v) = g.deque.pop_front() {
252 return Ok(Some(v));
253 }
254 if !g.open {
255 return Err(QueueError::Disconnected);
256 }
257 Ok(None)
258 }
259
260 async fn len(&self) -> usize {
261 self.state.lock().await.deque.len()
262 }
263
264 async fn is_empty(&self) -> bool {
265 self.state.lock().await.deque.is_empty()
266 }
267
268 async fn is_full(&self) -> bool {
269 self.state.lock().await.deque.len() >= self.capacity
270 }
271
272 async fn shutdown(&self) {
273 let mut g = self.state.lock().await;
274 g.open = false;
275 drop(g);
276 self.shutdown.send_replace(true);
278 self.not_empty.notify_waiters();
279 }
280
281 fn is_shutdown(&self) -> bool {
282 *self.shutdown.borrow()
283 }
284
285 async fn await_shutdown(&self) {
286 if *self.shutdown.borrow() {
287 return;
288 }
289 let mut rx = self.shutdown.subscribe();
290 let _ = rx.changed().await;
291 }
292}
293
294enum QueueRepr<A: Send + 'static> {
295 Flume(Arc<FlumeShared<A>>),
296 Sliding(Arc<SlidingShared<A>>),
297}
298
299pub struct Queue<A: Send + 'static> {
301 repr: Arc<QueueRepr<A>>,
302}
303
304impl<A: Send + 'static> Clone for Queue<A> {
305 fn clone(&self) -> Self {
306 Self {
307 repr: Arc::clone(&self.repr),
308 }
309 }
310}
311
312impl<A: Send + 'static> Queue<A> {
313 fn from_flume(kind: FlumeKind, tx: flume::Sender<A>, rx: flume::Receiver<A>) -> Self {
314 let shutdown = watch::channel(false).0;
315 Self {
316 repr: Arc::new(QueueRepr::Flume(Arc::new(FlumeShared::new(
317 kind, tx, rx, shutdown,
318 )))),
319 }
320 }
321
322 fn from_sliding(inner: Arc<SlidingShared<A>>) -> Self {
323 Self {
324 repr: Arc::new(QueueRepr::Sliding(inner)),
325 }
326 }
327
328 pub fn bounded(capacity: usize) -> Effect<Queue<A>, (), ()> {
330 let cap = capacity.max(1);
331 let (tx, rx) = flume::bounded(cap);
332 succeed(Self::from_flume(FlumeKind::Bounded, tx, rx))
333 }
334
335 pub fn unbounded() -> Effect<Queue<A>, (), ()> {
337 let (tx, rx) = flume::unbounded();
338 succeed(Self::from_flume(FlumeKind::Unbounded, tx, rx))
339 }
340
341 pub fn dropping(capacity: usize) -> Effect<Queue<A>, (), ()> {
343 let cap = capacity.max(1);
344 let (tx, rx) = flume::bounded(cap);
345 succeed(Self::from_flume(FlumeKind::Dropping, tx, rx))
346 }
347
348 pub fn sliding(capacity: usize) -> Effect<Queue<A>, (), ()> {
350 let inner = Arc::new(SlidingShared::new(capacity));
351 succeed(Self::from_sliding(inner))
352 }
353
354 pub fn offer(&self, value: A) -> Effect<bool, (), ()> {
356 let repr = Arc::clone(&self.repr);
357 Effect::new_async(move |_r| {
358 box_future(async move {
359 match &*repr {
360 QueueRepr::Flume(f) => Ok(f.offer(value).await),
361 QueueRepr::Sliding(s) => Ok(s.offer(value).await),
362 }
363 })
364 })
365 }
366
367 pub fn offer_all<I>(&self, iter: I) -> Effect<Vec<A>, (), ()>
369 where
370 I: IntoIterator<Item = A> + 'static,
371 I::IntoIter: Send + 'static,
372 {
373 let repr = Arc::clone(&self.repr);
374 Effect::new_async(move |_r| {
375 box_future(async move {
376 let mut left = Vec::new();
377 for v in iter {
378 match &*repr {
379 QueueRepr::Flume(f) => match f.offer_or_retain(v).await {
380 Ok(()) => {}
381 Err(v) => left.push(v),
382 },
383 QueueRepr::Sliding(s) => match s.offer_or_retain(v).await {
384 Ok(()) => {}
385 Err(v) => left.push(v),
386 },
387 }
388 }
389 Ok(left)
390 })
391 })
392 }
393
394 pub fn take(&self) -> Effect<A, QueueError, ()> {
396 let repr = Arc::clone(&self.repr);
397 Effect::new_async(move |_r| {
398 box_future(async move {
399 match &*repr {
400 QueueRepr::Flume(f) => f.recv().await,
401 QueueRepr::Sliding(s) => s.recv().await,
402 }
403 })
404 })
405 }
406
407 pub fn take_all(&self) -> Effect<Chunk<A>, QueueError, ()> {
409 let repr = Arc::clone(&self.repr);
410 Effect::new_async(move |_r| {
411 box_future(async move {
412 let first = match &*repr {
413 QueueRepr::Flume(f) => f.recv().await?,
414 QueueRepr::Sliding(s) => s.recv().await?,
415 };
416 let mut out = vec![first];
417 loop {
418 match match &*repr {
419 QueueRepr::Flume(f) => f.try_recv(),
420 QueueRepr::Sliding(s) => s.try_recv(),
421 } {
422 Ok(None) => break,
423 Ok(Some(v)) => out.push(v),
424 Err(e) => return Err(e),
425 }
426 }
427 Ok(Chunk::from_vec(out))
428 })
429 })
430 }
431
432 pub fn take_up_to(&self, n: usize) -> Effect<Chunk<A>, QueueError, ()> {
434 let repr = Arc::clone(&self.repr);
435 Effect::new_async(move |_r| {
436 box_future(async move {
437 if n == 0 {
438 return Ok(Chunk::empty());
439 }
440 let first = match &*repr {
441 QueueRepr::Flume(f) => f.recv().await?,
442 QueueRepr::Sliding(s) => s.recv().await?,
443 };
444 let mut out = vec![first];
445 while out.len() < n {
446 match match &*repr {
447 QueueRepr::Flume(f) => f.try_recv(),
448 QueueRepr::Sliding(s) => s.try_recv(),
449 } {
450 Ok(None) => break,
451 Ok(Some(v)) => out.push(v),
452 Err(e) => return Err(e),
453 }
454 }
455 Ok(Chunk::from_vec(out))
456 })
457 })
458 }
459
460 pub fn take_n(&self, n: usize) -> Effect<Chunk<A>, QueueError, ()> {
462 let repr = Arc::clone(&self.repr);
463 Effect::new_async(move |_r| {
464 box_future(async move {
465 if n == 0 {
466 return Ok(Chunk::empty());
467 }
468 let mut out = Vec::with_capacity(n);
469 for _ in 0..n {
470 let v = match &*repr {
471 QueueRepr::Flume(f) => f.recv().await?,
472 QueueRepr::Sliding(s) => s.recv().await?,
473 };
474 out.push(v);
475 }
476 Ok(Chunk::from_vec(out))
477 })
478 })
479 }
480
481 pub fn take_between(&self, min: usize, max: usize) -> Effect<Chunk<A>, QueueError, ()> {
483 let repr = Arc::clone(&self.repr);
484 Effect::new_async(move |_r| {
485 box_future(async move {
486 if min > max {
487 return Ok(Chunk::empty());
488 }
489 if min == 0 && max == 0 {
490 return Ok(Chunk::empty());
491 }
492 let mut out = Vec::new();
493 for _ in 0..min {
494 let v = match &*repr {
495 QueueRepr::Flume(f) => f.recv().await?,
496 QueueRepr::Sliding(s) => s.recv().await?,
497 };
498 out.push(v);
499 }
500 while out.len() < max {
501 match match &*repr {
502 QueueRepr::Flume(f) => f.try_recv(),
503 QueueRepr::Sliding(s) => s.try_recv(),
504 } {
505 Ok(None) => break,
506 Ok(Some(v)) => out.push(v),
507 Err(e) => return Err(e),
508 }
509 }
510 Ok(Chunk::from_vec(out))
511 })
512 })
513 }
514
515 pub fn poll(&self) -> Effect<Option<A>, QueueError, ()> {
517 let repr = Arc::clone(&self.repr);
518 Effect::new_async(move |_r| {
519 box_future(async move {
520 tokio::task::yield_now().await;
521 match &*repr {
522 QueueRepr::Flume(f) => f.try_recv(),
523 QueueRepr::Sliding(s) => s.try_recv(),
524 }
525 })
526 })
527 }
528
529 pub fn size(&self) -> Effect<usize, (), ()> {
531 let repr = Arc::clone(&self.repr);
532 Effect::new_async(move |_r| {
533 box_future(async move {
534 match &*repr {
535 QueueRepr::Flume(f) => Ok(f.len()),
536 QueueRepr::Sliding(s) => Ok(s.len().await),
537 }
538 })
539 })
540 }
541
542 pub fn is_empty(&self) -> Effect<bool, (), ()> {
544 let repr = Arc::clone(&self.repr);
545 Effect::new_async(move |_r| {
546 box_future(async move {
547 match &*repr {
548 QueueRepr::Flume(f) => Ok(f.is_empty()),
549 QueueRepr::Sliding(s) => Ok(s.is_empty().await),
550 }
551 })
552 })
553 }
554
555 pub fn is_full(&self) -> Effect<bool, (), ()> {
557 let repr = Arc::clone(&self.repr);
558 Effect::new_async(move |_r| {
559 box_future(async move {
560 match &*repr {
561 QueueRepr::Flume(f) => Ok(f.is_full()),
562 QueueRepr::Sliding(s) => Ok(s.is_full().await),
563 }
564 })
565 })
566 }
567
568 pub fn shutdown(&self) -> Effect<(), (), ()> {
570 let repr = Arc::clone(&self.repr);
571 Effect::new_async(move |_r| {
572 box_future(async move {
573 match &*repr {
574 QueueRepr::Flume(f) => f.shutdown().await,
575 QueueRepr::Sliding(s) => s.shutdown().await,
576 }
577 Ok(())
578 })
579 })
580 }
581
582 pub fn is_shutdown(&self) -> Effect<bool, (), ()> {
584 let repr = Arc::clone(&self.repr);
585 Effect::new_async(move |_r| {
586 box_future(async move {
587 match &*repr {
588 QueueRepr::Flume(f) => Ok(f.is_shutdown()),
589 QueueRepr::Sliding(s) => Ok(s.is_shutdown()),
590 }
591 })
592 })
593 }
594
595 pub fn await_shutdown(&self) -> Effect<(), (), ()> {
597 let repr = Arc::clone(&self.repr);
598 Effect::new_async(move |_r| {
599 box_future(async move {
600 match &*repr {
601 QueueRepr::Flume(f) => f.await_shutdown().await,
602 QueueRepr::Sliding(s) => s.await_shutdown().await,
603 }
604 Ok(())
605 })
606 })
607 }
608}
609
610#[cfg(test)]
611mod tests {
612 use super::*;
613 use crate::runtime::run_async;
614
615 fn drive<A: 'static, E: 'static, R: 'static>(eff: Effect<A, E, R>, env: R) -> Result<A, E> {
616 pollster::block_on(run_async(eff, env))
617 }
618
619 #[test]
620 fn queue_take_suspends_until_offer() {
621 let q = drive(Queue::<u32>::bounded(4), ()).unwrap();
622 let q2 = q.clone();
623 let h = std::thread::spawn(move || {
624 std::thread::sleep(std::time::Duration::from_millis(20));
625 drive(q2.offer(7u32), ()).unwrap();
626 });
627 let v = drive(q.take(), ()).unwrap();
628 h.join().unwrap();
629 assert_eq!(v, 7);
630 }
631
632 #[test]
633 fn queue_bounded_offer_returns_false_when_full() {
634 let q = drive(Queue::<u32>::bounded(1), ()).unwrap();
635 assert!(drive(q.offer(1u32), ()).unwrap());
636 assert!(!drive(q.offer(2u32), ()).unwrap());
637 assert_eq!(drive(q.take(), ()).unwrap(), 1);
638 assert!(drive(q.offer(3u32), ()).unwrap());
639 }
640
641 #[test]
642 fn queue_dropping_drops_newest() {
643 let q = drive(Queue::<u32>::dropping(1), ()).unwrap();
644 assert!(drive(q.offer(1u32), ()).unwrap());
645 assert!(!drive(q.offer(2u32), ()).unwrap());
646 assert_eq!(drive(q.size(), ()).unwrap(), 1);
647 assert_eq!(drive(q.take(), ()).unwrap(), 1);
648 }
649
650 #[test]
651 fn queue_sliding_drops_oldest() {
652 let q = drive(Queue::<u32>::sliding(2), ()).unwrap();
653 assert!(drive(q.offer(1u32), ()).unwrap());
654 assert!(drive(q.offer(2u32), ()).unwrap());
655 assert!(drive(q.offer(3u32), ()).unwrap());
656 assert_eq!(drive(q.take(), ()).unwrap(), 2);
657 assert_eq!(drive(q.take(), ()).unwrap(), 3);
658 }
659
660 #[test]
661 fn queue_await_shutdown_returns_after_shutdown() {
662 let q = drive(Queue::<u32>::unbounded(), ()).unwrap();
663 let q2 = q.clone();
664 let h = std::thread::spawn(move || {
665 std::thread::sleep(std::time::Duration::from_millis(15));
666 drive(q2.shutdown(), ()).unwrap();
667 });
668 drive(q.await_shutdown(), ()).unwrap();
669 h.join().unwrap();
670 assert!(drive(q.is_shutdown(), ()).unwrap());
671 }
672
673 #[test]
674 fn queue_offer_all_retains_overflow_bounded() {
675 let q = drive(Queue::<u32>::bounded(2), ()).unwrap();
676 let left = drive(q.offer_all([1u32, 2, 3, 4]), ()).unwrap();
677 assert_eq!(left, vec![3, 4]);
678 let chunk = drive(q.take_all(), ()).unwrap();
679 assert_eq!(chunk.into_vec(), vec![1, 2]);
680 }
681
682 #[test]
683 fn queue_take_up_to_and_take_n() {
684 let q = drive(Queue::<u32>::bounded(10), ()).unwrap();
685 drive(q.offer_all([1u32, 2, 3]), ()).unwrap();
686 let c = drive(q.take_up_to(2), ()).unwrap();
687 assert_eq!(c.into_vec(), vec![1, 2]);
688 drive(q.offer_all([4u32, 5]), ()).unwrap();
689 let c2 = drive(q.take_n(2), ()).unwrap();
690 assert_eq!(c2.into_vec(), vec![3, 4]);
691 }
692
693 #[test]
694 fn queue_take_between_min_max_and_edges() {
695 let q = drive(Queue::<u32>::unbounded(), ()).unwrap();
696 assert_eq!(drive(q.take_between(2, 1), ()).unwrap().len(), 0);
697 assert_eq!(drive(q.take_between(0, 0), ()).unwrap().len(), 0);
698 drive(q.offer_all([10u32, 11, 12]), ()).unwrap();
699 let c = drive(q.take_between(2, 3), ()).unwrap();
700 assert_eq!(c.len(), 3);
701 }
702
703 #[test]
704 fn queue_poll_and_is_empty_is_full() {
705 let q = drive(Queue::<u32>::bounded(1), ()).unwrap();
706 assert_eq!(drive(q.poll(), ()).unwrap(), None);
707 assert!(drive(q.is_empty(), ()).unwrap());
708 drive(q.offer(7u32), ()).unwrap();
709 assert!(drive(q.is_full(), ()).unwrap());
710 assert_eq!(drive(q.poll(), ()).unwrap(), Some(7));
711 }
712
713 #[test]
714 fn queue_sliding_is_full_after_capacity() {
715 let q = drive(Queue::<u32>::sliding(2), ()).unwrap();
716 drive(q.offer_all([1u32, 2, 3]), ()).unwrap();
717 assert!(drive(q.is_full(), ()).unwrap());
718 }
719
720 #[test]
723 fn queue_error_display_and_debug() {
724 let e = QueueError::Disconnected;
725 assert_eq!(format!("{e}"), "queue is disconnected");
726 assert!(format!("{e:?}").contains("Disconnected"));
727 use std::error::Error;
729 assert!(e.source().is_none());
730 }
731
732 #[test]
735 fn queue_take_n_zero_returns_empty_chunk() {
736 let q = drive(Queue::<u32>::bounded(4), ()).unwrap();
737 drive(q.offer(1u32), ()).unwrap();
738 let c = drive(q.take_n(0), ()).unwrap();
739 assert_eq!(c.len(), 0);
740 assert_eq!(drive(q.size(), ()).unwrap(), 1);
742 }
743
744 #[test]
745 fn queue_take_up_to_zero_returns_empty_chunk() {
746 let q = drive(Queue::<u32>::bounded(4), ()).unwrap();
747 drive(q.offer(1u32), ()).unwrap();
748 let c = drive(q.take_up_to(0), ()).unwrap();
749 assert_eq!(c.len(), 0);
750 assert_eq!(drive(q.size(), ()).unwrap(), 1);
751 }
752
753 #[test]
756 fn queue_take_returns_err_when_shut_down_empty() {
757 let q = drive(Queue::<u32>::bounded(2), ()).unwrap();
758 drive(q.shutdown(), ()).unwrap();
759 assert_eq!(drive(q.take(), ()), Err(QueueError::Disconnected));
760 }
761
762 #[test]
763 fn queue_take_all_returns_err_when_shut_down_empty() {
764 let q = drive(Queue::<u32>::unbounded(), ()).unwrap();
765 drive(q.shutdown(), ()).unwrap();
766 assert_eq!(drive(q.take_all(), ()), Err(QueueError::Disconnected));
767 }
768
769 #[test]
770 fn queue_take_up_to_returns_err_when_shut_down_empty() {
771 let q = drive(Queue::<u32>::bounded(4), ()).unwrap();
772 drive(q.shutdown(), ()).unwrap();
773 assert_eq!(drive(q.take_up_to(3), ()), Err(QueueError::Disconnected));
774 }
775
776 #[test]
777 fn queue_take_n_returns_err_when_shut_down_empty() {
778 let q = drive(Queue::<u32>::bounded(4), ()).unwrap();
779 drive(q.shutdown(), ()).unwrap();
780 assert_eq!(drive(q.take_n(2), ()), Err(QueueError::Disconnected));
781 }
782
783 #[test]
784 fn queue_poll_returns_err_when_shut_down_empty() {
785 let q = drive(Queue::<u32>::bounded(4), ()).unwrap();
786 drive(q.shutdown(), ()).unwrap();
787 assert_eq!(drive(q.poll(), ()), Err(QueueError::Disconnected));
788 }
789
790 #[test]
791 fn queue_take_between_returns_err_when_shut_down_empty() {
792 let q = drive(Queue::<u32>::bounded(4), ()).unwrap();
793 drive(q.shutdown(), ()).unwrap();
794 assert_eq!(
795 drive(q.take_between(1, 3), ()),
796 Err(QueueError::Disconnected)
797 );
798 }
799
800 #[test]
803 fn queue_offer_after_shutdown_returns_false() {
804 let q = drive(Queue::<u32>::bounded(4), ()).unwrap();
805 drive(q.shutdown(), ()).unwrap();
806 assert!(!drive(q.offer(99u32), ()).unwrap());
807 }
808
809 #[test]
810 fn queue_offer_all_after_shutdown_silently_drops_items() {
811 let q = drive(Queue::<u32>::bounded(4), ()).unwrap();
812 drive(q.shutdown(), ()).unwrap();
813 let retained = drive(q.offer_all([1u32, 2, 3]), ()).unwrap();
815 assert!(
816 retained.is_empty(),
817 "items offered after shutdown are silently dropped, not retained"
818 );
819 }
820
821 #[test]
824 fn queue_unbounded_is_never_full() {
825 let q = drive(Queue::<u32>::unbounded(), ()).unwrap();
826 for i in 0u32..100 {
827 drive(q.offer(i), ()).unwrap();
828 }
829 assert!(!drive(q.is_full(), ()).unwrap());
830 assert_eq!(drive(q.size(), ()).unwrap(), 100);
831 assert!(!drive(q.is_empty(), ()).unwrap());
832 }
833
834 #[test]
835 fn queue_dropping_size_and_fullness() {
836 let q = drive(Queue::<u32>::dropping(3), ()).unwrap();
837 assert!(drive(q.is_empty(), ()).unwrap());
838 drive(q.offer_all([10u32, 20, 30, 40]), ()).unwrap();
839 assert_eq!(drive(q.size(), ()).unwrap(), 3);
840 assert!(drive(q.is_full(), ()).unwrap());
841 }
842
843 #[test]
844 fn queue_sliding_size_and_fullness() {
845 let q = drive(Queue::<u32>::sliding(3), ()).unwrap();
846 assert!(drive(q.is_empty(), ()).unwrap());
847 drive(q.offer_all([1u32, 2, 3, 4]), ()).unwrap();
848 assert_eq!(drive(q.size(), ()).unwrap(), 3);
850 assert!(drive(q.is_full(), ()).unwrap());
851 assert!(!drive(q.is_empty(), ()).unwrap());
852 }
853
854 #[test]
857 fn queue_offer_all_on_sliding_always_accepts() {
858 let q = drive(Queue::<u32>::sliding(2), ()).unwrap();
859 let retained = drive(q.offer_all([1u32, 2, 3, 4]), ()).unwrap();
861 assert!(retained.is_empty(), "sliding should not retain items");
862 let c = drive(q.take_all(), ()).unwrap();
864 assert_eq!(c.into_vec(), vec![3, 4]);
865 }
866
867 #[test]
870 fn queue_is_shutdown_before_and_after() {
871 let q = drive(Queue::<u32>::bounded(2), ()).unwrap();
872 assert!(!drive(q.is_shutdown(), ()).unwrap());
873 drive(q.shutdown(), ()).unwrap();
874 assert!(drive(q.is_shutdown(), ()).unwrap());
875 }
876
877 #[test]
880 fn queue_take_drains_buffer_then_errors_after_shutdown() {
881 let q = drive(Queue::<u32>::bounded(4), ()).unwrap();
882 drive(q.offer(42u32), ()).unwrap();
883 drive(q.shutdown(), ()).unwrap();
884 assert_eq!(drive(q.take(), ()).unwrap(), 42);
886 assert_eq!(drive(q.take(), ()), Err(QueueError::Disconnected));
888 }
889}