1use std::collections::VecDeque;
7use std::sync::atomic::Ordering;
8use std::sync::{Arc, Condvar, Mutex};
9
10use crate::stream::{BoxStream, NotUsed, OverflowStrategy, Sink, Source, StreamCompletion};
11use crate::{StreamError, StreamResult};
12use futures::channel::oneshot;
13
14#[derive(Debug, Clone, PartialEq, Eq)]
15pub enum QueueOfferResult {
16 Enqueued,
17 Dropped,
18 QueueClosed,
19 Failure(StreamError),
20}
21
22#[derive(Clone)]
23enum TerminalSignal {
24 Complete,
25 Error(StreamError),
26}
27
28struct BoundedQueueShared<T> {
31 state: Mutex<BoundedQueueState<T>>,
32 available: Condvar,
33 capacity: usize,
34}
35
36struct BoundedQueueState<T> {
37 buffer: VecDeque<T>,
38 terminal: Option<TerminalSignal>,
39}
40
41impl<T> BoundedQueueShared<T> {
42 fn new(capacity: usize) -> Arc<Self> {
43 Arc::new(Self {
44 state: Mutex::new(BoundedQueueState {
45 buffer: VecDeque::with_capacity(capacity),
46 terminal: None,
47 }),
48 available: Condvar::new(),
49 capacity,
50 })
51 }
52}
53
54#[derive(Clone)]
55pub struct BoundedSourceQueue<T> {
56 shared: Arc<BoundedQueueShared<T>>,
57}
58
59impl<T> BoundedSourceQueue<T> {
60 pub fn offer(&self, elem: T) -> QueueOfferResult {
61 let mut state = self
62 .shared
63 .state
64 .lock()
65 .unwrap_or_else(|poison| poison.into_inner());
66
67 match &state.terminal {
68 Some(TerminalSignal::Complete) | Some(TerminalSignal::Error(_)) => {
69 return QueueOfferResult::QueueClosed;
70 }
71 None => {}
72 }
73
74 if state.buffer.len() < self.shared.capacity {
75 state.buffer.push_back(elem);
76 self.shared.available.notify_all();
77 QueueOfferResult::Enqueued
78 } else {
79 QueueOfferResult::Dropped
80 }
81 }
82
83 pub fn complete(&self) {
84 let mut state = self
85 .shared
86 .state
87 .lock()
88 .unwrap_or_else(|poison| poison.into_inner());
89 if state.terminal.is_none() {
90 state.terminal = Some(TerminalSignal::Complete);
91 }
92 drop(state);
93 self.shared.available.notify_all();
94 }
95
96 pub fn fail(&self, error: StreamError) {
97 let mut state = self
98 .shared
99 .state
100 .lock()
101 .unwrap_or_else(|poison| poison.into_inner());
102 if state.terminal.is_none() {
103 state.terminal = Some(TerminalSignal::Error(error));
104 }
105 drop(state);
106 self.shared.available.notify_all();
107 }
108}
109
110impl<T> Drop for BoundedSourceQueue<T> {
111 fn drop(&mut self) {
112 if Arc::strong_count(&self.shared) != 2 {
113 return;
114 }
115
116 let mut state = self
117 .shared
118 .state
119 .lock()
120 .unwrap_or_else(|poison| poison.into_inner());
121 if state.terminal.is_none() {
122 state.terminal = Some(TerminalSignal::Complete);
123 }
124 drop(state);
125 self.shared.available.notify_all();
126 }
127}
128
129struct BoundedQueueStream<T> {
130 shared: Arc<BoundedQueueShared<T>>,
131}
132
133impl<T> Iterator for BoundedQueueStream<T> {
134 type Item = StreamResult<T>;
135
136 fn next(&mut self) -> Option<Self::Item> {
137 let mut state = self
138 .shared
139 .state
140 .lock()
141 .unwrap_or_else(|poison| poison.into_inner());
142 loop {
143 if let Some(item) = state.buffer.pop_front() {
144 self.shared.available.notify_all();
145 return Some(Ok(item));
146 }
147 if let Some(terminal) = state.terminal.clone() {
148 return match terminal {
149 TerminalSignal::Complete => None,
150 TerminalSignal::Error(error) => Some(Err(error)),
151 };
152 }
153 state = self
154 .shared
155 .available
156 .wait(state)
157 .unwrap_or_else(|poison| poison.into_inner());
158 }
159 }
160}
161
162impl<T> Drop for BoundedQueueStream<T> {
163 fn drop(&mut self) {
164 let mut state = self
165 .shared
166 .state
167 .lock()
168 .unwrap_or_else(|poison| poison.into_inner());
169 if state.terminal.is_none() {
170 state.terminal = Some(TerminalSignal::Complete);
171 }
172 self.shared.available.notify_all();
173 }
174}
175
176struct SourceQueueShared<T> {
179 state: Mutex<SourceQueueState<T>>,
180 available: Condvar,
181 capacity: usize,
182 strategy: OverflowStrategy,
183}
184
185struct SourceQueueState<T> {
186 buffer: VecDeque<T>,
187 terminal: Option<TerminalSignal>,
188 terminating: bool,
189 pending_count: usize,
190}
191
192impl<T> SourceQueueShared<T> {
193 fn new(capacity: usize, strategy: OverflowStrategy) -> Arc<Self> {
194 Arc::new(Self {
195 state: Mutex::new(SourceQueueState {
196 buffer: VecDeque::with_capacity(capacity),
197 terminal: None,
198 terminating: false,
199 pending_count: 0,
200 }),
201 available: Condvar::new(),
202 capacity,
203 strategy,
204 })
205 }
206}
207
208pub struct SourceQueue<T> {
209 shared: Arc<SourceQueueShared<T>>,
210 completion: Option<StreamCompletion<NotUsed>>,
211}
212
213impl<T> Clone for SourceQueue<T> {
214 fn clone(&self) -> Self {
215 Self {
216 shared: Arc::clone(&self.shared),
217 completion: Some(StreamCompletion::ready(Err(StreamError::Failed(
218 "cannot clone queue completion handle; use watch_completion() on the original handle"
219 .into(),
220 )))),
221 }
222 }
223}
224
225impl<T: Send + 'static> SourceQueue<T> {
226 pub fn watch_completion(mut self) -> StreamCompletion<NotUsed> {
227 self.completion.take().unwrap_or_else(|| {
228 StreamCompletion::ready(Err(StreamError::Failed(
229 "queue completion handle already taken".into(),
230 )))
231 })
232 }
233
234 pub fn offer(&self, elem: T) -> StreamResult<QueueOfferResult> {
235 let strategy = self.shared.strategy;
236 let capacity = self.shared.capacity;
237 let mut state = self
238 .shared
239 .state
240 .lock()
241 .unwrap_or_else(|poison| poison.into_inner());
242
243 if state.terminal.is_some() {
244 return Ok(QueueOfferResult::QueueClosed);
245 }
246
247 if state.terminating {
248 return Ok(QueueOfferResult::QueueClosed);
249 }
250
251 if state.buffer.len() < capacity {
252 state.buffer.push_back(elem);
253 drop(state);
254 self.shared.available.notify_all();
255 return Ok(QueueOfferResult::Enqueued);
256 }
257
258 match strategy {
259 OverflowStrategy::DropHead => {
260 let _ = state.buffer.pop_front();
261 state.buffer.push_back(elem);
262 drop(state);
263 self.shared.available.notify_all();
264 Ok(QueueOfferResult::Enqueued)
265 }
266 OverflowStrategy::DropTail => {
267 let _ = state.buffer.pop_back();
268 state.buffer.push_back(elem);
269 drop(state);
270 self.shared.available.notify_all();
271 Ok(QueueOfferResult::Enqueued)
272 }
273 OverflowStrategy::DropBuffer => {
274 state.buffer.clear();
275 state.buffer.push_back(elem);
276 drop(state);
277 self.shared.available.notify_all();
278 Ok(QueueOfferResult::Enqueued)
279 }
280 OverflowStrategy::DropNew => Ok(QueueOfferResult::Dropped),
281 OverflowStrategy::Fail => {
282 state.buffer.clear();
283 let error =
284 StreamError::Failed(format!("Buffer overflow (max capacity was: {capacity})!"));
285 state.terminal = Some(TerminalSignal::Error(error.clone()));
286 drop(state);
287 self.shared.available.notify_all();
288 Ok(QueueOfferResult::Failure(error))
289 }
290 OverflowStrategy::Backpressure => {
291 if state.pending_count >= 1 {
292 return Err(StreamError::Failed(
293 "Too many concurrent offers. Specified maximum is 1. \
294 You have to wait for the previous offer to resolve to send another request"
295 .into(),
296 ));
297 }
298 state.pending_count += 1;
299 loop {
300 if state.terminal.is_some() || state.terminating {
301 state.pending_count -= 1;
302 return Ok(QueueOfferResult::QueueClosed);
303 }
304 if state.buffer.len() < capacity {
305 state.pending_count -= 1;
306 state.buffer.push_back(elem);
307 drop(state);
308 self.shared.available.notify_all();
309 return Ok(QueueOfferResult::Enqueued);
310 }
311 state = self
312 .shared
313 .available
314 .wait(state)
315 .unwrap_or_else(|poison| poison.into_inner());
316 }
317 }
318 }
319 }
320
321 pub fn complete(&self) {
322 let mut state = self
323 .shared
324 .state
325 .lock()
326 .unwrap_or_else(|poison| poison.into_inner());
327 if state.buffer.is_empty() && state.pending_count == 0 {
328 if state.terminal.is_none() {
329 state.terminal = Some(TerminalSignal::Complete);
330 }
331 } else {
332 state.terminating = true;
333 }
334 drop(state);
335 self.shared.available.notify_all();
336 }
337
338 pub fn fail(&self, error: StreamError) {
339 let mut state = self
340 .shared
341 .state
342 .lock()
343 .unwrap_or_else(|poison| poison.into_inner());
344 if state.terminal.is_none() {
345 state.terminal = Some(TerminalSignal::Error(error));
346 }
347 drop(state);
348 self.shared.available.notify_all();
349 }
350}
351
352impl<T> Drop for SourceQueue<T> {
353 fn drop(&mut self) {
354 if Arc::strong_count(&self.shared) != 2 {
355 return;
356 }
357
358 let mut state = self
359 .shared
360 .state
361 .lock()
362 .unwrap_or_else(|poison| poison.into_inner());
363 if state.terminal.is_none() && !state.terminating {
364 state.terminal = Some(TerminalSignal::Complete);
365 }
366 drop(state);
367 self.shared.available.notify_all();
368 }
369}
370
371struct SourceQueueStream<T: Send + 'static> {
372 shared: Arc<SourceQueueShared<T>>,
373 completion_sender: Option<oneshot::Sender<StreamResult<NotUsed>>>,
374}
375
376impl<T: Send + 'static> Iterator for SourceQueueStream<T> {
377 type Item = StreamResult<T>;
378
379 fn next(&mut self) -> Option<Self::Item> {
380 let mut state = self
381 .shared
382 .state
383 .lock()
384 .unwrap_or_else(|poison| poison.into_inner());
385 loop {
386 if let Some(TerminalSignal::Error(error)) = &state.terminal {
387 let error = error.clone();
388 drop(state);
389 self.signal_completion(Err(error.clone()));
390 self.shared.available.notify_all();
391 return Some(Err(error));
392 }
393
394 if let Some(item) = state.buffer.pop_front() {
395 drop(state);
396 self.shared.available.notify_all();
397 return Some(Ok(item));
398 }
399
400 if let Some(terminal) = state.terminal.clone() {
401 if state.terminating {
402 state.terminating = false;
403 }
404 drop(state);
405 self.signal_completion(match &terminal {
406 TerminalSignal::Complete => Ok(NotUsed),
407 TerminalSignal::Error(error) => Err(error.clone()),
408 });
409 self.shared.available.notify_all();
410 return match terminal {
411 TerminalSignal::Complete => None,
412 TerminalSignal::Error(error) => Some(Err(error)),
413 };
414 }
415
416 if state.terminating && state.buffer.is_empty() && state.pending_count == 0 {
417 state.terminal = Some(TerminalSignal::Complete);
418 state.terminating = false;
419 drop(state);
420 self.signal_completion(Ok(NotUsed));
421 self.shared.available.notify_all();
422 return None;
423 }
424
425 state = self
426 .shared
427 .available
428 .wait(state)
429 .unwrap_or_else(|poison| poison.into_inner());
430 }
431 }
432}
433
434impl<T: Send + 'static> SourceQueueStream<T> {
435 fn signal_completion(&mut self, result: StreamResult<NotUsed>) {
436 if let Some(sender) = self.completion_sender.take() {
437 let _ = sender.send(result);
438 }
439 }
440}
441
442impl<T: Send + 'static> Drop for SourceQueueStream<T> {
443 fn drop(&mut self) {
444 let mut state = self
445 .shared
446 .state
447 .lock()
448 .unwrap_or_else(|poison| poison.into_inner());
449 if state.terminal.is_none() {
450 state.terminal = Some(TerminalSignal::Complete);
451 }
452 state.terminating = false;
453 drop(state);
454 self.signal_completion(Ok(NotUsed));
455 self.shared.available.notify_all();
456 }
457}
458
459struct SinkQueueShared<T> {
462 state: Mutex<SinkQueueState<T>>,
463 available: Condvar,
464}
465
466struct SinkQueueState<T> {
467 buffer: VecDeque<T>,
468 error: Option<StreamError>,
469 completed: bool,
470}
471
472impl<T> SinkQueueShared<T> {
473 fn new() -> Arc<Self> {
474 Arc::new(Self {
475 state: Mutex::new(SinkQueueState {
476 buffer: VecDeque::new(),
477 error: None,
478 completed: false,
479 }),
480 available: Condvar::new(),
481 })
482 }
483}
484
485pub struct SinkQueue<T> {
486 shared: Arc<SinkQueueShared<T>>,
487 _completion: StreamCompletion<NotUsed>,
488}
489
490impl<T> SinkQueue<T> {
491 pub fn pull(&self) -> StreamResult<Option<T>> {
492 let mut state = self
493 .shared
494 .state
495 .lock()
496 .unwrap_or_else(|poison| poison.into_inner());
497 loop {
498 if let Some(item) = state.buffer.pop_front() {
499 return Ok(Some(item));
500 }
501 if let Some(error) = state.error.clone() {
502 return Err(error);
503 }
504 if state.completed {
505 return Ok(None);
506 }
507 state = self
508 .shared
509 .available
510 .wait(state)
511 .unwrap_or_else(|poison| poison.into_inner());
512 }
513 }
514}
515
516impl<T: Send + 'static> Source<T, NotUsed> {
519 #[must_use]
520 pub fn queue_bounded(capacity: usize) -> Source<T, BoundedSourceQueue<T>> {
521 assert!(capacity > 0, "queue capacity must be greater than zero");
522 Source::from_materialized_factory(move |_materializer| {
523 let shared = BoundedQueueShared::new(capacity);
524 let stream: BoxStream<T> = Box::new(BoundedQueueStream {
525 shared: Arc::clone(&shared),
526 });
527 let handle = BoundedSourceQueue { shared };
528 Ok((stream, handle))
529 })
530 }
531
532 #[must_use]
533 pub fn queue(capacity: usize, strategy: OverflowStrategy) -> Source<T, SourceQueue<T>> {
534 assert!(capacity > 0, "queue capacity must be greater than zero");
535 Source::from_materialized_factory(move |_materializer| {
536 let shared = SourceQueueShared::new(capacity, strategy);
537 let (completion_sender, completion_receiver) = oneshot::channel();
538 let stream: BoxStream<T> = Box::new(SourceQueueStream {
539 shared: Arc::clone(&shared),
540 completion_sender: Some(completion_sender),
541 });
542 let handle = SourceQueue {
543 shared,
544 completion: Some(StreamCompletion::from_receiver(completion_receiver, None)),
545 };
546 Ok((stream, handle))
547 })
548 }
549}
550
551impl<T: Send + 'static> Sink<T, SinkQueue<T>> {
554 #[must_use]
555 pub fn queue() -> Self {
556 Sink::from_runner(move |mut input, materializer| {
557 let shared = SinkQueueShared::new();
558 let worker_shared = Arc::clone(&shared);
559
560 let completion = materializer.spawn_stream(move |stream_cancelled| {
561 loop {
562 if stream_cancelled.load(Ordering::SeqCst) {
563 return Ok(NotUsed);
564 }
565 match input.next() {
566 Some(Ok(item)) => {
567 let mut state = worker_shared
568 .state
569 .lock()
570 .unwrap_or_else(|poison| poison.into_inner());
571 state.buffer.push_back(item);
572 drop(state);
573 worker_shared.available.notify_all();
574 }
575 Some(Err(error)) => {
576 let mut state = worker_shared
577 .state
578 .lock()
579 .unwrap_or_else(|poison| poison.into_inner());
580 if state.error.is_none() {
581 state.error = Some(error);
582 }
583 drop(state);
584 worker_shared.available.notify_all();
585 return Ok(NotUsed);
586 }
587 None => {
588 let mut state = worker_shared
589 .state
590 .lock()
591 .unwrap_or_else(|poison| poison.into_inner());
592 state.completed = true;
593 drop(state);
594 worker_shared.available.notify_all();
595 return Ok(NotUsed);
596 }
597 }
598 }
599 });
600
601 Ok(SinkQueue {
602 shared: Arc::clone(&shared),
603 _completion: completion,
604 })
605 })
606 }
607}
608
609#[cfg(test)]
612mod tests {
613 use super::*;
614 use crate::stream::Materializer;
615 use std::sync::atomic::AtomicBool;
616 use std::sync::atomic::Ordering;
617 use std::sync::mpsc;
618 use std::thread;
619 use std::time::{Duration, Instant};
620
621 #[test]
624 fn bounded_offer_accepted_vs_processed_distinct() {
625 let (queue, mut stream) = materialize_bounded_queue(2);
626 assert_eq!(queue.offer(1), QueueOfferResult::Enqueued);
628 assert_eq!(queue.offer(2), QueueOfferResult::Enqueued);
629 assert_eq!(stream.next(), Some(Ok(1)));
631 assert_eq!(queue.offer(3), QueueOfferResult::Enqueued);
633 assert_eq!(stream.next(), Some(Ok(2)));
634 assert_eq!(stream.next(), Some(Ok(3)));
635 }
636
637 #[test]
638 fn bounded_offer_dropped_when_full() {
639 let (queue, mut stream) = materialize_bounded_queue(1);
640 assert_eq!(queue.offer(1), QueueOfferResult::Enqueued);
641 assert_eq!(queue.offer(2), QueueOfferResult::Dropped);
642 assert_eq!(queue.offer(3), QueueOfferResult::Dropped);
643 queue.complete();
644 assert_eq!(stream.next(), Some(Ok(1)));
645 assert_eq!(stream.next(), None);
646 }
647
648 #[test]
649 fn bounded_queue_closed_after_complete() {
650 let (queue, mut stream) = materialize_bounded_queue(2);
651 assert_eq!(queue.offer(1), QueueOfferResult::Enqueued);
652 queue.complete();
653 assert_eq!(stream.next(), Some(Ok(1)));
655 assert_eq!(stream.next(), None);
656 assert_eq!(queue.offer(2), QueueOfferResult::QueueClosed);
658 }
659
660 #[test]
661 fn bounded_queue_closed_after_fail() {
662 let (queue, mut stream) = materialize_bounded_queue(2);
663 assert_eq!(queue.offer(1), QueueOfferResult::Enqueued);
664 queue.fail(StreamError::Failed("boom".into()));
665 assert_eq!(queue.offer(2), QueueOfferResult::QueueClosed);
667 assert_eq!(stream.next(), Some(Ok(1)));
669 assert_eq!(stream.next(), Some(Err(StreamError::Failed("boom".into()))));
670 }
671
672 #[test]
673 fn bounded_drop_handle_completes_stream() {
674 let queue = {
675 let (queue, mut stream) = materialize_bounded_queue::<i32>(2);
676 queue.offer(1);
677 queue.offer(2);
678 assert_eq!(stream.next(), Some(Ok(1)));
679 queue
680 };
681 assert_eq!(queue.offer(3), QueueOfferResult::QueueClosed);
683 }
684
685 #[test]
686 fn bounded_drop_last_producer_completes_stream() {
687 let (queue, mut stream) = materialize_bounded_queue::<i32>(1);
688 let producer = queue.clone();
689 let consumer = thread::spawn(move || {
690 assert_eq!(stream.next(), None);
691 });
692 drop(producer);
693 drop(queue);
694 consumer.join().unwrap();
695 }
696
697 #[test]
698 fn bounded_drain_before_complete() {
699 let (queue, mut stream) = materialize_bounded_queue(3);
700 assert_eq!(queue.offer(1), QueueOfferResult::Enqueued);
701 assert_eq!(queue.offer(2), QueueOfferResult::Enqueued);
702 assert_eq!(queue.offer(3), QueueOfferResult::Enqueued);
703 assert_eq!(queue.offer(4), QueueOfferResult::Dropped);
704 assert_eq!(queue.offer(5), QueueOfferResult::Dropped);
705 queue.complete();
706 assert_eq!(stream.next(), Some(Ok(1)));
707 assert_eq!(stream.next(), Some(Ok(2)));
708 assert_eq!(stream.next(), Some(Ok(3)));
709 assert_eq!(stream.next(), None);
710 }
711
712 #[test]
713 fn bounded_terminal_completion_is_sticky() {
714 let (queue, mut stream) = materialize_bounded_queue(2);
715 queue.offer(1);
716 queue.complete();
717 assert_eq!(stream.next(), Some(Ok(1)));
718 assert_eq!(stream.next(), None);
719 assert_eq!(stream.next(), None);
720 assert_eq!(stream.next(), None);
721 }
722
723 #[test]
724 fn bounded_terminal_failure_is_sticky() {
725 let (queue, mut stream) = materialize_bounded_queue::<i32>(2);
726 queue.fail(StreamError::Failed("boom".into()));
727 assert_eq!(stream.next(), Some(Err(StreamError::Failed("boom".into()))));
728 assert_eq!(stream.next(), Some(Err(StreamError::Failed("boom".into()))));
729 }
730
731 #[test]
732 fn bounded_producer_consumer_across_threads() {
733 let (queue, mut stream) = materialize_bounded_queue::<i32>(16);
734 let consumer = thread::spawn(move || {
736 let mut collected = Vec::new();
737 while let Some(Ok(item)) = stream.next() {
738 collected.push(item);
739 }
740 collected
741 });
742 let producer = thread::spawn({
743 let queue = queue.clone();
744 move || {
745 for i in 0..10 {
746 assert_eq!(queue.offer(i), QueueOfferResult::Enqueued);
747 }
748 queue.complete();
749 }
750 });
751 producer.join().unwrap();
752 let collected = consumer.join().unwrap();
753 assert_eq!(collected, (0..10).collect::<Vec<_>>());
754 }
755
756 #[test]
757 fn bounded_multi_producer_single_consumer() {
758 let n = 50_i32;
759 let producer_count = 4;
760 let total = (producer_count * n) as usize;
761 let (queue, mut stream) = materialize_bounded_queue::<i32>(total);
762
763 let consumer = thread::spawn(move || {
765 let mut collected = Vec::new();
766 while let Some(Ok(item)) = stream.next() {
767 collected.push(item);
768 }
769 collected
770 });
771
772 let mut handles = Vec::new();
773 for p in 0..producer_count {
774 let q = queue.clone();
775 handles.push(thread::spawn(move || {
776 for i in 0..n {
777 assert_eq!(q.offer(p * n + i), QueueOfferResult::Enqueued);
778 }
779 }));
780 }
781 for h in handles {
782 h.join().unwrap();
783 }
784 queue.complete();
785 let mut collected = consumer.join().unwrap();
786 collected.sort_unstable();
787 assert_eq!(collected.len(), total);
788 collected.sort_unstable();
789 assert_eq!(collected.len(), (producer_count * n) as usize);
790 }
791
792 fn materialize_source_queue<T: Send + 'static>(
795 capacity: usize,
796 strategy: OverflowStrategy,
797 ) -> (SourceQueue<T>, BoxStream<T>) {
798 let materializer = Materializer::new();
799 let (stream, queue) = Source::<T>::queue(capacity, strategy)
800 .factory
801 .create(&materializer)
802 .unwrap();
803 (queue, stream)
804 }
805
806 #[test]
807 fn source_queue_offer_enqueued() {
808 let (queue, mut stream) =
809 materialize_source_queue::<i32>(2, OverflowStrategy::Backpressure);
810 assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
811 assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::Enqueued);
812 assert_eq!(stream.next(), Some(Ok(1)));
813 assert_eq!(stream.next(), Some(Ok(2)));
814 }
815
816 #[test]
817 fn source_queue_drop_head() {
818 let (queue, mut stream) = materialize_source_queue::<i32>(2, OverflowStrategy::DropHead);
819 assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
820 assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::Enqueued);
821 assert_eq!(queue.offer(3).unwrap(), QueueOfferResult::Enqueued);
823 queue.complete();
824 assert_eq!(stream.next(), Some(Ok(2)));
825 assert_eq!(stream.next(), Some(Ok(3)));
826 assert_eq!(stream.next(), None);
827 }
828
829 #[test]
830 fn source_queue_drop_tail() {
831 let (queue, mut stream) = materialize_source_queue::<i32>(2, OverflowStrategy::DropTail);
832 assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
833 assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::Enqueued);
834 assert_eq!(queue.offer(3).unwrap(), QueueOfferResult::Enqueued);
835 queue.complete();
836 assert_eq!(stream.next(), Some(Ok(1)));
837 assert_eq!(stream.next(), Some(Ok(3)));
838 assert_eq!(stream.next(), None);
839 }
840
841 #[test]
842 fn source_queue_drop_buffer() {
843 let (queue, mut stream) = materialize_source_queue::<i32>(2, OverflowStrategy::DropBuffer);
844 assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
845 assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::Enqueued);
846 assert_eq!(queue.offer(3).unwrap(), QueueOfferResult::Enqueued);
847 queue.complete();
848 assert_eq!(stream.next(), Some(Ok(3)));
850 assert_eq!(stream.next(), None);
851 }
852
853 #[test]
854 fn source_queue_drop_new() {
855 let (queue, mut stream) = materialize_source_queue::<i32>(2, OverflowStrategy::DropNew);
856 assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
857 assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::Enqueued);
858 assert_eq!(queue.offer(3).unwrap(), QueueOfferResult::Dropped);
859 queue.complete();
860 assert_eq!(stream.next(), Some(Ok(1)));
861 assert_eq!(stream.next(), Some(Ok(2)));
862 assert_eq!(stream.next(), None);
863 }
864
865 #[test]
866 fn source_queue_fail_strategy() {
867 let (queue, _stream) = materialize_source_queue::<i32>(2, OverflowStrategy::Fail);
868 assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
869 assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::Enqueued);
870 match queue.offer(3).unwrap() {
871 QueueOfferResult::Failure(e) => {
872 assert!(format!("{e:?}").contains("Buffer overflow"));
873 }
874 other => panic!("expected Failure, got {other:?}"),
875 }
876 }
877
878 #[test]
879 fn source_queue_backpressure_blocks() {
880 let (queue, mut stream) =
881 materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
882 assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
883
884 let consumed = Arc::new(AtomicBool::new(false));
887 let c = Arc::clone(&consumed);
888 let (release_tx, release_rx) = mpsc::channel();
889 let consumer = thread::spawn(move || {
890 assert_eq!(stream.next(), Some(Ok(1)));
891 c.store(true, Ordering::SeqCst);
892 release_rx.recv().unwrap();
893 });
894
895 wait_until(Duration::from_secs(1), || consumed.load(Ordering::SeqCst));
896 assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::Enqueued);
897 release_tx.send(()).unwrap();
898 consumer.join().unwrap();
899 assert!(consumed.load(Ordering::SeqCst));
900 }
901
902 #[test]
903 fn source_queue_concurrent_offer_violation() {
904 let (queue, mut stream) =
905 materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
906 assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
907
908 let q = queue.clone();
909 let started = Arc::new(AtomicBool::new(false));
910 let s = Arc::clone(&started);
911 let blocker = thread::spawn(move || {
912 s.store(true, Ordering::SeqCst);
913 let _ = q.offer(2);
915 });
916
917 while !started.load(Ordering::SeqCst) {
919 thread::yield_now();
920 }
921 wait_until(Duration::from_secs(1), || {
922 queue
923 .shared
924 .state
925 .lock()
926 .unwrap_or_else(|poison| poison.into_inner())
927 .pending_count
928 == 1
929 });
930
931 let result = queue.offer(3);
933 assert!(result.is_err());
934 assert!(format!("{result:?}").contains("Too many concurrent offers"));
935
936 assert_eq!(stream.next(), Some(Ok(1)));
938 blocker.join().unwrap();
939 }
940
941 #[test]
942 fn source_queue_watch_completion() {
943 let (queue, mut stream) =
944 materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
945 queue.offer(1).unwrap();
946 queue.complete();
947 assert_eq!(stream.next(), Some(Ok(1)));
948 assert_eq!(stream.next(), None);
949 assert_eq!(queue.watch_completion().wait(), Ok(NotUsed));
950 }
951
952 #[test]
953 fn source_queue_watch_completion_on_failure() {
954 let (queue, mut stream) =
955 materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
956 queue.fail(StreamError::Failed("boom".into()));
957 assert_eq!(stream.next(), Some(Err(StreamError::Failed("boom".into()))));
958 assert_eq!(
959 queue.watch_completion().wait(),
960 Err(StreamError::Failed("boom".into()))
961 );
962 }
963
964 #[test]
965 fn source_queue_terminal_completion_is_sticky() {
966 let (queue, mut stream) =
967 materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
968 queue.complete();
969 assert_eq!(stream.next(), None);
970 assert_eq!(stream.next(), None);
971 assert_eq!(stream.next(), None);
972 }
973
974 #[test]
975 fn source_queue_offer_after_complete_returns_queue_closed() {
976 let (queue, _stream) = materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
977 queue.complete();
978 assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::QueueClosed);
979 }
980
981 #[test]
982 fn source_queue_drop_stream_closes_queue() {
983 let (queue, stream) = materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
984 queue.offer(1).unwrap();
985 drop(stream);
986 assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::QueueClosed);
988 }
989
990 #[test]
991 fn source_queue_drop_last_producer_completes_stream() {
992 let (queue, mut stream) =
993 materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
994 let producer = queue.clone();
995 let consumer = thread::spawn(move || {
996 assert_eq!(stream.next(), None);
997 });
998 drop(producer);
999 drop(queue);
1000 consumer.join().unwrap();
1001 }
1002
1003 #[test]
1004 fn source_queue_backpressure_unblocks_on_complete() {
1005 let (queue, mut stream) =
1006 materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
1007 assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
1008
1009 let q = queue.clone();
1010 let blocker = thread::spawn(move || {
1011 q.offer(2)
1013 });
1014
1015 wait_until(Duration::from_secs(1), || {
1016 queue
1017 .shared
1018 .state
1019 .lock()
1020 .unwrap_or_else(|poison| poison.into_inner())
1021 .pending_count
1022 == 1
1023 });
1024 queue.complete();
1026 let result = blocker.join().unwrap();
1027 assert_eq!(result.unwrap(), QueueOfferResult::QueueClosed);
1028
1029 assert_eq!(stream.next(), Some(Ok(1)));
1031 assert_eq!(stream.next(), None);
1032 }
1033
1034 fn materialize_sink_queue<T: Send + 'static>(source: Source<T>) -> SinkQueue<T> {
1037 source.run_with(Sink::queue()).unwrap()
1038 }
1039
1040 #[test]
1041 fn sink_queue_pull_elements() {
1042 let queue = materialize_sink_queue(Source::from_iter([1, 2, 3]));
1043 assert_eq!(queue.pull().unwrap(), Some(1));
1044 assert_eq!(queue.pull().unwrap(), Some(2));
1045 assert_eq!(queue.pull().unwrap(), Some(3));
1046 assert_eq!(queue.pull().unwrap(), None);
1047 }
1048
1049 #[test]
1050 fn sink_queue_pull_none_after_upstream_completion() {
1051 let queue = materialize_sink_queue(Source::from_iter([42]));
1052 assert_eq!(queue.pull().unwrap(), Some(42));
1053 assert_eq!(queue.pull().unwrap(), None);
1054 assert_eq!(queue.pull().unwrap(), None);
1055 }
1056
1057 #[test]
1058 fn sink_queue_pull_error_from_upstream_failure() {
1059 let queue =
1060 materialize_sink_queue(Source::<i32>::failed(StreamError::Failed("boom".into())));
1061 assert_eq!(
1062 queue.pull().unwrap_err(),
1063 StreamError::Failed("boom".into())
1064 );
1065 assert_eq!(
1066 queue.pull().unwrap_err(),
1067 StreamError::Failed("boom".into())
1068 );
1069 }
1070
1071 #[test]
1072 fn sink_queue_terminal_stickiness() {
1073 let queue = materialize_sink_queue(Source::<i32>::empty());
1074 assert_eq!(queue.pull().unwrap(), None);
1075 assert_eq!(queue.pull().unwrap(), None);
1076 assert_eq!(queue.pull().unwrap(), None);
1077 }
1078
1079 #[test]
1080 fn sink_queue_terminal_failure_stickiness() {
1081 let queue =
1082 materialize_sink_queue(Source::<i32>::failed(StreamError::Failed("boom".into())));
1083 assert_eq!(
1084 queue.pull().unwrap_err(),
1085 StreamError::Failed("boom".into())
1086 );
1087 assert_eq!(
1088 queue.pull().unwrap_err(),
1089 StreamError::Failed("boom".into())
1090 );
1091 }
1092
1093 #[test]
1094 fn sink_queue_drop_cancels_upstream() {
1095 let queue = materialize_sink_queue(Source::repeat(42));
1096 drop(queue);
1098 }
1100
1101 #[test]
1102 fn sink_queue_drain_multiple_items() {
1103 let queue = materialize_sink_queue(Source::from_iter(0..100));
1104 for i in 0..100 {
1105 assert_eq!(queue.pull().unwrap(), Some(i));
1106 }
1107 assert_eq!(queue.pull().unwrap(), None);
1108 }
1109
1110 fn materialize_bounded_queue<T: Send + 'static>(
1113 capacity: usize,
1114 ) -> (BoundedSourceQueue<T>, BoxStream<T>) {
1115 let materializer = Materializer::new();
1116 let (stream, queue) = Source::<T>::queue_bounded(capacity)
1117 .factory
1118 .create(&materializer)
1119 .unwrap();
1120 (queue, stream)
1121 }
1122
1123 fn wait_until(timeout: Duration, condition: impl Fn() -> bool) {
1124 let deadline = Instant::now() + timeout;
1125 while Instant::now() < deadline {
1126 if condition() {
1127 return;
1128 }
1129 thread::yield_now();
1130 thread::sleep(Duration::from_millis(1));
1131 }
1132 assert!(condition(), "condition was not met within {timeout:?}");
1133 }
1134}