Skip to main content

datum/
queue.rs

1//! Bounded and unbounded queues that bridge external producers into a Datum stream.
2//!
3//! [`SourceQueue`] / [`BoundedSourceQueue`] let external code feed elements into a running stream,
4//! while [`SinkQueue`] lets external code pull materialized results out on demand.
5
6use std::collections::VecDeque;
7use std::sync::atomic::Ordering;
8use std::sync::{Arc, Condvar, Mutex};
9
10use crate::stream::{BoxStream, NotUsed, OverflowStrategy, Sink, Source, StreamCompletion};
11use crate::{StreamError, StreamResult};
12use futures::channel::oneshot;
13
14#[derive(Debug, Clone, PartialEq, Eq)]
15pub enum QueueOfferResult {
16    Enqueued,
17    Dropped,
18    QueueClosed,
19    Failure(StreamError),
20}
21
22#[derive(Clone)]
23enum TerminalSignal {
24    Complete,
25    Error(StreamError),
26}
27
28// ── BoundedSourceQueue (synchronous, bounded, no overflow strategy) ─────────
29
30struct BoundedQueueShared<T> {
31    state: Mutex<BoundedQueueState<T>>,
32    available: Condvar,
33    capacity: usize,
34}
35
36struct BoundedQueueState<T> {
37    buffer: VecDeque<T>,
38    terminal: Option<TerminalSignal>,
39}
40
41impl<T> BoundedQueueShared<T> {
42    fn new(capacity: usize) -> Arc<Self> {
43        Arc::new(Self {
44            state: Mutex::new(BoundedQueueState {
45                buffer: VecDeque::with_capacity(capacity),
46                terminal: None,
47            }),
48            available: Condvar::new(),
49            capacity,
50        })
51    }
52}
53
54#[derive(Clone)]
55pub struct BoundedSourceQueue<T> {
56    shared: Arc<BoundedQueueShared<T>>,
57}
58
59impl<T> BoundedSourceQueue<T> {
60    pub fn offer(&self, elem: T) -> QueueOfferResult {
61        let mut state = self
62            .shared
63            .state
64            .lock()
65            .unwrap_or_else(|poison| poison.into_inner());
66
67        match &state.terminal {
68            Some(TerminalSignal::Complete) | Some(TerminalSignal::Error(_)) => {
69                return QueueOfferResult::QueueClosed;
70            }
71            None => {}
72        }
73
74        if state.buffer.len() < self.shared.capacity {
75            state.buffer.push_back(elem);
76            self.shared.available.notify_all();
77            QueueOfferResult::Enqueued
78        } else {
79            QueueOfferResult::Dropped
80        }
81    }
82
83    pub fn complete(&self) {
84        let mut state = self
85            .shared
86            .state
87            .lock()
88            .unwrap_or_else(|poison| poison.into_inner());
89        if state.terminal.is_none() {
90            state.terminal = Some(TerminalSignal::Complete);
91        }
92        drop(state);
93        self.shared.available.notify_all();
94    }
95
96    pub fn fail(&self, error: StreamError) {
97        let mut state = self
98            .shared
99            .state
100            .lock()
101            .unwrap_or_else(|poison| poison.into_inner());
102        if state.terminal.is_none() {
103            state.terminal = Some(TerminalSignal::Error(error));
104        }
105        drop(state);
106        self.shared.available.notify_all();
107    }
108}
109
110impl<T> Drop for BoundedSourceQueue<T> {
111    fn drop(&mut self) {
112        if Arc::strong_count(&self.shared) != 2 {
113            return;
114        }
115
116        let mut state = self
117            .shared
118            .state
119            .lock()
120            .unwrap_or_else(|poison| poison.into_inner());
121        if state.terminal.is_none() {
122            state.terminal = Some(TerminalSignal::Complete);
123        }
124        drop(state);
125        self.shared.available.notify_all();
126    }
127}
128
129struct BoundedQueueStream<T> {
130    shared: Arc<BoundedQueueShared<T>>,
131}
132
133impl<T> Iterator for BoundedQueueStream<T> {
134    type Item = StreamResult<T>;
135
136    fn next(&mut self) -> Option<Self::Item> {
137        let mut state = self
138            .shared
139            .state
140            .lock()
141            .unwrap_or_else(|poison| poison.into_inner());
142        loop {
143            if let Some(item) = state.buffer.pop_front() {
144                self.shared.available.notify_all();
145                return Some(Ok(item));
146            }
147            if let Some(terminal) = state.terminal.clone() {
148                return match terminal {
149                    TerminalSignal::Complete => None,
150                    TerminalSignal::Error(error) => Some(Err(error)),
151                };
152            }
153            state = self
154                .shared
155                .available
156                .wait(state)
157                .unwrap_or_else(|poison| poison.into_inner());
158        }
159    }
160}
161
162impl<T> Drop for BoundedQueueStream<T> {
163    fn drop(&mut self) {
164        let mut state = self
165            .shared
166            .state
167            .lock()
168            .unwrap_or_else(|poison| poison.into_inner());
169        if state.terminal.is_none() {
170            state.terminal = Some(TerminalSignal::Complete);
171        }
172        self.shared.available.notify_all();
173    }
174}
175
176// ── SourceQueue (with overflow strategy, maxConcurrentOffers = 1) ──────────
177
178struct SourceQueueShared<T> {
179    state: Mutex<SourceQueueState<T>>,
180    available: Condvar,
181    capacity: usize,
182    strategy: OverflowStrategy,
183}
184
185struct SourceQueueState<T> {
186    buffer: VecDeque<T>,
187    terminal: Option<TerminalSignal>,
188    terminating: bool,
189    pending_count: usize,
190}
191
192impl<T> SourceQueueShared<T> {
193    fn new(capacity: usize, strategy: OverflowStrategy) -> Arc<Self> {
194        Arc::new(Self {
195            state: Mutex::new(SourceQueueState {
196                buffer: VecDeque::with_capacity(capacity),
197                terminal: None,
198                terminating: false,
199                pending_count: 0,
200            }),
201            available: Condvar::new(),
202            capacity,
203            strategy,
204        })
205    }
206}
207
208pub struct SourceQueue<T> {
209    shared: Arc<SourceQueueShared<T>>,
210    completion: Option<StreamCompletion<NotUsed>>,
211}
212
213impl<T> Clone for SourceQueue<T> {
214    fn clone(&self) -> Self {
215        Self {
216            shared: Arc::clone(&self.shared),
217            completion: Some(StreamCompletion::ready(Err(StreamError::Failed(
218                "cannot clone queue completion handle; use watch_completion() on the original handle"
219                    .into(),
220            )))),
221        }
222    }
223}
224
225impl<T: Send + 'static> SourceQueue<T> {
226    pub fn watch_completion(mut self) -> StreamCompletion<NotUsed> {
227        self.completion.take().unwrap_or_else(|| {
228            StreamCompletion::ready(Err(StreamError::Failed(
229                "queue completion handle already taken".into(),
230            )))
231        })
232    }
233
234    pub fn offer(&self, elem: T) -> StreamResult<QueueOfferResult> {
235        let strategy = self.shared.strategy;
236        let capacity = self.shared.capacity;
237        let mut state = self
238            .shared
239            .state
240            .lock()
241            .unwrap_or_else(|poison| poison.into_inner());
242
243        if state.terminal.is_some() {
244            return Ok(QueueOfferResult::QueueClosed);
245        }
246
247        if state.terminating {
248            return Ok(QueueOfferResult::QueueClosed);
249        }
250
251        if state.buffer.len() < capacity {
252            state.buffer.push_back(elem);
253            drop(state);
254            self.shared.available.notify_all();
255            return Ok(QueueOfferResult::Enqueued);
256        }
257
258        match strategy {
259            OverflowStrategy::DropHead => {
260                let _ = state.buffer.pop_front();
261                state.buffer.push_back(elem);
262                drop(state);
263                self.shared.available.notify_all();
264                Ok(QueueOfferResult::Enqueued)
265            }
266            OverflowStrategy::DropTail => {
267                let _ = state.buffer.pop_back();
268                state.buffer.push_back(elem);
269                drop(state);
270                self.shared.available.notify_all();
271                Ok(QueueOfferResult::Enqueued)
272            }
273            OverflowStrategy::DropBuffer => {
274                state.buffer.clear();
275                state.buffer.push_back(elem);
276                drop(state);
277                self.shared.available.notify_all();
278                Ok(QueueOfferResult::Enqueued)
279            }
280            OverflowStrategy::DropNew => Ok(QueueOfferResult::Dropped),
281            OverflowStrategy::Fail => {
282                state.buffer.clear();
283                let error =
284                    StreamError::Failed(format!("Buffer overflow (max capacity was: {capacity})!"));
285                state.terminal = Some(TerminalSignal::Error(error.clone()));
286                drop(state);
287                self.shared.available.notify_all();
288                Ok(QueueOfferResult::Failure(error))
289            }
290            OverflowStrategy::Backpressure => {
291                if state.pending_count >= 1 {
292                    return Err(StreamError::Failed(
293                        "Too many concurrent offers. Specified maximum is 1. \
294                         You have to wait for the previous offer to resolve to send another request"
295                            .into(),
296                    ));
297                }
298                state.pending_count += 1;
299                loop {
300                    if state.terminal.is_some() || state.terminating {
301                        state.pending_count -= 1;
302                        return Ok(QueueOfferResult::QueueClosed);
303                    }
304                    if state.buffer.len() < capacity {
305                        state.pending_count -= 1;
306                        state.buffer.push_back(elem);
307                        drop(state);
308                        self.shared.available.notify_all();
309                        return Ok(QueueOfferResult::Enqueued);
310                    }
311                    state = self
312                        .shared
313                        .available
314                        .wait(state)
315                        .unwrap_or_else(|poison| poison.into_inner());
316                }
317            }
318        }
319    }
320
321    pub fn complete(&self) {
322        let mut state = self
323            .shared
324            .state
325            .lock()
326            .unwrap_or_else(|poison| poison.into_inner());
327        if state.buffer.is_empty() && state.pending_count == 0 {
328            if state.terminal.is_none() {
329                state.terminal = Some(TerminalSignal::Complete);
330            }
331        } else {
332            state.terminating = true;
333        }
334        drop(state);
335        self.shared.available.notify_all();
336    }
337
338    pub fn fail(&self, error: StreamError) {
339        let mut state = self
340            .shared
341            .state
342            .lock()
343            .unwrap_or_else(|poison| poison.into_inner());
344        if state.terminal.is_none() {
345            state.terminal = Some(TerminalSignal::Error(error));
346        }
347        drop(state);
348        self.shared.available.notify_all();
349    }
350}
351
352impl<T> Drop for SourceQueue<T> {
353    fn drop(&mut self) {
354        if Arc::strong_count(&self.shared) != 2 {
355            return;
356        }
357
358        let mut state = self
359            .shared
360            .state
361            .lock()
362            .unwrap_or_else(|poison| poison.into_inner());
363        if state.terminal.is_none() && !state.terminating {
364            state.terminal = Some(TerminalSignal::Complete);
365        }
366        drop(state);
367        self.shared.available.notify_all();
368    }
369}
370
371struct SourceQueueStream<T: Send + 'static> {
372    shared: Arc<SourceQueueShared<T>>,
373    completion_sender: Option<oneshot::Sender<StreamResult<NotUsed>>>,
374}
375
376impl<T: Send + 'static> Iterator for SourceQueueStream<T> {
377    type Item = StreamResult<T>;
378
379    fn next(&mut self) -> Option<Self::Item> {
380        let mut state = self
381            .shared
382            .state
383            .lock()
384            .unwrap_or_else(|poison| poison.into_inner());
385        loop {
386            if let Some(TerminalSignal::Error(error)) = &state.terminal {
387                let error = error.clone();
388                drop(state);
389                self.signal_completion(Err(error.clone()));
390                self.shared.available.notify_all();
391                return Some(Err(error));
392            }
393
394            if let Some(item) = state.buffer.pop_front() {
395                drop(state);
396                self.shared.available.notify_all();
397                return Some(Ok(item));
398            }
399
400            if let Some(terminal) = state.terminal.clone() {
401                if state.terminating {
402                    state.terminating = false;
403                }
404                drop(state);
405                self.signal_completion(match &terminal {
406                    TerminalSignal::Complete => Ok(NotUsed),
407                    TerminalSignal::Error(error) => Err(error.clone()),
408                });
409                self.shared.available.notify_all();
410                return match terminal {
411                    TerminalSignal::Complete => None,
412                    TerminalSignal::Error(error) => Some(Err(error)),
413                };
414            }
415
416            if state.terminating && state.buffer.is_empty() && state.pending_count == 0 {
417                state.terminal = Some(TerminalSignal::Complete);
418                state.terminating = false;
419                drop(state);
420                self.signal_completion(Ok(NotUsed));
421                self.shared.available.notify_all();
422                return None;
423            }
424
425            state = self
426                .shared
427                .available
428                .wait(state)
429                .unwrap_or_else(|poison| poison.into_inner());
430        }
431    }
432}
433
434impl<T: Send + 'static> SourceQueueStream<T> {
435    fn signal_completion(&mut self, result: StreamResult<NotUsed>) {
436        if let Some(sender) = self.completion_sender.take() {
437            let _ = sender.send(result);
438        }
439    }
440}
441
442impl<T: Send + 'static> Drop for SourceQueueStream<T> {
443    fn drop(&mut self) {
444        let mut state = self
445            .shared
446            .state
447            .lock()
448            .unwrap_or_else(|poison| poison.into_inner());
449        if state.terminal.is_none() {
450            state.terminal = Some(TerminalSignal::Complete);
451        }
452        state.terminating = false;
453        drop(state);
454        self.signal_completion(Ok(NotUsed));
455        self.shared.available.notify_all();
456    }
457}
458
459// ── SinkQueue (blocking pull from sink) ────────────────────────────────────
460
461struct SinkQueueShared<T> {
462    state: Mutex<SinkQueueState<T>>,
463    available: Condvar,
464}
465
466struct SinkQueueState<T> {
467    buffer: VecDeque<T>,
468    error: Option<StreamError>,
469    completed: bool,
470}
471
472impl<T> SinkQueueShared<T> {
473    fn new() -> Arc<Self> {
474        Arc::new(Self {
475            state: Mutex::new(SinkQueueState {
476                buffer: VecDeque::new(),
477                error: None,
478                completed: false,
479            }),
480            available: Condvar::new(),
481        })
482    }
483}
484
485pub struct SinkQueue<T> {
486    shared: Arc<SinkQueueShared<T>>,
487    _completion: StreamCompletion<NotUsed>,
488}
489
490impl<T> SinkQueue<T> {
491    pub fn pull(&self) -> StreamResult<Option<T>> {
492        let mut state = self
493            .shared
494            .state
495            .lock()
496            .unwrap_or_else(|poison| poison.into_inner());
497        loop {
498            if let Some(item) = state.buffer.pop_front() {
499                return Ok(Some(item));
500            }
501            if let Some(error) = state.error.clone() {
502                return Err(error);
503            }
504            if state.completed {
505                return Ok(None);
506            }
507            state = self
508                .shared
509                .available
510                .wait(state)
511                .unwrap_or_else(|poison| poison.into_inner());
512        }
513    }
514}
515
516// ── Source factory methods ──────────────────────────────────────────────────
517
518impl<T: Send + 'static> Source<T, NotUsed> {
519    #[must_use]
520    pub fn queue_bounded(capacity: usize) -> Source<T, BoundedSourceQueue<T>> {
521        assert!(capacity > 0, "queue capacity must be greater than zero");
522        Source::from_materialized_factory(move |_materializer| {
523            let shared = BoundedQueueShared::new(capacity);
524            let stream: BoxStream<T> = Box::new(BoundedQueueStream {
525                shared: Arc::clone(&shared),
526            });
527            let handle = BoundedSourceQueue { shared };
528            Ok((stream, handle))
529        })
530    }
531
532    #[must_use]
533    pub fn queue(capacity: usize, strategy: OverflowStrategy) -> Source<T, SourceQueue<T>> {
534        assert!(capacity > 0, "queue capacity must be greater than zero");
535        Source::from_materialized_factory(move |_materializer| {
536            let shared = SourceQueueShared::new(capacity, strategy);
537            let (completion_sender, completion_receiver) = oneshot::channel();
538            let stream: BoxStream<T> = Box::new(SourceQueueStream {
539                shared: Arc::clone(&shared),
540                completion_sender: Some(completion_sender),
541            });
542            let handle = SourceQueue {
543                shared,
544                completion: Some(StreamCompletion::from_receiver(completion_receiver, None)),
545            };
546            Ok((stream, handle))
547        })
548    }
549}
550
551// ── Sink factory methods ────────────────────────────────────────────────────
552
553impl<T: Send + 'static> Sink<T, SinkQueue<T>> {
554    #[must_use]
555    pub fn queue() -> Self {
556        Sink::from_runner(move |mut input, materializer| {
557            let shared = SinkQueueShared::new();
558            let worker_shared = Arc::clone(&shared);
559
560            let completion = materializer.spawn_stream(move |stream_cancelled| {
561                loop {
562                    if stream_cancelled.load(Ordering::SeqCst) {
563                        return Ok(NotUsed);
564                    }
565                    match input.next() {
566                        Some(Ok(item)) => {
567                            let mut state = worker_shared
568                                .state
569                                .lock()
570                                .unwrap_or_else(|poison| poison.into_inner());
571                            state.buffer.push_back(item);
572                            drop(state);
573                            worker_shared.available.notify_all();
574                        }
575                        Some(Err(error)) => {
576                            let mut state = worker_shared
577                                .state
578                                .lock()
579                                .unwrap_or_else(|poison| poison.into_inner());
580                            if state.error.is_none() {
581                                state.error = Some(error);
582                            }
583                            drop(state);
584                            worker_shared.available.notify_all();
585                            return Ok(NotUsed);
586                        }
587                        None => {
588                            let mut state = worker_shared
589                                .state
590                                .lock()
591                                .unwrap_or_else(|poison| poison.into_inner());
592                            state.completed = true;
593                            drop(state);
594                            worker_shared.available.notify_all();
595                            return Ok(NotUsed);
596                        }
597                    }
598                }
599            });
600
601            Ok(SinkQueue {
602                shared: Arc::clone(&shared),
603                _completion: completion,
604            })
605        })
606    }
607}
608
609// ── Tests ───────────────────────────────────────────────────────────────────
610
611#[cfg(test)]
612mod tests {
613    use super::*;
614    use crate::stream::Materializer;
615    use std::sync::atomic::AtomicBool;
616    use std::sync::atomic::Ordering;
617    use std::sync::mpsc;
618    use std::thread;
619    use std::time::{Duration, Instant};
620
621    // ── BoundedSourceQueue tests ────────────────────────────────────────
622
623    #[test]
624    fn bounded_offer_accepted_vs_processed_distinct() {
625        let (queue, mut stream) = materialize_bounded_queue(2);
626        // offer accepted means enqueued into buffer, not processed downstream
627        assert_eq!(queue.offer(1), QueueOfferResult::Enqueued);
628        assert_eq!(queue.offer(2), QueueOfferResult::Enqueued);
629        // proceed to process one element
630        assert_eq!(stream.next(), Some(Ok(1)));
631        // accepted is distinct from processed — element 2 still in buffer
632        assert_eq!(queue.offer(3), QueueOfferResult::Enqueued);
633        assert_eq!(stream.next(), Some(Ok(2)));
634        assert_eq!(stream.next(), Some(Ok(3)));
635    }
636
637    #[test]
638    fn bounded_offer_dropped_when_full() {
639        let (queue, mut stream) = materialize_bounded_queue(1);
640        assert_eq!(queue.offer(1), QueueOfferResult::Enqueued);
641        assert_eq!(queue.offer(2), QueueOfferResult::Dropped);
642        assert_eq!(queue.offer(3), QueueOfferResult::Dropped);
643        queue.complete();
644        assert_eq!(stream.next(), Some(Ok(1)));
645        assert_eq!(stream.next(), None);
646    }
647
648    #[test]
649    fn bounded_queue_closed_after_complete() {
650        let (queue, mut stream) = materialize_bounded_queue(2);
651        assert_eq!(queue.offer(1), QueueOfferResult::Enqueued);
652        queue.complete();
653        // element still drainable
654        assert_eq!(stream.next(), Some(Ok(1)));
655        assert_eq!(stream.next(), None);
656        // offers after complete return QueueClosed
657        assert_eq!(queue.offer(2), QueueOfferResult::QueueClosed);
658    }
659
660    #[test]
661    fn bounded_queue_closed_after_fail() {
662        let (queue, mut stream) = materialize_bounded_queue(2);
663        assert_eq!(queue.offer(1), QueueOfferResult::Enqueued);
664        queue.fail(StreamError::Failed("boom".into()));
665        // offers after fail return QueueClosed
666        assert_eq!(queue.offer(2), QueueOfferResult::QueueClosed);
667        // buffered elements drain before failure surfaces
668        assert_eq!(stream.next(), Some(Ok(1)));
669        assert_eq!(stream.next(), Some(Err(StreamError::Failed("boom".into()))));
670    }
671
672    #[test]
673    fn bounded_drop_handle_completes_stream() {
674        let queue = {
675            let (queue, mut stream) = materialize_bounded_queue::<i32>(2);
676            queue.offer(1);
677            queue.offer(2);
678            assert_eq!(stream.next(), Some(Ok(1)));
679            queue
680        };
681        // stream dropped, handle dropped
682        assert_eq!(queue.offer(3), QueueOfferResult::QueueClosed);
683    }
684
685    #[test]
686    fn bounded_drop_last_producer_completes_stream() {
687        let (queue, mut stream) = materialize_bounded_queue::<i32>(1);
688        let producer = queue.clone();
689        let consumer = thread::spawn(move || {
690            assert_eq!(stream.next(), None);
691        });
692        drop(producer);
693        drop(queue);
694        consumer.join().unwrap();
695    }
696
697    #[test]
698    fn bounded_drain_before_complete() {
699        let (queue, mut stream) = materialize_bounded_queue(3);
700        assert_eq!(queue.offer(1), QueueOfferResult::Enqueued);
701        assert_eq!(queue.offer(2), QueueOfferResult::Enqueued);
702        assert_eq!(queue.offer(3), QueueOfferResult::Enqueued);
703        assert_eq!(queue.offer(4), QueueOfferResult::Dropped);
704        assert_eq!(queue.offer(5), QueueOfferResult::Dropped);
705        queue.complete();
706        assert_eq!(stream.next(), Some(Ok(1)));
707        assert_eq!(stream.next(), Some(Ok(2)));
708        assert_eq!(stream.next(), Some(Ok(3)));
709        assert_eq!(stream.next(), None);
710    }
711
712    #[test]
713    fn bounded_terminal_completion_is_sticky() {
714        let (queue, mut stream) = materialize_bounded_queue(2);
715        queue.offer(1);
716        queue.complete();
717        assert_eq!(stream.next(), Some(Ok(1)));
718        assert_eq!(stream.next(), None);
719        assert_eq!(stream.next(), None);
720        assert_eq!(stream.next(), None);
721    }
722
723    #[test]
724    fn bounded_terminal_failure_is_sticky() {
725        let (queue, mut stream) = materialize_bounded_queue::<i32>(2);
726        queue.fail(StreamError::Failed("boom".into()));
727        assert_eq!(stream.next(), Some(Err(StreamError::Failed("boom".into()))));
728        assert_eq!(stream.next(), Some(Err(StreamError::Failed("boom".into()))));
729    }
730
731    #[test]
732    fn bounded_producer_consumer_across_threads() {
733        let (queue, mut stream) = materialize_bounded_queue::<i32>(16);
734        // start consumer first so it drains as producer fills
735        let consumer = thread::spawn(move || {
736            let mut collected = Vec::new();
737            while let Some(Ok(item)) = stream.next() {
738                collected.push(item);
739            }
740            collected
741        });
742        let producer = thread::spawn({
743            let queue = queue.clone();
744            move || {
745                for i in 0..10 {
746                    assert_eq!(queue.offer(i), QueueOfferResult::Enqueued);
747                }
748                queue.complete();
749            }
750        });
751        producer.join().unwrap();
752        let collected = consumer.join().unwrap();
753        assert_eq!(collected, (0..10).collect::<Vec<_>>());
754    }
755
756    #[test]
757    fn bounded_multi_producer_single_consumer() {
758        let n = 50_i32;
759        let producer_count = 4;
760        let total = (producer_count * n) as usize;
761        let (queue, mut stream) = materialize_bounded_queue::<i32>(total);
762
763        // start consumer first
764        let consumer = thread::spawn(move || {
765            let mut collected = Vec::new();
766            while let Some(Ok(item)) = stream.next() {
767                collected.push(item);
768            }
769            collected
770        });
771
772        let mut handles = Vec::new();
773        for p in 0..producer_count {
774            let q = queue.clone();
775            handles.push(thread::spawn(move || {
776                for i in 0..n {
777                    assert_eq!(q.offer(p * n + i), QueueOfferResult::Enqueued);
778                }
779            }));
780        }
781        for h in handles {
782            h.join().unwrap();
783        }
784        queue.complete();
785        let mut collected = consumer.join().unwrap();
786        collected.sort_unstable();
787        assert_eq!(collected.len(), total);
788        collected.sort_unstable();
789        assert_eq!(collected.len(), (producer_count * n) as usize);
790    }
791
792    // ── SourceQueue tests ───────────────────────────────────────────────
793
794    fn materialize_source_queue<T: Send + 'static>(
795        capacity: usize,
796        strategy: OverflowStrategy,
797    ) -> (SourceQueue<T>, BoxStream<T>) {
798        let materializer = Materializer::new();
799        let (stream, queue) = Source::<T>::queue(capacity, strategy)
800            .factory
801            .create(&materializer)
802            .unwrap();
803        (queue, stream)
804    }
805
806    #[test]
807    fn source_queue_offer_enqueued() {
808        let (queue, mut stream) =
809            materialize_source_queue::<i32>(2, OverflowStrategy::Backpressure);
810        assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
811        assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::Enqueued);
812        assert_eq!(stream.next(), Some(Ok(1)));
813        assert_eq!(stream.next(), Some(Ok(2)));
814    }
815
816    #[test]
817    fn source_queue_drop_head() {
818        let (queue, mut stream) = materialize_source_queue::<i32>(2, OverflowStrategy::DropHead);
819        assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
820        assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::Enqueued);
821        // buffer full, drop head
822        assert_eq!(queue.offer(3).unwrap(), QueueOfferResult::Enqueued);
823        queue.complete();
824        assert_eq!(stream.next(), Some(Ok(2)));
825        assert_eq!(stream.next(), Some(Ok(3)));
826        assert_eq!(stream.next(), None);
827    }
828
829    #[test]
830    fn source_queue_drop_tail() {
831        let (queue, mut stream) = materialize_source_queue::<i32>(2, OverflowStrategy::DropTail);
832        assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
833        assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::Enqueued);
834        assert_eq!(queue.offer(3).unwrap(), QueueOfferResult::Enqueued);
835        queue.complete();
836        assert_eq!(stream.next(), Some(Ok(1)));
837        assert_eq!(stream.next(), Some(Ok(3)));
838        assert_eq!(stream.next(), None);
839    }
840
841    #[test]
842    fn source_queue_drop_buffer() {
843        let (queue, mut stream) = materialize_source_queue::<i32>(2, OverflowStrategy::DropBuffer);
844        assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
845        assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::Enqueued);
846        assert_eq!(queue.offer(3).unwrap(), QueueOfferResult::Enqueued);
847        queue.complete();
848        // DropBuffer clears the buffer, then enqueues the new element
849        assert_eq!(stream.next(), Some(Ok(3)));
850        assert_eq!(stream.next(), None);
851    }
852
853    #[test]
854    fn source_queue_drop_new() {
855        let (queue, mut stream) = materialize_source_queue::<i32>(2, OverflowStrategy::DropNew);
856        assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
857        assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::Enqueued);
858        assert_eq!(queue.offer(3).unwrap(), QueueOfferResult::Dropped);
859        queue.complete();
860        assert_eq!(stream.next(), Some(Ok(1)));
861        assert_eq!(stream.next(), Some(Ok(2)));
862        assert_eq!(stream.next(), None);
863    }
864
865    #[test]
866    fn source_queue_fail_strategy() {
867        let (queue, _stream) = materialize_source_queue::<i32>(2, OverflowStrategy::Fail);
868        assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
869        assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::Enqueued);
870        match queue.offer(3).unwrap() {
871            QueueOfferResult::Failure(e) => {
872                assert!(format!("{e:?}").contains("Buffer overflow"));
873            }
874            other => panic!("expected Failure, got {other:?}"),
875        }
876    }
877
878    #[test]
879    fn source_queue_backpressure_blocks() {
880        let (queue, mut stream) =
881            materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
882        assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
883
884        // Keep the stream alive until the blocked offer has a chance to observe
885        // the newly freed slot.
886        let consumed = Arc::new(AtomicBool::new(false));
887        let c = Arc::clone(&consumed);
888        let (release_tx, release_rx) = mpsc::channel();
889        let consumer = thread::spawn(move || {
890            assert_eq!(stream.next(), Some(Ok(1)));
891            c.store(true, Ordering::SeqCst);
892            release_rx.recv().unwrap();
893        });
894
895        wait_until(Duration::from_secs(1), || consumed.load(Ordering::SeqCst));
896        assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::Enqueued);
897        release_tx.send(()).unwrap();
898        consumer.join().unwrap();
899        assert!(consumed.load(Ordering::SeqCst));
900    }
901
902    #[test]
903    fn source_queue_concurrent_offer_violation() {
904        let (queue, mut stream) =
905            materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
906        assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
907
908        let q = queue.clone();
909        let started = Arc::new(AtomicBool::new(false));
910        let s = Arc::clone(&started);
911        let blocker = thread::spawn(move || {
912            s.store(true, Ordering::SeqCst);
913            // this blocks until drained
914            let _ = q.offer(2);
915        });
916
917        // wait for blocker to start
918        while !started.load(Ordering::SeqCst) {
919            thread::yield_now();
920        }
921        wait_until(Duration::from_secs(1), || {
922            queue
923                .shared
924                .state
925                .lock()
926                .unwrap_or_else(|poison| poison.into_inner())
927                .pending_count
928                == 1
929        });
930
931        // second concurrent offer should fail
932        let result = queue.offer(3);
933        assert!(result.is_err());
934        assert!(format!("{result:?}").contains("Too many concurrent offers"));
935
936        // drain to unblock
937        assert_eq!(stream.next(), Some(Ok(1)));
938        blocker.join().unwrap();
939    }
940
941    #[test]
942    fn source_queue_watch_completion() {
943        let (queue, mut stream) =
944            materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
945        queue.offer(1).unwrap();
946        queue.complete();
947        assert_eq!(stream.next(), Some(Ok(1)));
948        assert_eq!(stream.next(), None);
949        assert_eq!(queue.watch_completion().wait(), Ok(NotUsed));
950    }
951
952    #[test]
953    fn source_queue_watch_completion_on_failure() {
954        let (queue, mut stream) =
955            materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
956        queue.fail(StreamError::Failed("boom".into()));
957        assert_eq!(stream.next(), Some(Err(StreamError::Failed("boom".into()))));
958        assert_eq!(
959            queue.watch_completion().wait(),
960            Err(StreamError::Failed("boom".into()))
961        );
962    }
963
964    #[test]
965    fn source_queue_terminal_completion_is_sticky() {
966        let (queue, mut stream) =
967            materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
968        queue.complete();
969        assert_eq!(stream.next(), None);
970        assert_eq!(stream.next(), None);
971        assert_eq!(stream.next(), None);
972    }
973
974    #[test]
975    fn source_queue_offer_after_complete_returns_queue_closed() {
976        let (queue, _stream) = materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
977        queue.complete();
978        assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::QueueClosed);
979    }
980
981    #[test]
982    fn source_queue_drop_stream_closes_queue() {
983        let (queue, stream) = materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
984        queue.offer(1).unwrap();
985        drop(stream);
986        // handle still alive, stream dropped → terminal set
987        assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::QueueClosed);
988    }
989
990    #[test]
991    fn source_queue_drop_last_producer_completes_stream() {
992        let (queue, mut stream) =
993            materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
994        let producer = queue.clone();
995        let consumer = thread::spawn(move || {
996            assert_eq!(stream.next(), None);
997        });
998        drop(producer);
999        drop(queue);
1000        consumer.join().unwrap();
1001    }
1002
1003    #[test]
1004    fn source_queue_backpressure_unblocks_on_complete() {
1005        let (queue, mut stream) =
1006            materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
1007        assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
1008
1009        let q = queue.clone();
1010        let blocker = thread::spawn(move || {
1011            // blocks waiting for space
1012            q.offer(2)
1013        });
1014
1015        wait_until(Duration::from_secs(1), || {
1016            queue
1017                .shared
1018                .state
1019                .lock()
1020                .unwrap_or_else(|poison| poison.into_inner())
1021                .pending_count
1022                == 1
1023        });
1024        // complete wakes the backpressured offerer
1025        queue.complete();
1026        let result = blocker.join().unwrap();
1027        assert_eq!(result.unwrap(), QueueOfferResult::QueueClosed);
1028
1029        // buffer still has item 1
1030        assert_eq!(stream.next(), Some(Ok(1)));
1031        assert_eq!(stream.next(), None);
1032    }
1033
1034    // ── SinkQueue tests ─────────────────────────────────────────────────
1035
1036    fn materialize_sink_queue<T: Send + 'static>(source: Source<T>) -> SinkQueue<T> {
1037        source.run_with(Sink::queue()).unwrap()
1038    }
1039
1040    #[test]
1041    fn sink_queue_pull_elements() {
1042        let queue = materialize_sink_queue(Source::from_iter([1, 2, 3]));
1043        assert_eq!(queue.pull().unwrap(), Some(1));
1044        assert_eq!(queue.pull().unwrap(), Some(2));
1045        assert_eq!(queue.pull().unwrap(), Some(3));
1046        assert_eq!(queue.pull().unwrap(), None);
1047    }
1048
1049    #[test]
1050    fn sink_queue_pull_none_after_upstream_completion() {
1051        let queue = materialize_sink_queue(Source::from_iter([42]));
1052        assert_eq!(queue.pull().unwrap(), Some(42));
1053        assert_eq!(queue.pull().unwrap(), None);
1054        assert_eq!(queue.pull().unwrap(), None);
1055    }
1056
1057    #[test]
1058    fn sink_queue_pull_error_from_upstream_failure() {
1059        let queue =
1060            materialize_sink_queue(Source::<i32>::failed(StreamError::Failed("boom".into())));
1061        assert_eq!(
1062            queue.pull().unwrap_err(),
1063            StreamError::Failed("boom".into())
1064        );
1065        assert_eq!(
1066            queue.pull().unwrap_err(),
1067            StreamError::Failed("boom".into())
1068        );
1069    }
1070
1071    #[test]
1072    fn sink_queue_terminal_stickiness() {
1073        let queue = materialize_sink_queue(Source::<i32>::empty());
1074        assert_eq!(queue.pull().unwrap(), None);
1075        assert_eq!(queue.pull().unwrap(), None);
1076        assert_eq!(queue.pull().unwrap(), None);
1077    }
1078
1079    #[test]
1080    fn sink_queue_terminal_failure_stickiness() {
1081        let queue =
1082            materialize_sink_queue(Source::<i32>::failed(StreamError::Failed("boom".into())));
1083        assert_eq!(
1084            queue.pull().unwrap_err(),
1085            StreamError::Failed("boom".into())
1086        );
1087        assert_eq!(
1088            queue.pull().unwrap_err(),
1089            StreamError::Failed("boom".into())
1090        );
1091    }
1092
1093    #[test]
1094    fn sink_queue_drop_cancels_upstream() {
1095        let queue = materialize_sink_queue(Source::repeat(42));
1096        // dropping queue cancels worker
1097        drop(queue);
1098        // if this doesn't deadlock, the worker was cancelled
1099    }
1100
1101    #[test]
1102    fn sink_queue_drain_multiple_items() {
1103        let queue = materialize_sink_queue(Source::from_iter(0..100));
1104        for i in 0..100 {
1105            assert_eq!(queue.pull().unwrap(), Some(i));
1106        }
1107        assert_eq!(queue.pull().unwrap(), None);
1108    }
1109
1110    // ── Helpers ─────────────────────────────────────────────────────────
1111
1112    fn materialize_bounded_queue<T: Send + 'static>(
1113        capacity: usize,
1114    ) -> (BoundedSourceQueue<T>, BoxStream<T>) {
1115        let materializer = Materializer::new();
1116        let (stream, queue) = Source::<T>::queue_bounded(capacity)
1117            .factory
1118            .create(&materializer)
1119            .unwrap();
1120        (queue, stream)
1121    }
1122
1123    fn wait_until(timeout: Duration, condition: impl Fn() -> bool) {
1124        let deadline = Instant::now() + timeout;
1125        while Instant::now() < deadline {
1126            if condition() {
1127                return;
1128            }
1129            thread::yield_now();
1130            thread::sleep(Duration::from_millis(1));
1131        }
1132        assert!(condition(), "condition was not met within {timeout:?}");
1133    }
1134}