1use std::collections::VecDeque;
7use std::sync::atomic::{AtomicBool, AtomicU8, Ordering, fence};
8use std::sync::{Arc, Condvar, Mutex, OnceLock};
9use std::time::Duration;
10
11use crossbeam_queue::ArrayQueue;
12
13use crate::stream::{BoxStream, NotUsed, OverflowStrategy, Sink, Source, StreamCompletion};
14use crate::{StreamError, StreamResult};
15use futures::channel::oneshot;
16
/// Outcome reported to a producer after offering an element to a queue.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum QueueOfferResult {
    /// The element was accepted into the buffer.
    Enqueued,
    /// The element was rejected because the buffer had no room for it.
    Dropped,
    /// The queue had already been completed, failed, or was terminating; the
    /// element was not accepted.
    QueueClosed,
    /// The offer overflowed the buffer and failed the queue; carries the
    /// error the queue was failed with.
    Failure(StreamError),
}
24
/// Terminal state recorded for a mutex-based source queue.
#[derive(Clone)]
enum TerminalSignal {
    /// The queue finished normally.
    Complete,
    /// The queue finished with the given error.
    Error(StreamError),
}
30
/// `BoundedQueueShared::terminal` value: the queue is still open.
const TERM_NONE: u8 = 0;
/// `BoundedQueueShared::terminal` value: the queue was completed.
const TERM_COMPLETE: u8 = 1;
/// `BoundedQueueShared::terminal` value: the queue was failed.
const TERM_ERROR: u8 = 2;

/// Longest time the bounded-queue consumer waits on the condvar before it
/// re-checks the buffer and the terminal state on its own.
const PARK_BACKSTOP: Duration = Duration::from_millis(10);
52
/// State shared between `BoundedSourceQueue` handles and the
/// `BoundedQueueStream` consumer.
struct BoundedQueueShared<T> {
    /// Fixed-capacity element buffer filled by producers and drained by the consumer.
    buffer: ArrayQueue<T>,
    /// One of `TERM_NONE`, `TERM_COMPLETE`, or `TERM_ERROR`.
    terminal: AtomicU8,
    /// Error supplied to `set_error`; written before `terminal` is switched to `TERM_ERROR`.
    error: OnceLock<StreamError>,
    /// Mutex paired with `available`; it guards no data of its own.
    park: Mutex<()>,
    /// Condvar the consumer waits on while the buffer is empty and the queue is open.
    available: Condvar,
    /// Set by the consumer just before it waits, so that wakers can skip the
    /// lock and the notification when nobody is parked.
    parked: AtomicBool,
}
65
impl<T> BoundedQueueShared<T> {
    /// Allocates the shared state for a queue holding at most `capacity` elements.
    ///
    /// The queue starts open (`TERM_NONE`) with no stored error and no parked consumer.
    fn new(capacity: usize) -> Arc<Self> {
        Arc::new(Self {
            buffer: ArrayQueue::new(capacity),
            terminal: AtomicU8::new(TERM_NONE),
            error: OnceLock::new(),
            park: Mutex::new(()),
            available: Condvar::new(),
            parked: AtomicBool::new(false),
        })
    }

    /// Switches the queue from `TERM_NONE` to `TERM_COMPLETE` and wakes the consumer.
    ///
    /// If a terminal state is already recorded the exchange fails and its result is
    /// discarded, so the first terminal state wins. The consumer is woken either way.
    fn set_complete(&self) {
        let _ = self.terminal.compare_exchange(
            TERM_NONE,
            TERM_COMPLETE,
            Ordering::AcqRel,
            Ordering::Relaxed,
        );
        self.wake_consumer();
    }

    /// Stores `error`, switches the queue from `TERM_NONE` to `TERM_ERROR`, and wakes
    /// the consumer.
    ///
    /// The error is stored before the state change so that a reader observing
    /// `TERM_ERROR` can find it. Only the first stored error is kept, and the state
    /// change is skipped when a terminal state is already recorded.
    fn set_error(&self, error: StreamError) {
        let _ = self.error.set(error);
        let _ = self.terminal.compare_exchange(
            TERM_NONE,
            TERM_ERROR,
            Ordering::AcqRel,
            Ordering::Relaxed,
        );
        self.wake_consumer();
    }

    /// Notifies the consumer if it has announced that it is parked.
    ///
    /// Callers invoke this after publishing a change (a pushed element or a terminal
    /// state).
    fn wake_consumer(&self) {
        // Pairs with the `SeqCst` fence the consumer issues after setting `parked`:
        // the intent is that either this load observes `parked == true`, or the
        // consumer's re-check observes the change published before this call.
        fence(Ordering::SeqCst);
        if self.parked.load(Ordering::Relaxed) {
            // The consumer holds `park` from setting `parked` until it is inside
            // the wait, so taking the lock here means the notification is sent
            // only once the consumer is actually waiting (or has already moved on).
            let _guard = self
                .park
                .lock()
                .unwrap_or_else(|poison| poison.into_inner());
            self.available.notify_one();
        }
    }
}
121
/// Cloneable producer handle for a bounded queue source.
///
/// All clones share the same buffer and terminal state.
#[derive(Clone)]
pub struct BoundedSourceQueue<T> {
    /// State shared with the other handles and the consuming stream.
    shared: Arc<BoundedQueueShared<T>>,
}
126
127impl<T> BoundedSourceQueue<T> {
128 pub fn offer(&self, elem: T) -> QueueOfferResult {
129 if self.shared.terminal.load(Ordering::Acquire) != TERM_NONE {
130 return QueueOfferResult::QueueClosed;
131 }
132 match self.shared.buffer.push(elem) {
133 Ok(()) => {
134 self.shared.wake_consumer();
135 QueueOfferResult::Enqueued
136 }
137 Err(_full) => QueueOfferResult::Dropped,
138 }
139 }
140
141 pub fn complete(&self) {
142 self.shared.set_complete();
143 }
144
145 pub fn fail(&self, error: StreamError) {
146 self.shared.set_error(error);
147 }
148}
149
150impl<T> Drop for BoundedSourceQueue<T> {
151 fn drop(&mut self) {
152 if Arc::strong_count(&self.shared) != 2 {
155 return;
156 }
157 self.shared.set_complete();
158 }
159}
160
/// Consumer side of a bounded queue source; yields the offered elements.
struct BoundedQueueStream<T> {
    /// State shared with the producer handles.
    shared: Arc<BoundedQueueShared<T>>,
}
164
impl<T> Iterator for BoundedQueueStream<T> {
    type Item = StreamResult<T>;

    /// Blocks until an element, completion, or failure is available.
    ///
    /// Buffered elements are always yielded first. Once the buffer is empty, a
    /// completed queue yields `None` and a failed queue yields `Some(Err(_))`.
    /// Both outcomes repeat on later calls because the terminal state is never
    /// reset here.
    fn next(&mut self) -> Option<Self::Item> {
        let shared = &*self.shared;
        loop {
            // Fast path: take an element without touching the park mutex.
            if let Some(item) = shared.buffer.pop() {
                return Some(Ok(item));
            }
            match shared.terminal.load(Ordering::Acquire) {
                TERM_COMPLETE => {
                    // Pop once more: an element may have been pushed between the
                    // empty pop above and the terminal load.
                    if let Some(item) = shared.buffer.pop() {
                        return Some(Ok(item));
                    }
                    return None;
                }
                TERM_ERROR => {
                    // Same late-push check as for completion; buffered elements
                    // are delivered before the failure.
                    if let Some(item) = shared.buffer.pop() {
                        return Some(Ok(item));
                    }
                    // Fall back to a generic failure if no error was stored.
                    let error = shared
                        .error
                        .get()
                        .cloned()
                        .unwrap_or_else(|| StreamError::Failed("queue failed".into()));
                    return Some(Err(error));
                }
                _ => {}
            }

            // Slow path: announce that we are about to park, then re-check.
            let guard = shared
                .park
                .lock()
                .unwrap_or_else(|poison| poison.into_inner());
            shared.parked.store(true, Ordering::Relaxed);
            // Pairs with the `SeqCst` fence in `wake_consumer`.
            fence(Ordering::SeqCst);
            // Something was published after the checks above: skip the wait.
            if !shared.buffer.is_empty() || shared.terminal.load(Ordering::Acquire) != TERM_NONE {
                shared.parked.store(false, Ordering::Relaxed);
                drop(guard);
                continue;
            }
            // The timeout bounds the sleep, so the loop re-checks the queue even
            // if no notification arrives.
            let (guard, _timeout) = shared
                .available
                .wait_timeout(guard, PARK_BACKSTOP)
                .unwrap_or_else(|poison| poison.into_inner());
            shared.parked.store(false, Ordering::Relaxed);
            drop(guard);
        }
    }
}
224
impl<T> Drop for BoundedQueueStream<T> {
    /// Marks the queue as completed when the consumer goes away, so that later
    /// offers observe a terminal state and report `QueueClosed`.
    fn drop(&mut self) {
        self.shared.set_complete();
    }
}
231
/// State shared between `SourceQueue` handles and the `SourceQueueStream` consumer.
struct SourceQueueShared<T> {
    /// Buffer and lifecycle flags, guarded by a single mutex.
    state: Mutex<SourceQueueState<T>>,
    /// Condvar used by both sides: the consumer waits on it for elements or
    /// termination, and backpressured producers wait on it for free space.
    available: Condvar,
    /// Maximum number of buffered elements.
    capacity: usize,
    /// What `offer` does when the buffer is full.
    strategy: OverflowStrategy,
}
240
/// Mutex-protected part of a source queue.
struct SourceQueueState<T> {
    /// Elements accepted but not yet consumed.
    buffer: VecDeque<T>,
    /// Recorded terminal state, if the queue has finished.
    terminal: Option<TerminalSignal>,
    /// Set by `complete()` when elements or a blocked offer are still outstanding;
    /// new offers are refused and the consumer completes once the buffer is drained.
    terminating: bool,
    /// Number of offers currently blocked under the backpressure strategy.
    pending_count: usize,
}
247
248impl<T> SourceQueueShared<T> {
249 fn new(capacity: usize, strategy: OverflowStrategy) -> Arc<Self> {
250 Arc::new(Self {
251 state: Mutex::new(SourceQueueState {
252 buffer: VecDeque::with_capacity(capacity),
253 terminal: None,
254 terminating: false,
255 pending_count: 0,
256 }),
257 available: Condvar::new(),
258 capacity,
259 strategy,
260 })
261 }
262}
263
/// Producer handle for a queue source with a configurable overflow strategy.
pub struct SourceQueue<T> {
    /// State shared with the other handles and the consuming stream.
    shared: Arc<SourceQueueShared<T>>,
    /// Completion handle handed out by `watch_completion`.
    completion: Option<StreamCompletion<NotUsed>>,
}
268
269impl<T> Clone for SourceQueue<T> {
270 fn clone(&self) -> Self {
271 Self {
272 shared: Arc::clone(&self.shared),
273 completion: Some(StreamCompletion::ready(Err(StreamError::Failed(
274 "cannot clone queue completion handle; use watch_completion() on the original handle"
275 .into(),
276 )))),
277 }
278 }
279}
280
281impl<T: Send + 'static> SourceQueue<T> {
282 pub fn watch_completion(mut self) -> StreamCompletion<NotUsed> {
283 self.completion.take().unwrap_or_else(|| {
284 StreamCompletion::ready(Err(StreamError::Failed(
285 "queue completion handle already taken".into(),
286 )))
287 })
288 }
289
290 pub fn offer(&self, elem: T) -> StreamResult<QueueOfferResult> {
291 let strategy = self.shared.strategy;
292 let capacity = self.shared.capacity;
293 let mut state = self
294 .shared
295 .state
296 .lock()
297 .unwrap_or_else(|poison| poison.into_inner());
298
299 if state.terminal.is_some() {
300 return Ok(QueueOfferResult::QueueClosed);
301 }
302
303 if state.terminating {
304 return Ok(QueueOfferResult::QueueClosed);
305 }
306
307 if state.buffer.len() < capacity {
308 state.buffer.push_back(elem);
309 drop(state);
310 self.shared.available.notify_all();
311 return Ok(QueueOfferResult::Enqueued);
312 }
313
314 match strategy {
315 OverflowStrategy::DropHead => {
316 let _ = state.buffer.pop_front();
317 state.buffer.push_back(elem);
318 drop(state);
319 self.shared.available.notify_all();
320 Ok(QueueOfferResult::Enqueued)
321 }
322 OverflowStrategy::DropTail => {
323 let _ = state.buffer.pop_back();
324 state.buffer.push_back(elem);
325 drop(state);
326 self.shared.available.notify_all();
327 Ok(QueueOfferResult::Enqueued)
328 }
329 OverflowStrategy::DropBuffer => {
330 state.buffer.clear();
331 state.buffer.push_back(elem);
332 drop(state);
333 self.shared.available.notify_all();
334 Ok(QueueOfferResult::Enqueued)
335 }
336 OverflowStrategy::DropNew => Ok(QueueOfferResult::Dropped),
337 OverflowStrategy::Fail => {
338 state.buffer.clear();
339 let error =
340 StreamError::Failed(format!("Buffer overflow (max capacity was: {capacity})!"));
341 state.terminal = Some(TerminalSignal::Error(error.clone()));
342 drop(state);
343 self.shared.available.notify_all();
344 Ok(QueueOfferResult::Failure(error))
345 }
346 OverflowStrategy::Backpressure => {
347 if state.pending_count >= 1 {
348 return Err(StreamError::Failed(
349 "Too many concurrent offers. Specified maximum is 1. \
350 You have to wait for the previous offer to resolve to send another request"
351 .into(),
352 ));
353 }
354 state.pending_count += 1;
355 loop {
356 if state.terminal.is_some() || state.terminating {
357 state.pending_count -= 1;
358 return Ok(QueueOfferResult::QueueClosed);
359 }
360 if state.buffer.len() < capacity {
361 state.pending_count -= 1;
362 state.buffer.push_back(elem);
363 drop(state);
364 self.shared.available.notify_all();
365 return Ok(QueueOfferResult::Enqueued);
366 }
367 state = self
368 .shared
369 .available
370 .wait(state)
371 .unwrap_or_else(|poison| poison.into_inner());
372 }
373 }
374 }
375 }
376
377 pub fn complete(&self) {
378 let mut state = self
379 .shared
380 .state
381 .lock()
382 .unwrap_or_else(|poison| poison.into_inner());
383 if state.buffer.is_empty() && state.pending_count == 0 {
384 if state.terminal.is_none() {
385 state.terminal = Some(TerminalSignal::Complete);
386 }
387 } else {
388 state.terminating = true;
389 }
390 drop(state);
391 self.shared.available.notify_all();
392 }
393
394 pub fn fail(&self, error: StreamError) {
395 let mut state = self
396 .shared
397 .state
398 .lock()
399 .unwrap_or_else(|poison| poison.into_inner());
400 if state.terminal.is_none() {
401 state.terminal = Some(TerminalSignal::Error(error));
402 }
403 drop(state);
404 self.shared.available.notify_all();
405 }
406}
407
408impl<T> Drop for SourceQueue<T> {
409 fn drop(&mut self) {
410 if Arc::strong_count(&self.shared) != 2 {
411 return;
412 }
413
414 let mut state = self
415 .shared
416 .state
417 .lock()
418 .unwrap_or_else(|poison| poison.into_inner());
419 if state.terminal.is_none() && !state.terminating {
420 state.terminal = Some(TerminalSignal::Complete);
421 }
422 drop(state);
423 self.shared.available.notify_all();
424 }
425}
426
/// Consumer side of a `SourceQueue`; yields the offered elements.
struct SourceQueueStream<T: Send + 'static> {
    /// State shared with the producer handles.
    shared: Arc<SourceQueueShared<T>>,
    /// Sends the final stream result to the completion handle; taken on first use.
    completion_sender: Option<oneshot::Sender<StreamResult<NotUsed>>>,
}
431
432impl<T: Send + 'static> Iterator for SourceQueueStream<T> {
433 type Item = StreamResult<T>;
434
435 fn next(&mut self) -> Option<Self::Item> {
436 let mut state = self
437 .shared
438 .state
439 .lock()
440 .unwrap_or_else(|poison| poison.into_inner());
441 loop {
442 if let Some(TerminalSignal::Error(error)) = &state.terminal {
443 let error = error.clone();
444 drop(state);
445 self.signal_completion(Err(error.clone()));
446 self.shared.available.notify_all();
447 return Some(Err(error));
448 }
449
450 if let Some(item) = state.buffer.pop_front() {
451 drop(state);
452 self.shared.available.notify_all();
453 return Some(Ok(item));
454 }
455
456 if let Some(terminal) = state.terminal.clone() {
457 if state.terminating {
458 state.terminating = false;
459 }
460 drop(state);
461 self.signal_completion(match &terminal {
462 TerminalSignal::Complete => Ok(NotUsed),
463 TerminalSignal::Error(error) => Err(error.clone()),
464 });
465 self.shared.available.notify_all();
466 return match terminal {
467 TerminalSignal::Complete => None,
468 TerminalSignal::Error(error) => Some(Err(error)),
469 };
470 }
471
472 if state.terminating && state.buffer.is_empty() && state.pending_count == 0 {
473 state.terminal = Some(TerminalSignal::Complete);
474 state.terminating = false;
475 drop(state);
476 self.signal_completion(Ok(NotUsed));
477 self.shared.available.notify_all();
478 return None;
479 }
480
481 state = self
482 .shared
483 .available
484 .wait(state)
485 .unwrap_or_else(|poison| poison.into_inner());
486 }
487 }
488}
489
490impl<T: Send + 'static> SourceQueueStream<T> {
491 fn signal_completion(&mut self, result: StreamResult<NotUsed>) {
492 if let Some(sender) = self.completion_sender.take() {
493 let _ = sender.send(result);
494 }
495 }
496}
497
498impl<T: Send + 'static> Drop for SourceQueueStream<T> {
499 fn drop(&mut self) {
500 let mut state = self
501 .shared
502 .state
503 .lock()
504 .unwrap_or_else(|poison| poison.into_inner());
505 if state.terminal.is_none() {
506 state.terminal = Some(TerminalSignal::Complete);
507 }
508 state.terminating = false;
509 drop(state);
510 self.signal_completion(Ok(NotUsed));
511 self.shared.available.notify_all();
512 }
513}
514
/// State shared between a `SinkQueue` handle and the worker feeding it.
struct SinkQueueShared<T> {
    /// Buffered elements and terminal flags, guarded by a single mutex.
    state: Mutex<SinkQueueState<T>>,
    /// Condvar that `pull` waits on until the worker publishes something.
    available: Condvar,
}
521
/// Mutex-protected part of a sink queue.
struct SinkQueueState<T> {
    /// Elements received from upstream and not yet pulled.
    buffer: VecDeque<T>,
    /// First upstream error, if any.
    error: Option<StreamError>,
    /// Set once upstream has finished without an error.
    completed: bool,
}
527
528impl<T> SinkQueueShared<T> {
529 fn new() -> Arc<Self> {
530 Arc::new(Self {
531 state: Mutex::new(SinkQueueState {
532 buffer: VecDeque::new(),
533 error: None,
534 completed: false,
535 }),
536 available: Condvar::new(),
537 })
538 }
539}
540
/// Handle for pulling elements out of a running stream one at a time.
pub struct SinkQueue<T> {
    /// State shared with the worker that drains the upstream.
    shared: Arc<SinkQueueShared<T>>,
    /// Completion handle of the spawned worker; held for the lifetime of the
    /// queue and never read.
    _completion: StreamCompletion<NotUsed>,
}
545
546impl<T> SinkQueue<T> {
547 pub fn pull(&self) -> StreamResult<Option<T>> {
548 let mut state = self
549 .shared
550 .state
551 .lock()
552 .unwrap_or_else(|poison| poison.into_inner());
553 loop {
554 if let Some(item) = state.buffer.pop_front() {
555 return Ok(Some(item));
556 }
557 if let Some(error) = state.error.clone() {
558 return Err(error);
559 }
560 if state.completed {
561 return Ok(None);
562 }
563 state = self
564 .shared
565 .available
566 .wait(state)
567 .unwrap_or_else(|poison| poison.into_inner());
568 }
569 }
570}
571
572impl<T: Send + 'static> Source<T, NotUsed> {
575 #[must_use]
576 pub fn queue_bounded(capacity: usize) -> Source<T, BoundedSourceQueue<T>> {
577 assert!(capacity > 0, "queue capacity must be greater than zero");
578 Source::from_materialized_factory(move |_materializer| {
579 let shared = BoundedQueueShared::new(capacity);
580 let stream: BoxStream<T> = Box::new(BoundedQueueStream {
581 shared: Arc::clone(&shared),
582 });
583 let handle = BoundedSourceQueue { shared };
584 Ok((stream, handle))
585 })
586 }
587
588 #[must_use]
589 pub fn queue(capacity: usize, strategy: OverflowStrategy) -> Source<T, SourceQueue<T>> {
590 assert!(capacity > 0, "queue capacity must be greater than zero");
591 Source::from_materialized_factory(move |_materializer| {
592 let shared = SourceQueueShared::new(capacity, strategy);
593 let (completion_sender, completion_receiver) = oneshot::channel();
594 let stream: BoxStream<T> = Box::new(SourceQueueStream {
595 shared: Arc::clone(&shared),
596 completion_sender: Some(completion_sender),
597 });
598 let handle = SourceQueue {
599 shared,
600 completion: Some(StreamCompletion::from_receiver(completion_receiver, None)),
601 };
602 Ok((stream, handle))
603 })
604 }
605}
606
607impl<T: Send + 'static> Sink<T, SinkQueue<T>> {
610 #[must_use]
611 pub fn queue() -> Self {
612 Sink::from_runner(move |mut input, materializer| {
613 let shared = SinkQueueShared::new();
614 let worker_shared = Arc::clone(&shared);
615
616 let completion = materializer.spawn_stream(move |stream_cancelled| {
617 loop {
618 if stream_cancelled.load(Ordering::SeqCst) {
619 return Ok(NotUsed);
620 }
621 match input.next() {
622 Some(Ok(item)) => {
623 let mut state = worker_shared
624 .state
625 .lock()
626 .unwrap_or_else(|poison| poison.into_inner());
627 state.buffer.push_back(item);
628 drop(state);
629 worker_shared.available.notify_all();
630 }
631 Some(Err(error)) => {
632 let mut state = worker_shared
633 .state
634 .lock()
635 .unwrap_or_else(|poison| poison.into_inner());
636 if state.error.is_none() {
637 state.error = Some(error);
638 }
639 drop(state);
640 worker_shared.available.notify_all();
641 return Ok(NotUsed);
642 }
643 None => {
644 let mut state = worker_shared
645 .state
646 .lock()
647 .unwrap_or_else(|poison| poison.into_inner());
648 state.completed = true;
649 drop(state);
650 worker_shared.available.notify_all();
651 return Ok(NotUsed);
652 }
653 }
654 }
655 });
656
657 Ok(SinkQueue {
658 shared: Arc::clone(&shared),
659 _completion: completion,
660 })
661 })
662 }
663}
664
#[cfg(test)]
mod tests {
    use super::*;
    use crate::stream::Materializer;
    use std::sync::atomic::AtomicBool;
    use std::sync::atomic::Ordering;
    use std::sync::mpsc;
    use std::thread;
    use std::time::{Duration, Instant};

    /// An offer is accepted as soon as there is room, independently of when the
    /// consumer processes earlier elements.
    #[test]
    fn bounded_offer_accepted_vs_processed_distinct() {
        let (queue, mut stream) = materialize_bounded_queue(2);
        assert_eq!(queue.offer(1), QueueOfferResult::Enqueued);
        assert_eq!(queue.offer(2), QueueOfferResult::Enqueued);
        assert_eq!(stream.next(), Some(Ok(1)));
        assert_eq!(queue.offer(3), QueueOfferResult::Enqueued);
        assert_eq!(stream.next(), Some(Ok(2)));
        assert_eq!(stream.next(), Some(Ok(3)));
    }

    /// Offers made while the buffer is full are dropped.
    #[test]
    fn bounded_offer_dropped_when_full() {
        let (queue, mut stream) = materialize_bounded_queue(1);
        assert_eq!(queue.offer(1), QueueOfferResult::Enqueued);
        assert_eq!(queue.offer(2), QueueOfferResult::Dropped);
        assert_eq!(queue.offer(3), QueueOfferResult::Dropped);
        queue.complete();
        assert_eq!(stream.next(), Some(Ok(1)));
        assert_eq!(stream.next(), None);
    }

    /// After `complete`, buffered elements are still delivered and new offers are refused.
    #[test]
    fn bounded_queue_closed_after_complete() {
        let (queue, mut stream) = materialize_bounded_queue(2);
        assert_eq!(queue.offer(1), QueueOfferResult::Enqueued);
        queue.complete();
        assert_eq!(stream.next(), Some(Ok(1)));
        assert_eq!(stream.next(), None);
        assert_eq!(queue.offer(2), QueueOfferResult::QueueClosed);
    }

    /// After `fail`, new offers are refused; buffered elements precede the error.
    #[test]
    fn bounded_queue_closed_after_fail() {
        let (queue, mut stream) = materialize_bounded_queue(2);
        assert_eq!(queue.offer(1), QueueOfferResult::Enqueued);
        queue.fail(StreamError::Failed("boom".into()));
        assert_eq!(queue.offer(2), QueueOfferResult::QueueClosed);
        assert_eq!(stream.next(), Some(Ok(1)));
        assert_eq!(stream.next(), Some(Err(StreamError::Failed("boom".into()))));
    }

    /// Dropping the stream (at the end of the inner scope) closes the queue for
    /// the surviving handle.
    #[test]
    fn bounded_drop_handle_completes_stream() {
        let queue = {
            let (queue, mut stream) = materialize_bounded_queue::<i32>(2);
            queue.offer(1);
            queue.offer(2);
            assert_eq!(stream.next(), Some(Ok(1)));
            queue
        };
        assert_eq!(queue.offer(3), QueueOfferResult::QueueClosed);
    }

    /// Dropping every producer handle completes the stream.
    #[test]
    fn bounded_drop_last_producer_completes_stream() {
        let (queue, mut stream) = materialize_bounded_queue::<i32>(1);
        let producer = queue.clone();
        let consumer = thread::spawn(move || {
            assert_eq!(stream.next(), None);
        });
        drop(producer);
        drop(queue);
        consumer.join().unwrap();
    }

    /// Elements accepted before `complete` are drained before the stream ends.
    #[test]
    fn bounded_drain_before_complete() {
        let (queue, mut stream) = materialize_bounded_queue(3);
        assert_eq!(queue.offer(1), QueueOfferResult::Enqueued);
        assert_eq!(queue.offer(2), QueueOfferResult::Enqueued);
        assert_eq!(queue.offer(3), QueueOfferResult::Enqueued);
        assert_eq!(queue.offer(4), QueueOfferResult::Dropped);
        assert_eq!(queue.offer(5), QueueOfferResult::Dropped);
        queue.complete();
        assert_eq!(stream.next(), Some(Ok(1)));
        assert_eq!(stream.next(), Some(Ok(2)));
        assert_eq!(stream.next(), Some(Ok(3)));
        assert_eq!(stream.next(), None);
    }

    /// Once completed and drained, the stream keeps returning `None`.
    #[test]
    fn bounded_terminal_completion_is_sticky() {
        let (queue, mut stream) = materialize_bounded_queue(2);
        queue.offer(1);
        queue.complete();
        assert_eq!(stream.next(), Some(Ok(1)));
        assert_eq!(stream.next(), None);
        assert_eq!(stream.next(), None);
        assert_eq!(stream.next(), None);
    }

    /// Once failed, the stream keeps returning the same error.
    #[test]
    fn bounded_terminal_failure_is_sticky() {
        let (queue, mut stream) = materialize_bounded_queue::<i32>(2);
        queue.fail(StreamError::Failed("boom".into()));
        assert_eq!(stream.next(), Some(Err(StreamError::Failed("boom".into()))));
        assert_eq!(stream.next(), Some(Err(StreamError::Failed("boom".into()))));
    }

    /// A single producer thread and a consumer thread exchange elements in order.
    #[test]
    fn bounded_producer_consumer_across_threads() {
        let (queue, mut stream) = materialize_bounded_queue::<i32>(16);
        let consumer = thread::spawn(move || {
            let mut collected = Vec::new();
            while let Some(Ok(item)) = stream.next() {
                collected.push(item);
            }
            collected
        });
        let producer = thread::spawn({
            let queue = queue.clone();
            move || {
                for i in 0..10 {
                    assert_eq!(queue.offer(i), QueueOfferResult::Enqueued);
                }
                queue.complete();
            }
        });
        producer.join().unwrap();
        let collected = consumer.join().unwrap();
        assert_eq!(collected, (0..10).collect::<Vec<_>>());
    }

    /// Several producers feed one consumer; every element arrives exactly once.
    #[test]
    fn bounded_multi_producer_single_consumer() {
        let n = 50_i32;
        let producer_count = 4;
        let total = (producer_count * n) as usize;
        // Capacity equals the total element count, so no offer can be dropped.
        let (queue, mut stream) = materialize_bounded_queue::<i32>(total);

        let consumer = thread::spawn(move || {
            let mut collected = Vec::new();
            while let Some(Ok(item)) = stream.next() {
                collected.push(item);
            }
            collected
        });

        let mut handles = Vec::new();
        for p in 0..producer_count {
            let q = queue.clone();
            handles.push(thread::spawn(move || {
                for i in 0..n {
                    assert_eq!(q.offer(p * n + i), QueueOfferResult::Enqueued);
                }
            }));
        }
        for h in handles {
            h.join().unwrap();
        }
        queue.complete();
        let mut collected = consumer.join().unwrap();
        collected.sort_unstable();
        assert_eq!(collected.len(), total);
        // Producers emit disjoint ranges covering `0..producer_count * n`; check the
        // contents, not just the count, to catch duplicated or lost elements.
        let expected: Vec<i32> = (0..producer_count * n).collect();
        assert_eq!(collected, expected);
    }

    /// Many producers hammer a tiny queue; every accepted element must reach the
    /// consumer, which would not happen if a wakeup were lost.
    #[test]
    fn bounded_offer_poll_stress_no_lost_wakeup() {
        const ROUNDS: usize = 40;
        const PRODUCERS: i64 = 8;
        const PER_PRODUCER: i64 = 500;

        for _ in 0..ROUNDS {
            let (queue, stream) = materialize_bounded_queue::<i64>(4);

            let consumer = thread::spawn(move || {
                let mut count = 0_u64;
                for item in stream {
                    assert!(item.is_ok(), "unexpected stream failure: {item:?}");
                    count += 1;
                }
                count
            });

            let mut handles = Vec::new();
            for p in 0..PRODUCERS {
                let q = queue.clone();
                handles.push(thread::spawn(move || {
                    let mut enqueued = 0_u64;
                    for i in 0..PER_PRODUCER {
                        if q.offer(p * PER_PRODUCER + i) == QueueOfferResult::Enqueued {
                            enqueued += 1;
                        }
                    }
                    enqueued
                }));
            }

            let mut total_enqueued = 0_u64;
            for h in handles {
                total_enqueued += h.join().unwrap();
            }
            queue.complete();
            let delivered = consumer.join().unwrap();
            assert_eq!(
                delivered, total_enqueued,
                "every accepted element must be delivered exactly once"
            );
        }
    }

    /// With capacity 1, producer and consumer alternate; order must be preserved.
    #[test]
    fn bounded_pingpong_in_order_stress() {
        const ROUNDS: usize = 10;
        const ITEMS: i64 = 1_000;

        for _ in 0..ROUNDS {
            let (queue, stream) = materialize_bounded_queue::<i64>(1);
            let consumer = thread::spawn(move || {
                let mut got = Vec::new();
                for item in stream {
                    got.push(item.expect("no spurious failure"));
                }
                got
            });

            for i in 0..ITEMS {
                let deadline = Instant::now() + Duration::from_secs(5);
                // Retry until the consumer has freed the single slot.
                loop {
                    match queue.offer(i) {
                        QueueOfferResult::Enqueued => break,
                        QueueOfferResult::Dropped => {
                            assert!(
                                Instant::now() < deadline,
                                "offer never accepted within deadline — consumer stuck?"
                            );
                            thread::yield_now();
                        }
                        other => panic!("unexpected offer result: {other:?}"),
                    }
                }
            }
            queue.complete();
            let got = consumer.join().unwrap();
            assert_eq!(got, (0..ITEMS).collect::<Vec<_>>());
        }
    }

    /// Materializes `Source::queue` and returns its handle and stream.
    fn materialize_source_queue<T: Send + 'static>(
        capacity: usize,
        strategy: OverflowStrategy,
    ) -> (SourceQueue<T>, BoxStream<T>) {
        let materializer = Materializer::new();
        let (stream, queue) = Source::<T>::queue(capacity, strategy)
            .factory
            .create(&materializer)
            .unwrap();
        (queue, stream)
    }

    /// Offers within capacity are enqueued and delivered in order.
    #[test]
    fn source_queue_offer_enqueued() {
        let (queue, mut stream) =
            materialize_source_queue::<i32>(2, OverflowStrategy::Backpressure);
        assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
        assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::Enqueued);
        assert_eq!(stream.next(), Some(Ok(1)));
        assert_eq!(stream.next(), Some(Ok(2)));
    }

    /// `DropHead` evicts the oldest buffered element on overflow.
    #[test]
    fn source_queue_drop_head() {
        let (queue, mut stream) = materialize_source_queue::<i32>(2, OverflowStrategy::DropHead);
        assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
        assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::Enqueued);
        assert_eq!(queue.offer(3).unwrap(), QueueOfferResult::Enqueued);
        queue.complete();
        assert_eq!(stream.next(), Some(Ok(2)));
        assert_eq!(stream.next(), Some(Ok(3)));
        assert_eq!(stream.next(), None);
    }

    /// `DropTail` evicts the newest buffered element on overflow.
    #[test]
    fn source_queue_drop_tail() {
        let (queue, mut stream) = materialize_source_queue::<i32>(2, OverflowStrategy::DropTail);
        assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
        assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::Enqueued);
        assert_eq!(queue.offer(3).unwrap(), QueueOfferResult::Enqueued);
        queue.complete();
        assert_eq!(stream.next(), Some(Ok(1)));
        assert_eq!(stream.next(), Some(Ok(3)));
        assert_eq!(stream.next(), None);
    }

    /// `DropBuffer` discards everything buffered on overflow.
    #[test]
    fn source_queue_drop_buffer() {
        let (queue, mut stream) = materialize_source_queue::<i32>(2, OverflowStrategy::DropBuffer);
        assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
        assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::Enqueued);
        assert_eq!(queue.offer(3).unwrap(), QueueOfferResult::Enqueued);
        queue.complete();
        assert_eq!(stream.next(), Some(Ok(3)));
        assert_eq!(stream.next(), None);
    }

    /// `DropNew` rejects the offered element on overflow.
    #[test]
    fn source_queue_drop_new() {
        let (queue, mut stream) = materialize_source_queue::<i32>(2, OverflowStrategy::DropNew);
        assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
        assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::Enqueued);
        assert_eq!(queue.offer(3).unwrap(), QueueOfferResult::Dropped);
        queue.complete();
        assert_eq!(stream.next(), Some(Ok(1)));
        assert_eq!(stream.next(), Some(Ok(2)));
        assert_eq!(stream.next(), None);
    }

    /// `Fail` fails the queue on overflow: the offer reports the error, the
    /// stream yields it, and later offers are refused.
    #[test]
    fn source_queue_fail_strategy() {
        let (queue, mut stream) = materialize_source_queue::<i32>(2, OverflowStrategy::Fail);
        assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
        assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::Enqueued);
        let error = match queue.offer(3).unwrap() {
            QueueOfferResult::Failure(e) => e,
            other => panic!("expected Failure, got {other:?}"),
        };
        assert!(format!("{error:?}").contains("Buffer overflow"));
        assert_eq!(stream.next(), Some(Err(error)));
        assert_eq!(queue.offer(4).unwrap(), QueueOfferResult::QueueClosed);
    }

    /// Under `Backpressure`, an offer succeeds once the consumer has freed a slot.
    #[test]
    fn source_queue_backpressure_blocks() {
        let (queue, mut stream) =
            materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
        assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);

        let consumed = Arc::new(AtomicBool::new(false));
        let c = Arc::clone(&consumed);
        let (release_tx, release_rx) = mpsc::channel();
        let consumer = thread::spawn(move || {
            assert_eq!(stream.next(), Some(Ok(1)));
            c.store(true, Ordering::SeqCst);
            // Keep the stream alive until the main thread has offered again.
            release_rx.recv().unwrap();
        });

        wait_until(Duration::from_secs(1), || consumed.load(Ordering::SeqCst));
        assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::Enqueued);
        release_tx.send(()).unwrap();
        consumer.join().unwrap();
        assert!(consumed.load(Ordering::SeqCst));
    }

    /// A second offer made while another one is blocked is rejected with an error.
    #[test]
    fn source_queue_concurrent_offer_violation() {
        let (queue, mut stream) =
            materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
        assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);

        let q = queue.clone();
        let started = Arc::new(AtomicBool::new(false));
        let s = Arc::clone(&started);
        let blocker = thread::spawn(move || {
            s.store(true, Ordering::SeqCst);
            let _ = q.offer(2);
        });

        while !started.load(Ordering::SeqCst) {
            thread::yield_now();
        }
        // Wait until the blocker is actually registered as a pending offer.
        wait_until(Duration::from_secs(1), || {
            queue
                .shared
                .state
                .lock()
                .unwrap_or_else(|poison| poison.into_inner())
                .pending_count
                == 1
        });

        let result = queue.offer(3);
        assert!(result.is_err());
        assert!(format!("{result:?}").contains("Too many concurrent offers"));

        // Free a slot so the blocked offer can finish.
        assert_eq!(stream.next(), Some(Ok(1)));
        blocker.join().unwrap();
    }

    /// The completion handle resolves successfully after a normal completion.
    #[test]
    fn source_queue_watch_completion() {
        let (queue, mut stream) =
            materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
        queue.offer(1).unwrap();
        queue.complete();
        assert_eq!(stream.next(), Some(Ok(1)));
        assert_eq!(stream.next(), None);
        assert_eq!(queue.watch_completion().wait(), Ok(NotUsed));
    }

    /// The completion handle resolves with the error after a failure.
    #[test]
    fn source_queue_watch_completion_on_failure() {
        let (queue, mut stream) =
            materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
        queue.fail(StreamError::Failed("boom".into()));
        assert_eq!(stream.next(), Some(Err(StreamError::Failed("boom".into()))));
        assert_eq!(
            queue.watch_completion().wait(),
            Err(StreamError::Failed("boom".into()))
        );
    }

    /// Once completed, the stream keeps returning `None`.
    #[test]
    fn source_queue_terminal_completion_is_sticky() {
        let (queue, mut stream) =
            materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
        queue.complete();
        assert_eq!(stream.next(), None);
        assert_eq!(stream.next(), None);
        assert_eq!(stream.next(), None);
    }

    /// Offers made after `complete` are refused.
    #[test]
    fn source_queue_offer_after_complete_returns_queue_closed() {
        let (queue, _stream) = materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
        queue.complete();
        assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::QueueClosed);
    }

    /// Dropping the stream closes the queue for producers.
    #[test]
    fn source_queue_drop_stream_closes_queue() {
        let (queue, stream) = materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
        queue.offer(1).unwrap();
        drop(stream);
        assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::QueueClosed);
    }

    /// Dropping every producer handle completes the stream.
    #[test]
    fn source_queue_drop_last_producer_completes_stream() {
        let (queue, mut stream) =
            materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
        let producer = queue.clone();
        let consumer = thread::spawn(move || {
            assert_eq!(stream.next(), None);
        });
        drop(producer);
        drop(queue);
        consumer.join().unwrap();
    }

    /// A blocked backpressured offer is released with `QueueClosed` by `complete`.
    #[test]
    fn source_queue_backpressure_unblocks_on_complete() {
        let (queue, mut stream) =
            materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
        assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);

        let q = queue.clone();
        let blocker = thread::spawn(move || {
            q.offer(2)
        });

        // Wait until the blocker is actually registered as a pending offer.
        wait_until(Duration::from_secs(1), || {
            queue
                .shared
                .state
                .lock()
                .unwrap_or_else(|poison| poison.into_inner())
                .pending_count
                == 1
        });
        queue.complete();
        let result = blocker.join().unwrap();
        assert_eq!(result.unwrap(), QueueOfferResult::QueueClosed);

        assert_eq!(stream.next(), Some(Ok(1)));
        assert_eq!(stream.next(), None);
    }

    /// Runs `source` into `Sink::queue` and returns the materialized handle.
    fn materialize_sink_queue<T: Send + 'static>(source: Source<T>) -> SinkQueue<T> {
        source.run_with(Sink::queue()).unwrap()
    }

    /// Elements are pulled in upstream order, followed by `None`.
    #[test]
    fn sink_queue_pull_elements() {
        let queue = materialize_sink_queue(Source::from_iter([1, 2, 3]));
        assert_eq!(queue.pull().unwrap(), Some(1));
        assert_eq!(queue.pull().unwrap(), Some(2));
        assert_eq!(queue.pull().unwrap(), Some(3));
        assert_eq!(queue.pull().unwrap(), None);
    }

    /// After upstream completion, `pull` keeps returning `None`.
    #[test]
    fn sink_queue_pull_none_after_upstream_completion() {
        let queue = materialize_sink_queue(Source::from_iter([42]));
        assert_eq!(queue.pull().unwrap(), Some(42));
        assert_eq!(queue.pull().unwrap(), None);
        assert_eq!(queue.pull().unwrap(), None);
    }

    /// An upstream failure is reported by `pull`.
    #[test]
    fn sink_queue_pull_error_from_upstream_failure() {
        let queue =
            materialize_sink_queue(Source::<i32>::failed(StreamError::Failed("boom".into())));
        assert_eq!(
            queue.pull().unwrap_err(),
            StreamError::Failed("boom".into())
        );
        assert_eq!(
            queue.pull().unwrap_err(),
            StreamError::Failed("boom".into())
        );
    }

    /// Completion of an empty upstream is reported on every `pull`.
    #[test]
    fn sink_queue_terminal_stickiness() {
        let queue = materialize_sink_queue(Source::<i32>::empty());
        assert_eq!(queue.pull().unwrap(), None);
        assert_eq!(queue.pull().unwrap(), None);
        assert_eq!(queue.pull().unwrap(), None);
    }

    /// An upstream failure is reported on every `pull`.
    #[test]
    fn sink_queue_terminal_failure_stickiness() {
        let queue =
            materialize_sink_queue(Source::<i32>::failed(StreamError::Failed("boom".into())));
        assert_eq!(
            queue.pull().unwrap_err(),
            StreamError::Failed("boom".into())
        );
        assert_eq!(
            queue.pull().unwrap_err(),
            StreamError::Failed("boom".into())
        );
    }

    /// Dropping the handle of an endless upstream must not hang or panic; there
    /// is nothing further to assert.
    #[test]
    fn sink_queue_drop_cancels_upstream() {
        let queue = materialize_sink_queue(Source::repeat(42));
        drop(queue);
    }

    /// A longer upstream is pulled completely and in order.
    #[test]
    fn sink_queue_drain_multiple_items() {
        let queue = materialize_sink_queue(Source::from_iter(0..100));
        for i in 0..100 {
            assert_eq!(queue.pull().unwrap(), Some(i));
        }
        assert_eq!(queue.pull().unwrap(), None);
    }

    /// Materializes `Source::queue_bounded` and returns its handle and stream.
    fn materialize_bounded_queue<T: Send + 'static>(
        capacity: usize,
    ) -> (BoundedSourceQueue<T>, BoxStream<T>) {
        let materializer = Materializer::new();
        let (stream, queue) = Source::<T>::queue_bounded(capacity)
            .factory
            .create(&materializer)
            .unwrap();
        (queue, stream)
    }

    /// Polls `condition` roughly every millisecond until it holds, panicking if
    /// it still does not hold once `timeout` has elapsed.
    fn wait_until(timeout: Duration, condition: impl Fn() -> bool) {
        let deadline = Instant::now() + timeout;
        while Instant::now() < deadline {
            if condition() {
                return;
            }
            thread::yield_now();
            thread::sleep(Duration::from_millis(1));
        }
        assert!(condition(), "condition was not met within {timeout:?}");
    }
}