1use std::collections::VecDeque;
7use std::sync::atomic::{AtomicBool, AtomicU8, Ordering, fence};
8use std::sync::{Arc, Condvar, Mutex, OnceLock};
9use std::time::Duration;
10
11use crossbeam_queue::ArrayQueue;
12
13use crate::stream::{BoxStream, NotUsed, OverflowStrategy, Sink, Source, StreamCompletion};
14use crate::{StreamError, StreamResult};
15use futures::channel::oneshot;
16
17#[derive(Debug, Clone, PartialEq, Eq)]
19pub enum QueueOfferResult {
20 Enqueued,
22 Dropped,
24 QueueClosed,
26 Failure(StreamError),
28}
29
30#[derive(Clone)]
31enum TerminalSignal {
32 Complete,
33 Error(StreamError),
34}
35
36const TERM_NONE: u8 = 0;
49const TERM_COMPLETE: u8 = 1;
50const TERM_ERROR: u8 = 2;
51
52const PARK_BACKSTOP: Duration = Duration::from_millis(10);
57
58struct BoundedQueueShared<T> {
59 buffer: ArrayQueue<T>,
60 terminal: AtomicU8,
62 error: OnceLock<StreamError>,
64 park: Mutex<()>,
66 available: Condvar,
67 parked: AtomicBool,
69}
70
71impl<T> BoundedQueueShared<T> {
72 fn new(capacity: usize) -> Arc<Self> {
73 Arc::new(Self {
74 buffer: ArrayQueue::new(capacity),
75 terminal: AtomicU8::new(TERM_NONE),
76 error: OnceLock::new(),
77 park: Mutex::new(()),
78 available: Condvar::new(),
79 parked: AtomicBool::new(false),
80 })
81 }
82
83 fn set_complete(&self) {
85 let _ = self.terminal.compare_exchange(
86 TERM_NONE,
87 TERM_COMPLETE,
88 Ordering::AcqRel,
89 Ordering::Relaxed,
90 );
91 self.wake_consumer();
92 }
93
94 fn set_error(&self, error: StreamError) {
96 let _ = self.error.set(error);
99 let _ = self.terminal.compare_exchange(
100 TERM_NONE,
101 TERM_ERROR,
102 Ordering::AcqRel,
103 Ordering::Relaxed,
104 );
105 self.wake_consumer();
106 }
107
108 fn wake_consumer(&self) {
112 fence(Ordering::SeqCst);
117 if self.parked.load(Ordering::Relaxed) {
118 let _guard = self
119 .park
120 .lock()
121 .unwrap_or_else(|poison| poison.into_inner());
122 self.available.notify_one();
123 }
124 }
125}
126
127#[derive(Clone)]
131pub struct BoundedSourceQueue<T> {
132 shared: Arc<BoundedQueueShared<T>>,
133}
134
135impl<T> BoundedSourceQueue<T> {
136 pub fn offer(&self, elem: T) -> QueueOfferResult {
140 if self.shared.terminal.load(Ordering::Acquire) != TERM_NONE {
141 return QueueOfferResult::QueueClosed;
142 }
143 match self.shared.buffer.push(elem) {
144 Ok(()) => {
145 self.shared.wake_consumer();
146 QueueOfferResult::Enqueued
147 }
148 Err(_full) => QueueOfferResult::Dropped,
149 }
150 }
151
152 pub fn complete(&self) {
154 self.shared.set_complete();
155 }
156
157 pub fn fail(&self, error: StreamError) {
159 self.shared.set_error(error);
160 }
161}
162
163impl<T> Drop for BoundedSourceQueue<T> {
164 fn drop(&mut self) {
165 if Arc::strong_count(&self.shared) != 2 {
168 return;
169 }
170 self.shared.set_complete();
171 }
172}
173
174struct BoundedQueueStream<T> {
175 shared: Arc<BoundedQueueShared<T>>,
176}
177
178impl<T> Iterator for BoundedQueueStream<T> {
179 type Item = StreamResult<T>;
180
181 fn next(&mut self) -> Option<Self::Item> {
182 let shared = &*self.shared;
183 loop {
184 if let Some(item) = shared.buffer.pop() {
186 return Some(Ok(item));
187 }
188 match shared.terminal.load(Ordering::Acquire) {
189 TERM_COMPLETE => {
190 if let Some(item) = shared.buffer.pop() {
195 return Some(Ok(item));
196 }
197 return None;
198 }
199 TERM_ERROR => {
200 if let Some(item) = shared.buffer.pop() {
201 return Some(Ok(item));
202 }
203 let error = shared
204 .error
205 .get()
206 .cloned()
207 .unwrap_or_else(|| StreamError::Failed("queue failed".into()));
208 return Some(Err(error));
209 }
210 _ => {}
211 }
212
213 let guard = shared
215 .park
216 .lock()
217 .unwrap_or_else(|poison| poison.into_inner());
218 shared.parked.store(true, Ordering::Relaxed);
219 fence(Ordering::SeqCst);
223 if !shared.buffer.is_empty() || shared.terminal.load(Ordering::Acquire) != TERM_NONE {
224 shared.parked.store(false, Ordering::Relaxed);
225 drop(guard);
226 continue;
227 }
228 let (guard, _timeout) = shared
229 .available
230 .wait_timeout(guard, PARK_BACKSTOP)
231 .unwrap_or_else(|poison| poison.into_inner());
232 shared.parked.store(false, Ordering::Relaxed);
233 drop(guard);
234 }
235 }
236}
237
238impl<T> Drop for BoundedQueueStream<T> {
239 fn drop(&mut self) {
240 self.shared.set_complete();
242 }
243}
244
245struct SourceQueueShared<T> {
248 state: Mutex<SourceQueueState<T>>,
249 available: Condvar,
250 capacity: usize,
251 strategy: OverflowStrategy,
252}
253
254struct SourceQueueState<T> {
255 buffer: VecDeque<T>,
256 terminal: Option<TerminalSignal>,
257 terminating: bool,
258 pending_count: usize,
259}
260
261impl<T> SourceQueueShared<T> {
262 fn new(capacity: usize, strategy: OverflowStrategy) -> Arc<Self> {
263 Arc::new(Self {
264 state: Mutex::new(SourceQueueState {
265 buffer: VecDeque::with_capacity(capacity),
266 terminal: None,
267 terminating: false,
268 pending_count: 0,
269 }),
270 available: Condvar::new(),
271 capacity,
272 strategy,
273 })
274 }
275}
276
277pub struct SourceQueue<T> {
281 shared: Arc<SourceQueueShared<T>>,
282 completion: Option<StreamCompletion<NotUsed>>,
283}
284
285impl<T> Clone for SourceQueue<T> {
286 fn clone(&self) -> Self {
287 Self {
288 shared: Arc::clone(&self.shared),
289 completion: Some(StreamCompletion::ready(Err(StreamError::Failed(
290 "cannot clone queue completion handle; use watch_completion() on the original handle"
291 .into(),
292 )))),
293 }
294 }
295}
296
297impl<T: Send + 'static> SourceQueue<T> {
298 pub fn watch_completion(mut self) -> StreamCompletion<NotUsed> {
301 self.completion.take().unwrap_or_else(|| {
302 StreamCompletion::ready(Err(StreamError::Failed(
303 "queue completion handle already taken".into(),
304 )))
305 })
306 }
307
308 pub fn offer(&self, elem: T) -> StreamResult<QueueOfferResult> {
312 let strategy = self.shared.strategy;
313 let capacity = self.shared.capacity;
314 let mut state = self
315 .shared
316 .state
317 .lock()
318 .unwrap_or_else(|poison| poison.into_inner());
319
320 if state.terminal.is_some() {
321 return Ok(QueueOfferResult::QueueClosed);
322 }
323
324 if state.terminating {
325 return Ok(QueueOfferResult::QueueClosed);
326 }
327
328 if state.buffer.len() < capacity {
329 state.buffer.push_back(elem);
330 drop(state);
331 self.shared.available.notify_all();
332 return Ok(QueueOfferResult::Enqueued);
333 }
334
335 match strategy {
336 OverflowStrategy::DropHead => {
337 let _ = state.buffer.pop_front();
338 state.buffer.push_back(elem);
339 drop(state);
340 self.shared.available.notify_all();
341 Ok(QueueOfferResult::Enqueued)
342 }
343 OverflowStrategy::DropTail => {
344 let _ = state.buffer.pop_back();
345 state.buffer.push_back(elem);
346 drop(state);
347 self.shared.available.notify_all();
348 Ok(QueueOfferResult::Enqueued)
349 }
350 OverflowStrategy::DropBuffer => {
351 state.buffer.clear();
352 state.buffer.push_back(elem);
353 drop(state);
354 self.shared.available.notify_all();
355 Ok(QueueOfferResult::Enqueued)
356 }
357 OverflowStrategy::DropNew => Ok(QueueOfferResult::Dropped),
358 OverflowStrategy::Fail => {
359 state.buffer.clear();
360 let error =
361 StreamError::Failed(format!("Buffer overflow (max capacity was: {capacity})!"));
362 state.terminal = Some(TerminalSignal::Error(error.clone()));
363 drop(state);
364 self.shared.available.notify_all();
365 Ok(QueueOfferResult::Failure(error))
366 }
367 OverflowStrategy::Backpressure => {
368 if state.pending_count >= 1 {
369 return Err(StreamError::Failed(
370 "Too many concurrent offers. Specified maximum is 1. \
371 You have to wait for the previous offer to resolve to send another request"
372 .into(),
373 ));
374 }
375 state.pending_count += 1;
376 loop {
377 if state.terminal.is_some() || state.terminating {
378 state.pending_count -= 1;
379 return Ok(QueueOfferResult::QueueClosed);
380 }
381 if state.buffer.len() < capacity {
382 state.pending_count -= 1;
383 state.buffer.push_back(elem);
384 drop(state);
385 self.shared.available.notify_all();
386 return Ok(QueueOfferResult::Enqueued);
387 }
388 state = self
389 .shared
390 .available
391 .wait(state)
392 .unwrap_or_else(|poison| poison.into_inner());
393 }
394 }
395 }
396 }
397
398 pub fn complete(&self) {
400 let mut state = self
401 .shared
402 .state
403 .lock()
404 .unwrap_or_else(|poison| poison.into_inner());
405 if state.buffer.is_empty() && state.pending_count == 0 {
406 if state.terminal.is_none() {
407 state.terminal = Some(TerminalSignal::Complete);
408 }
409 } else {
410 state.terminating = true;
411 }
412 drop(state);
413 self.shared.available.notify_all();
414 }
415
416 pub fn fail(&self, error: StreamError) {
418 let mut state = self
419 .shared
420 .state
421 .lock()
422 .unwrap_or_else(|poison| poison.into_inner());
423 if state.terminal.is_none() {
424 state.terminal = Some(TerminalSignal::Error(error));
425 }
426 drop(state);
427 self.shared.available.notify_all();
428 }
429}
430
431impl<T> Drop for SourceQueue<T> {
432 fn drop(&mut self) {
433 if Arc::strong_count(&self.shared) != 2 {
434 return;
435 }
436
437 let mut state = self
438 .shared
439 .state
440 .lock()
441 .unwrap_or_else(|poison| poison.into_inner());
442 if state.terminal.is_none() && !state.terminating {
443 state.terminal = Some(TerminalSignal::Complete);
444 }
445 drop(state);
446 self.shared.available.notify_all();
447 }
448}
449
450struct SourceQueueStream<T: Send + 'static> {
451 shared: Arc<SourceQueueShared<T>>,
452 completion_sender: Option<oneshot::Sender<StreamResult<NotUsed>>>,
453}
454
455impl<T: Send + 'static> Iterator for SourceQueueStream<T> {
456 type Item = StreamResult<T>;
457
458 fn next(&mut self) -> Option<Self::Item> {
459 let mut state = self
460 .shared
461 .state
462 .lock()
463 .unwrap_or_else(|poison| poison.into_inner());
464 loop {
465 if let Some(TerminalSignal::Error(error)) = &state.terminal {
466 let error = error.clone();
467 drop(state);
468 self.signal_completion(Err(error.clone()));
469 self.shared.available.notify_all();
470 return Some(Err(error));
471 }
472
473 if let Some(item) = state.buffer.pop_front() {
474 drop(state);
475 self.shared.available.notify_all();
476 return Some(Ok(item));
477 }
478
479 if let Some(terminal) = state.terminal.clone() {
480 if state.terminating {
481 state.terminating = false;
482 }
483 drop(state);
484 self.signal_completion(match &terminal {
485 TerminalSignal::Complete => Ok(NotUsed),
486 TerminalSignal::Error(error) => Err(error.clone()),
487 });
488 self.shared.available.notify_all();
489 return match terminal {
490 TerminalSignal::Complete => None,
491 TerminalSignal::Error(error) => Some(Err(error)),
492 };
493 }
494
495 if state.terminating && state.buffer.is_empty() && state.pending_count == 0 {
496 state.terminal = Some(TerminalSignal::Complete);
497 state.terminating = false;
498 drop(state);
499 self.signal_completion(Ok(NotUsed));
500 self.shared.available.notify_all();
501 return None;
502 }
503
504 state = self
505 .shared
506 .available
507 .wait(state)
508 .unwrap_or_else(|poison| poison.into_inner());
509 }
510 }
511}
512
513impl<T: Send + 'static> SourceQueueStream<T> {
514 fn signal_completion(&mut self, result: StreamResult<NotUsed>) {
515 if let Some(sender) = self.completion_sender.take() {
516 let _ = sender.send(result);
517 }
518 }
519}
520
521impl<T: Send + 'static> Drop for SourceQueueStream<T> {
522 fn drop(&mut self) {
523 let mut state = self
524 .shared
525 .state
526 .lock()
527 .unwrap_or_else(|poison| poison.into_inner());
528 if state.terminal.is_none() {
529 state.terminal = Some(TerminalSignal::Complete);
530 }
531 state.terminating = false;
532 drop(state);
533 self.signal_completion(Ok(NotUsed));
534 self.shared.available.notify_all();
535 }
536}
537
538struct SinkQueueShared<T> {
541 state: Mutex<SinkQueueState<T>>,
542 available: Condvar,
543}
544
545struct SinkQueueState<T> {
546 buffer: VecDeque<T>,
547 error: Option<StreamError>,
548 completed: bool,
549}
550
551impl<T> SinkQueueShared<T> {
552 fn new() -> Arc<Self> {
553 Arc::new(Self {
554 state: Mutex::new(SinkQueueState {
555 buffer: VecDeque::new(),
556 error: None,
557 completed: false,
558 }),
559 available: Condvar::new(),
560 })
561 }
562}
563
564pub struct SinkQueue<T> {
567 shared: Arc<SinkQueueShared<T>>,
568 _completion: StreamCompletion<NotUsed>,
569}
570
571impl<T> SinkQueue<T> {
572 pub fn pull(&self) -> StreamResult<Option<T>> {
575 let mut state = self
576 .shared
577 .state
578 .lock()
579 .unwrap_or_else(|poison| poison.into_inner());
580 loop {
581 if let Some(item) = state.buffer.pop_front() {
582 return Ok(Some(item));
583 }
584 if let Some(error) = state.error.clone() {
585 return Err(error);
586 }
587 if state.completed {
588 return Ok(None);
589 }
590 state = self
591 .shared
592 .available
593 .wait(state)
594 .unwrap_or_else(|poison| poison.into_inner());
595 }
596 }
597}
598
599impl<T: Send + 'static> Source<T, NotUsed> {
602 #[must_use]
605 pub fn queue_bounded(capacity: usize) -> Source<T, BoundedSourceQueue<T>> {
606 assert!(capacity > 0, "queue capacity must be greater than zero");
607 Source::from_materialized_factory(move |_materializer| {
608 let shared = BoundedQueueShared::new(capacity);
609 let stream: BoxStream<T> = Box::new(BoundedQueueStream {
610 shared: Arc::clone(&shared),
611 });
612 let handle = BoundedSourceQueue { shared };
613 Ok((stream, handle))
614 })
615 }
616
617 #[must_use]
620 pub fn queue(capacity: usize, strategy: OverflowStrategy) -> Source<T, SourceQueue<T>> {
621 assert!(capacity > 0, "queue capacity must be greater than zero");
622 Source::from_materialized_factory(move |_materializer| {
623 let shared = SourceQueueShared::new(capacity, strategy);
624 let (completion_sender, completion_receiver) = oneshot::channel();
625 let stream: BoxStream<T> = Box::new(SourceQueueStream {
626 shared: Arc::clone(&shared),
627 completion_sender: Some(completion_sender),
628 });
629 let handle = SourceQueue {
630 shared,
631 completion: Some(StreamCompletion::from_receiver(completion_receiver, None)),
632 };
633 Ok((stream, handle))
634 })
635 }
636}
637
638impl<T: Send + 'static> Sink<T, SinkQueue<T>> {
641 #[must_use]
643 pub fn queue() -> Self {
644 Sink::from_runner(move |mut input, materializer| {
645 let shared = SinkQueueShared::new();
646 let worker_shared = Arc::clone(&shared);
647
648 let completion = materializer.spawn_stream(move |stream_cancelled| {
649 loop {
650 if stream_cancelled.load(Ordering::SeqCst) {
651 return Ok(NotUsed);
652 }
653 match input.next() {
654 Some(Ok(item)) => {
655 let mut state = worker_shared
656 .state
657 .lock()
658 .unwrap_or_else(|poison| poison.into_inner());
659 state.buffer.push_back(item);
660 drop(state);
661 worker_shared.available.notify_all();
662 }
663 Some(Err(error)) => {
664 let mut state = worker_shared
665 .state
666 .lock()
667 .unwrap_or_else(|poison| poison.into_inner());
668 if state.error.is_none() {
669 state.error = Some(error);
670 }
671 drop(state);
672 worker_shared.available.notify_all();
673 return Ok(NotUsed);
674 }
675 None => {
676 let mut state = worker_shared
677 .state
678 .lock()
679 .unwrap_or_else(|poison| poison.into_inner());
680 state.completed = true;
681 drop(state);
682 worker_shared.available.notify_all();
683 return Ok(NotUsed);
684 }
685 }
686 }
687 });
688
689 Ok(SinkQueue {
690 shared: Arc::clone(&shared),
691 _completion: completion,
692 })
693 })
694 }
695}
696
697#[cfg(test)]
700mod tests {
701 use super::*;
702 use crate::stream::Materializer;
703 use std::sync::atomic::AtomicBool;
704 use std::sync::atomic::Ordering;
705 use std::sync::mpsc;
706 use std::thread;
707 use std::time::{Duration, Instant};
708
709 #[test]
712 fn bounded_offer_accepted_vs_processed_distinct() {
713 let (queue, mut stream) = materialize_bounded_queue(2);
714 assert_eq!(queue.offer(1), QueueOfferResult::Enqueued);
716 assert_eq!(queue.offer(2), QueueOfferResult::Enqueued);
717 assert_eq!(stream.next(), Some(Ok(1)));
719 assert_eq!(queue.offer(3), QueueOfferResult::Enqueued);
721 assert_eq!(stream.next(), Some(Ok(2)));
722 assert_eq!(stream.next(), Some(Ok(3)));
723 }
724
725 #[test]
726 fn bounded_offer_dropped_when_full() {
727 let (queue, mut stream) = materialize_bounded_queue(1);
728 assert_eq!(queue.offer(1), QueueOfferResult::Enqueued);
729 assert_eq!(queue.offer(2), QueueOfferResult::Dropped);
730 assert_eq!(queue.offer(3), QueueOfferResult::Dropped);
731 queue.complete();
732 assert_eq!(stream.next(), Some(Ok(1)));
733 assert_eq!(stream.next(), None);
734 }
735
736 #[test]
737 fn bounded_queue_closed_after_complete() {
738 let (queue, mut stream) = materialize_bounded_queue(2);
739 assert_eq!(queue.offer(1), QueueOfferResult::Enqueued);
740 queue.complete();
741 assert_eq!(stream.next(), Some(Ok(1)));
743 assert_eq!(stream.next(), None);
744 assert_eq!(queue.offer(2), QueueOfferResult::QueueClosed);
746 }
747
748 #[test]
749 fn bounded_queue_closed_after_fail() {
750 let (queue, mut stream) = materialize_bounded_queue(2);
751 assert_eq!(queue.offer(1), QueueOfferResult::Enqueued);
752 queue.fail(StreamError::Failed("boom".into()));
753 assert_eq!(queue.offer(2), QueueOfferResult::QueueClosed);
755 assert_eq!(stream.next(), Some(Ok(1)));
757 assert_eq!(stream.next(), Some(Err(StreamError::Failed("boom".into()))));
758 }
759
760 #[test]
761 fn bounded_drop_handle_completes_stream() {
762 let queue = {
763 let (queue, mut stream) = materialize_bounded_queue::<i32>(2);
764 queue.offer(1);
765 queue.offer(2);
766 assert_eq!(stream.next(), Some(Ok(1)));
767 queue
768 };
769 assert_eq!(queue.offer(3), QueueOfferResult::QueueClosed);
771 }
772
773 #[test]
774 fn bounded_drop_last_producer_completes_stream() {
775 let (queue, mut stream) = materialize_bounded_queue::<i32>(1);
776 let producer = queue.clone();
777 let consumer = thread::spawn(move || {
778 assert_eq!(stream.next(), None);
779 });
780 drop(producer);
781 drop(queue);
782 consumer.join().unwrap();
783 }
784
785 #[test]
786 fn bounded_drain_before_complete() {
787 let (queue, mut stream) = materialize_bounded_queue(3);
788 assert_eq!(queue.offer(1), QueueOfferResult::Enqueued);
789 assert_eq!(queue.offer(2), QueueOfferResult::Enqueued);
790 assert_eq!(queue.offer(3), QueueOfferResult::Enqueued);
791 assert_eq!(queue.offer(4), QueueOfferResult::Dropped);
792 assert_eq!(queue.offer(5), QueueOfferResult::Dropped);
793 queue.complete();
794 assert_eq!(stream.next(), Some(Ok(1)));
795 assert_eq!(stream.next(), Some(Ok(2)));
796 assert_eq!(stream.next(), Some(Ok(3)));
797 assert_eq!(stream.next(), None);
798 }
799
800 #[test]
801 fn bounded_terminal_completion_is_sticky() {
802 let (queue, mut stream) = materialize_bounded_queue(2);
803 queue.offer(1);
804 queue.complete();
805 assert_eq!(stream.next(), Some(Ok(1)));
806 assert_eq!(stream.next(), None);
807 assert_eq!(stream.next(), None);
808 assert_eq!(stream.next(), None);
809 }
810
811 #[test]
812 fn bounded_terminal_failure_is_sticky() {
813 let (queue, mut stream) = materialize_bounded_queue::<i32>(2);
814 queue.fail(StreamError::Failed("boom".into()));
815 assert_eq!(stream.next(), Some(Err(StreamError::Failed("boom".into()))));
816 assert_eq!(stream.next(), Some(Err(StreamError::Failed("boom".into()))));
817 }
818
819 #[test]
820 fn bounded_producer_consumer_across_threads() {
821 let (queue, mut stream) = materialize_bounded_queue::<i32>(16);
822 let consumer = thread::spawn(move || {
824 let mut collected = Vec::new();
825 while let Some(Ok(item)) = stream.next() {
826 collected.push(item);
827 }
828 collected
829 });
830 let producer = thread::spawn({
831 let queue = queue.clone();
832 move || {
833 for i in 0..10 {
834 assert_eq!(queue.offer(i), QueueOfferResult::Enqueued);
835 }
836 queue.complete();
837 }
838 });
839 producer.join().unwrap();
840 let collected = consumer.join().unwrap();
841 assert_eq!(collected, (0..10).collect::<Vec<_>>());
842 }
843
844 #[test]
845 fn bounded_multi_producer_single_consumer() {
846 let n = 50_i32;
847 let producer_count = 4;
848 let total = (producer_count * n) as usize;
849 let (queue, mut stream) = materialize_bounded_queue::<i32>(total);
850
851 let consumer = thread::spawn(move || {
853 let mut collected = Vec::new();
854 while let Some(Ok(item)) = stream.next() {
855 collected.push(item);
856 }
857 collected
858 });
859
860 let mut handles = Vec::new();
861 for p in 0..producer_count {
862 let q = queue.clone();
863 handles.push(thread::spawn(move || {
864 for i in 0..n {
865 assert_eq!(q.offer(p * n + i), QueueOfferResult::Enqueued);
866 }
867 }));
868 }
869 for h in handles {
870 h.join().unwrap();
871 }
872 queue.complete();
873 let mut collected = consumer.join().unwrap();
874 collected.sort_unstable();
875 assert_eq!(collected.len(), total);
876 collected.sort_unstable();
877 assert_eq!(collected.len(), (producer_count * n) as usize);
878 }
879
880 #[test]
881 fn bounded_offer_poll_stress_no_lost_wakeup() {
882 const ROUNDS: usize = 40;
888 const PRODUCERS: i64 = 8;
889 const PER_PRODUCER: i64 = 500;
890
891 for _ in 0..ROUNDS {
892 let (queue, stream) = materialize_bounded_queue::<i64>(4);
893
894 let consumer = thread::spawn(move || {
895 let mut count = 0_u64;
896 for item in stream {
897 assert!(item.is_ok(), "unexpected stream failure: {item:?}");
898 count += 1;
899 }
900 count
901 });
902
903 let mut handles = Vec::new();
904 for p in 0..PRODUCERS {
905 let q = queue.clone();
906 handles.push(thread::spawn(move || {
907 let mut enqueued = 0_u64;
908 for i in 0..PER_PRODUCER {
909 if q.offer(p * PER_PRODUCER + i) == QueueOfferResult::Enqueued {
910 enqueued += 1;
911 }
912 }
913 enqueued
914 }));
915 }
916
917 let mut total_enqueued = 0_u64;
918 for h in handles {
919 total_enqueued += h.join().unwrap();
920 }
921 queue.complete();
924 let delivered = consumer.join().unwrap();
925 assert_eq!(
926 delivered, total_enqueued,
927 "every accepted element must be delivered exactly once"
928 );
929 }
930 }
931
932 #[test]
933 fn bounded_pingpong_in_order_stress() {
934 const ROUNDS: usize = 10;
940 const ITEMS: i64 = 1_000;
941
942 for _ in 0..ROUNDS {
943 let (queue, stream) = materialize_bounded_queue::<i64>(1);
944 let consumer = thread::spawn(move || {
945 let mut got = Vec::new();
946 for item in stream {
947 got.push(item.expect("no spurious failure"));
948 }
949 got
950 });
951
952 for i in 0..ITEMS {
953 let deadline = Instant::now() + Duration::from_secs(5);
954 loop {
955 match queue.offer(i) {
956 QueueOfferResult::Enqueued => break,
957 QueueOfferResult::Dropped => {
958 assert!(
959 Instant::now() < deadline,
960 "offer never accepted within deadline — consumer stuck?"
961 );
962 thread::yield_now();
963 }
964 other => panic!("unexpected offer result: {other:?}"),
965 }
966 }
967 }
968 queue.complete();
969 let got = consumer.join().unwrap();
970 assert_eq!(got, (0..ITEMS).collect::<Vec<_>>());
971 }
972 }
973
974 fn materialize_source_queue<T: Send + 'static>(
977 capacity: usize,
978 strategy: OverflowStrategy,
979 ) -> (SourceQueue<T>, BoxStream<T>) {
980 let materializer = Materializer::new();
981 let (stream, queue) = Source::<T>::queue(capacity, strategy)
982 .factory
983 .create(&materializer)
984 .unwrap();
985 (queue, stream)
986 }
987
988 #[test]
989 fn source_queue_offer_enqueued() {
990 let (queue, mut stream) =
991 materialize_source_queue::<i32>(2, OverflowStrategy::Backpressure);
992 assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
993 assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::Enqueued);
994 assert_eq!(stream.next(), Some(Ok(1)));
995 assert_eq!(stream.next(), Some(Ok(2)));
996 }
997
998 #[test]
999 fn source_queue_drop_head() {
1000 let (queue, mut stream) = materialize_source_queue::<i32>(2, OverflowStrategy::DropHead);
1001 assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
1002 assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::Enqueued);
1003 assert_eq!(queue.offer(3).unwrap(), QueueOfferResult::Enqueued);
1005 queue.complete();
1006 assert_eq!(stream.next(), Some(Ok(2)));
1007 assert_eq!(stream.next(), Some(Ok(3)));
1008 assert_eq!(stream.next(), None);
1009 }
1010
1011 #[test]
1012 fn source_queue_drop_tail() {
1013 let (queue, mut stream) = materialize_source_queue::<i32>(2, OverflowStrategy::DropTail);
1014 assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
1015 assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::Enqueued);
1016 assert_eq!(queue.offer(3).unwrap(), QueueOfferResult::Enqueued);
1017 queue.complete();
1018 assert_eq!(stream.next(), Some(Ok(1)));
1019 assert_eq!(stream.next(), Some(Ok(3)));
1020 assert_eq!(stream.next(), None);
1021 }
1022
1023 #[test]
1024 fn source_queue_drop_buffer() {
1025 let (queue, mut stream) = materialize_source_queue::<i32>(2, OverflowStrategy::DropBuffer);
1026 assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
1027 assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::Enqueued);
1028 assert_eq!(queue.offer(3).unwrap(), QueueOfferResult::Enqueued);
1029 queue.complete();
1030 assert_eq!(stream.next(), Some(Ok(3)));
1032 assert_eq!(stream.next(), None);
1033 }
1034
1035 #[test]
1036 fn source_queue_drop_new() {
1037 let (queue, mut stream) = materialize_source_queue::<i32>(2, OverflowStrategy::DropNew);
1038 assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
1039 assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::Enqueued);
1040 assert_eq!(queue.offer(3).unwrap(), QueueOfferResult::Dropped);
1041 queue.complete();
1042 assert_eq!(stream.next(), Some(Ok(1)));
1043 assert_eq!(stream.next(), Some(Ok(2)));
1044 assert_eq!(stream.next(), None);
1045 }
1046
1047 #[test]
1048 fn source_queue_fail_strategy() {
1049 let (queue, _stream) = materialize_source_queue::<i32>(2, OverflowStrategy::Fail);
1050 assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
1051 assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::Enqueued);
1052 match queue.offer(3).unwrap() {
1053 QueueOfferResult::Failure(e) => {
1054 assert!(format!("{e:?}").contains("Buffer overflow"));
1055 }
1056 other => panic!("expected Failure, got {other:?}"),
1057 }
1058 }
1059
1060 #[test]
1061 fn source_queue_backpressure_blocks() {
1062 let (queue, mut stream) =
1063 materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
1064 assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
1065
1066 let consumed = Arc::new(AtomicBool::new(false));
1069 let c = Arc::clone(&consumed);
1070 let (release_tx, release_rx) = mpsc::channel();
1071 let consumer = thread::spawn(move || {
1072 assert_eq!(stream.next(), Some(Ok(1)));
1073 c.store(true, Ordering::SeqCst);
1074 release_rx.recv().unwrap();
1075 });
1076
1077 wait_until(Duration::from_secs(1), || consumed.load(Ordering::SeqCst));
1078 assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::Enqueued);
1079 release_tx.send(()).unwrap();
1080 consumer.join().unwrap();
1081 assert!(consumed.load(Ordering::SeqCst));
1082 }
1083
1084 #[test]
1085 fn source_queue_concurrent_offer_violation() {
1086 let (queue, mut stream) =
1087 materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
1088 assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
1089
1090 let q = queue.clone();
1091 let started = Arc::new(AtomicBool::new(false));
1092 let s = Arc::clone(&started);
1093 let blocker = thread::spawn(move || {
1094 s.store(true, Ordering::SeqCst);
1095 let _ = q.offer(2);
1097 });
1098
1099 while !started.load(Ordering::SeqCst) {
1101 thread::yield_now();
1102 }
1103 wait_until(Duration::from_secs(1), || {
1104 queue
1105 .shared
1106 .state
1107 .lock()
1108 .unwrap_or_else(|poison| poison.into_inner())
1109 .pending_count
1110 == 1
1111 });
1112
1113 let result = queue.offer(3);
1115 assert!(result.is_err());
1116 assert!(format!("{result:?}").contains("Too many concurrent offers"));
1117
1118 assert_eq!(stream.next(), Some(Ok(1)));
1120 blocker.join().unwrap();
1121 }
1122
1123 #[test]
1124 fn source_queue_watch_completion() {
1125 let (queue, mut stream) =
1126 materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
1127 queue.offer(1).unwrap();
1128 queue.complete();
1129 assert_eq!(stream.next(), Some(Ok(1)));
1130 assert_eq!(stream.next(), None);
1131 assert_eq!(queue.watch_completion().wait(), Ok(NotUsed));
1132 }
1133
1134 #[test]
1135 fn source_queue_watch_completion_on_failure() {
1136 let (queue, mut stream) =
1137 materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
1138 queue.fail(StreamError::Failed("boom".into()));
1139 assert_eq!(stream.next(), Some(Err(StreamError::Failed("boom".into()))));
1140 assert_eq!(
1141 queue.watch_completion().wait(),
1142 Err(StreamError::Failed("boom".into()))
1143 );
1144 }
1145
1146 #[test]
1147 fn source_queue_terminal_completion_is_sticky() {
1148 let (queue, mut stream) =
1149 materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
1150 queue.complete();
1151 assert_eq!(stream.next(), None);
1152 assert_eq!(stream.next(), None);
1153 assert_eq!(stream.next(), None);
1154 }
1155
1156 #[test]
1157 fn source_queue_offer_after_complete_returns_queue_closed() {
1158 let (queue, _stream) = materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
1159 queue.complete();
1160 assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::QueueClosed);
1161 }
1162
1163 #[test]
1164 fn source_queue_drop_stream_closes_queue() {
1165 let (queue, stream) = materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
1166 queue.offer(1).unwrap();
1167 drop(stream);
1168 assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::QueueClosed);
1170 }
1171
1172 #[test]
1173 fn source_queue_drop_last_producer_completes_stream() {
1174 let (queue, mut stream) =
1175 materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
1176 let producer = queue.clone();
1177 let consumer = thread::spawn(move || {
1178 assert_eq!(stream.next(), None);
1179 });
1180 drop(producer);
1181 drop(queue);
1182 consumer.join().unwrap();
1183 }
1184
1185 #[test]
1186 fn source_queue_backpressure_unblocks_on_complete() {
1187 let (queue, mut stream) =
1188 materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
1189 assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
1190
1191 let q = queue.clone();
1192 let blocker = thread::spawn(move || {
1193 q.offer(2)
1195 });
1196
1197 wait_until(Duration::from_secs(1), || {
1198 queue
1199 .shared
1200 .state
1201 .lock()
1202 .unwrap_or_else(|poison| poison.into_inner())
1203 .pending_count
1204 == 1
1205 });
1206 queue.complete();
1208 let result = blocker.join().unwrap();
1209 assert_eq!(result.unwrap(), QueueOfferResult::QueueClosed);
1210
1211 assert_eq!(stream.next(), Some(Ok(1)));
1213 assert_eq!(stream.next(), None);
1214 }
1215
1216 fn materialize_sink_queue<T: Send + 'static>(source: Source<T>) -> SinkQueue<T> {
1219 source.run_with(Sink::queue()).unwrap()
1220 }
1221
1222 #[test]
1223 fn sink_queue_pull_elements() {
1224 let queue = materialize_sink_queue(Source::from_iter([1, 2, 3]));
1225 assert_eq!(queue.pull().unwrap(), Some(1));
1226 assert_eq!(queue.pull().unwrap(), Some(2));
1227 assert_eq!(queue.pull().unwrap(), Some(3));
1228 assert_eq!(queue.pull().unwrap(), None);
1229 }
1230
1231 #[test]
1232 fn sink_queue_pull_none_after_upstream_completion() {
1233 let queue = materialize_sink_queue(Source::from_iter([42]));
1234 assert_eq!(queue.pull().unwrap(), Some(42));
1235 assert_eq!(queue.pull().unwrap(), None);
1236 assert_eq!(queue.pull().unwrap(), None);
1237 }
1238
1239 #[test]
1240 fn sink_queue_pull_error_from_upstream_failure() {
1241 let queue =
1242 materialize_sink_queue(Source::<i32>::failed(StreamError::Failed("boom".into())));
1243 assert_eq!(
1244 queue.pull().unwrap_err(),
1245 StreamError::Failed("boom".into())
1246 );
1247 assert_eq!(
1248 queue.pull().unwrap_err(),
1249 StreamError::Failed("boom".into())
1250 );
1251 }
1252
1253 #[test]
1254 fn sink_queue_terminal_stickiness() {
1255 let queue = materialize_sink_queue(Source::<i32>::empty());
1256 assert_eq!(queue.pull().unwrap(), None);
1257 assert_eq!(queue.pull().unwrap(), None);
1258 assert_eq!(queue.pull().unwrap(), None);
1259 }
1260
1261 #[test]
1262 fn sink_queue_terminal_failure_stickiness() {
1263 let queue =
1264 materialize_sink_queue(Source::<i32>::failed(StreamError::Failed("boom".into())));
1265 assert_eq!(
1266 queue.pull().unwrap_err(),
1267 StreamError::Failed("boom".into())
1268 );
1269 assert_eq!(
1270 queue.pull().unwrap_err(),
1271 StreamError::Failed("boom".into())
1272 );
1273 }
1274
1275 #[test]
1276 fn sink_queue_drop_cancels_upstream() {
1277 let queue = materialize_sink_queue(Source::repeat(42));
1278 drop(queue);
1280 }
1282
1283 #[test]
1284 fn sink_queue_drain_multiple_items() {
1285 let queue = materialize_sink_queue(Source::from_iter(0..100));
1286 for i in 0..100 {
1287 assert_eq!(queue.pull().unwrap(), Some(i));
1288 }
1289 assert_eq!(queue.pull().unwrap(), None);
1290 }
1291
1292 fn materialize_bounded_queue<T: Send + 'static>(
1295 capacity: usize,
1296 ) -> (BoundedSourceQueue<T>, BoxStream<T>) {
1297 let materializer = Materializer::new();
1298 let (stream, queue) = Source::<T>::queue_bounded(capacity)
1299 .factory
1300 .create(&materializer)
1301 .unwrap();
1302 (queue, stream)
1303 }
1304
1305 fn wait_until(timeout: Duration, condition: impl Fn() -> bool) {
1306 let deadline = Instant::now() + timeout;
1307 while Instant::now() < deadline {
1308 if condition() {
1309 return;
1310 }
1311 thread::yield_now();
1312 thread::sleep(Duration::from_millis(1));
1313 }
1314 assert!(condition(), "condition was not met within {timeout:?}");
1315 }
1316}