Skip to main content

datum/
queue.rs

1//! Bounded and unbounded queues that bridge external producers into a Datum stream.
2//!
3//! [`SourceQueue`] / [`BoundedSourceQueue`] let external code feed elements into a running stream,
4//! while [`SinkQueue`] lets external code pull materialized results out on demand.
5
6use std::collections::VecDeque;
7use std::sync::atomic::{AtomicBool, AtomicU8, Ordering, fence};
8use std::sync::{Arc, Condvar, Mutex, OnceLock};
9use std::time::Duration;
10
11use crossbeam_queue::ArrayQueue;
12
13use crate::stream::{BoxStream, NotUsed, OverflowStrategy, Sink, Source, StreamCompletion};
14use crate::{StreamError, StreamResult};
15use futures::channel::oneshot;
16
17#[derive(Debug, Clone, PartialEq, Eq)]
18pub enum QueueOfferResult {
19    Enqueued,
20    Dropped,
21    QueueClosed,
22    Failure(StreamError),
23}
24
25#[derive(Clone)]
26enum TerminalSignal {
27    Complete,
28    Error(StreamError),
29}
30
31// ── BoundedSourceQueue (synchronous, bounded, no overflow strategy) ─────────
32//
33// The element buffer is a lock-free bounded MPMC queue (`crossbeam_queue::
34// ArrayQueue`), so producers and the draining consumer never contend on a
35// shared mutex on the hot path and no per-element allocation occurs (the
36// backing array is allocated once). Terminal state (complete/fail) is a small
37// atomic checked with a single load per offer. The consumer parks on a condvar
38// only when it observes the buffer empty; producers wake it via a `parked` flag
39// guarded by a `SeqCst` fence that closes the classic store/load (Dekker) race,
40// and each park is additionally bounded by `PARK_BACKSTOP` so any
41// theoretically-missed wakeup self-heals rather than deadlocking.
42
43const TERM_NONE: u8 = 0;
44const TERM_COMPLETE: u8 = 1;
45const TERM_ERROR: u8 = 2;
46
47/// Upper bound on a single consumer park. Wakeups are normally delivered
48/// promptly by `wake_consumer`; this bound is a defensive backstop so that a
49/// lost wakeup (should one ever slip past the fence protocol) costs at most this
50/// much latency instead of deadlocking the stream.
51const PARK_BACKSTOP: Duration = Duration::from_millis(10);
52
53struct BoundedQueueShared<T> {
54    buffer: ArrayQueue<T>,
55    /// `TERM_NONE` / `TERM_COMPLETE` / `TERM_ERROR`; sticky once non-`NONE`.
56    terminal: AtomicU8,
57    /// Failure payload, published before `terminal` flips to `TERM_ERROR`.
58    error: OnceLock<StreamError>,
59    /// Guards only the condvar wait; touched solely on the slow park/wake path.
60    park: Mutex<()>,
61    available: Condvar,
62    /// True while the consumer is (about to be) blocked in `wait_timeout`.
63    parked: AtomicBool,
64}
65
66impl<T> BoundedQueueShared<T> {
67    fn new(capacity: usize) -> Arc<Self> {
68        Arc::new(Self {
69            buffer: ArrayQueue::new(capacity),
70            terminal: AtomicU8::new(TERM_NONE),
71            error: OnceLock::new(),
72            park: Mutex::new(()),
73            available: Condvar::new(),
74            parked: AtomicBool::new(false),
75        })
76    }
77
78    /// Mark the queue completed (sticky: the first terminal signal wins).
79    fn set_complete(&self) {
80        let _ = self.terminal.compare_exchange(
81            TERM_NONE,
82            TERM_COMPLETE,
83            Ordering::AcqRel,
84            Ordering::Relaxed,
85        );
86        self.wake_consumer();
87    }
88
89    /// Mark the queue failed (sticky: the first terminal signal wins).
90    fn set_error(&self, error: StreamError) {
91        // Publish the payload before flipping the state so a consumer that
92        // observes `TERM_ERROR` can always read it back.
93        let _ = self.error.set(error);
94        let _ = self.terminal.compare_exchange(
95            TERM_NONE,
96            TERM_ERROR,
97            Ordering::AcqRel,
98            Ordering::Relaxed,
99        );
100        self.wake_consumer();
101    }
102
103    /// Wake the consumer if it is (or is about to be) parked. In steady state —
104    /// the consumer actively draining — this is a single fenced atomic load and
105    /// touches no lock.
106    fn wake_consumer(&self) {
107        // Ordered after the producer's enqueue / terminal store. Pairs with the
108        // fence on the consumer's park path: the two `SeqCst` fences cannot be
109        // mutually reordered, so the producer's enqueue and the consumer's
110        // `parked` store cannot both be missed.
111        fence(Ordering::SeqCst);
112        if self.parked.load(Ordering::Relaxed) {
113            let _guard = self
114                .park
115                .lock()
116                .unwrap_or_else(|poison| poison.into_inner());
117            self.available.notify_one();
118        }
119    }
120}
121
122#[derive(Clone)]
123pub struct BoundedSourceQueue<T> {
124    shared: Arc<BoundedQueueShared<T>>,
125}
126
127impl<T> BoundedSourceQueue<T> {
128    pub fn offer(&self, elem: T) -> QueueOfferResult {
129        if self.shared.terminal.load(Ordering::Acquire) != TERM_NONE {
130            return QueueOfferResult::QueueClosed;
131        }
132        match self.shared.buffer.push(elem) {
133            Ok(()) => {
134                self.shared.wake_consumer();
135                QueueOfferResult::Enqueued
136            }
137            Err(_full) => QueueOfferResult::Dropped,
138        }
139    }
140
141    pub fn complete(&self) {
142        self.shared.set_complete();
143    }
144
145    pub fn fail(&self, error: StreamError) {
146        self.shared.set_error(error);
147    }
148}
149
150impl<T> Drop for BoundedSourceQueue<T> {
151    fn drop(&mut self) {
152        // Only the last live producer handle (handle + stream == 2) completes
153        // the stream; earlier clones leave it running.
154        if Arc::strong_count(&self.shared) != 2 {
155            return;
156        }
157        self.shared.set_complete();
158    }
159}
160
161struct BoundedQueueStream<T> {
162    shared: Arc<BoundedQueueShared<T>>,
163}
164
165impl<T> Iterator for BoundedQueueStream<T> {
166    type Item = StreamResult<T>;
167
168    fn next(&mut self) -> Option<Self::Item> {
169        let shared = &*self.shared;
170        loop {
171            // Drain buffered elements before honoring any terminal signal.
172            if let Some(item) = shared.buffer.pop() {
173                return Some(Ok(item));
174            }
175            match shared.terminal.load(Ordering::Acquire) {
176                TERM_COMPLETE => {
177                    // Synchronizing with `complete()` can make a producer's
178                    // prior push visible only after the first empty pop above.
179                    // Re-drain once before honoring the terminal signal so a
180                    // last accepted element is not lost at the boundary.
181                    if let Some(item) = shared.buffer.pop() {
182                        return Some(Ok(item));
183                    }
184                    return None;
185                }
186                TERM_ERROR => {
187                    if let Some(item) = shared.buffer.pop() {
188                        return Some(Ok(item));
189                    }
190                    let error = shared
191                        .error
192                        .get()
193                        .cloned()
194                        .unwrap_or_else(|| StreamError::Failed("queue failed".into()));
195                    return Some(Err(error));
196                }
197                _ => {}
198            }
199
200            // Empty and live: park until a producer enqueues or signals.
201            let guard = shared
202                .park
203                .lock()
204                .unwrap_or_else(|poison| poison.into_inner());
205            shared.parked.store(true, Ordering::Relaxed);
206            // Re-check after publishing `parked`. The fence pairs with the one
207            // in `wake_consumer`, so an enqueue that landed just before we set
208            // `parked` is observed here rather than slept through.
209            fence(Ordering::SeqCst);
210            if !shared.buffer.is_empty() || shared.terminal.load(Ordering::Acquire) != TERM_NONE {
211                shared.parked.store(false, Ordering::Relaxed);
212                drop(guard);
213                continue;
214            }
215            let (guard, _timeout) = shared
216                .available
217                .wait_timeout(guard, PARK_BACKSTOP)
218                .unwrap_or_else(|poison| poison.into_inner());
219            shared.parked.store(false, Ordering::Relaxed);
220            drop(guard);
221        }
222    }
223}
224
225impl<T> Drop for BoundedQueueStream<T> {
226    fn drop(&mut self) {
227        // Stream gone: close the queue so later offers see `QueueClosed`.
228        self.shared.set_complete();
229    }
230}
231
232// ── SourceQueue (with overflow strategy, maxConcurrentOffers = 1) ──────────
233
234struct SourceQueueShared<T> {
235    state: Mutex<SourceQueueState<T>>,
236    available: Condvar,
237    capacity: usize,
238    strategy: OverflowStrategy,
239}
240
241struct SourceQueueState<T> {
242    buffer: VecDeque<T>,
243    terminal: Option<TerminalSignal>,
244    terminating: bool,
245    pending_count: usize,
246}
247
248impl<T> SourceQueueShared<T> {
249    fn new(capacity: usize, strategy: OverflowStrategy) -> Arc<Self> {
250        Arc::new(Self {
251            state: Mutex::new(SourceQueueState {
252                buffer: VecDeque::with_capacity(capacity),
253                terminal: None,
254                terminating: false,
255                pending_count: 0,
256            }),
257            available: Condvar::new(),
258            capacity,
259            strategy,
260        })
261    }
262}
263
264pub struct SourceQueue<T> {
265    shared: Arc<SourceQueueShared<T>>,
266    completion: Option<StreamCompletion<NotUsed>>,
267}
268
269impl<T> Clone for SourceQueue<T> {
270    fn clone(&self) -> Self {
271        Self {
272            shared: Arc::clone(&self.shared),
273            completion: Some(StreamCompletion::ready(Err(StreamError::Failed(
274                "cannot clone queue completion handle; use watch_completion() on the original handle"
275                    .into(),
276            )))),
277        }
278    }
279}
280
281impl<T: Send + 'static> SourceQueue<T> {
282    pub fn watch_completion(mut self) -> StreamCompletion<NotUsed> {
283        self.completion.take().unwrap_or_else(|| {
284            StreamCompletion::ready(Err(StreamError::Failed(
285                "queue completion handle already taken".into(),
286            )))
287        })
288    }
289
290    pub fn offer(&self, elem: T) -> StreamResult<QueueOfferResult> {
291        let strategy = self.shared.strategy;
292        let capacity = self.shared.capacity;
293        let mut state = self
294            .shared
295            .state
296            .lock()
297            .unwrap_or_else(|poison| poison.into_inner());
298
299        if state.terminal.is_some() {
300            return Ok(QueueOfferResult::QueueClosed);
301        }
302
303        if state.terminating {
304            return Ok(QueueOfferResult::QueueClosed);
305        }
306
307        if state.buffer.len() < capacity {
308            state.buffer.push_back(elem);
309            drop(state);
310            self.shared.available.notify_all();
311            return Ok(QueueOfferResult::Enqueued);
312        }
313
314        match strategy {
315            OverflowStrategy::DropHead => {
316                let _ = state.buffer.pop_front();
317                state.buffer.push_back(elem);
318                drop(state);
319                self.shared.available.notify_all();
320                Ok(QueueOfferResult::Enqueued)
321            }
322            OverflowStrategy::DropTail => {
323                let _ = state.buffer.pop_back();
324                state.buffer.push_back(elem);
325                drop(state);
326                self.shared.available.notify_all();
327                Ok(QueueOfferResult::Enqueued)
328            }
329            OverflowStrategy::DropBuffer => {
330                state.buffer.clear();
331                state.buffer.push_back(elem);
332                drop(state);
333                self.shared.available.notify_all();
334                Ok(QueueOfferResult::Enqueued)
335            }
336            OverflowStrategy::DropNew => Ok(QueueOfferResult::Dropped),
337            OverflowStrategy::Fail => {
338                state.buffer.clear();
339                let error =
340                    StreamError::Failed(format!("Buffer overflow (max capacity was: {capacity})!"));
341                state.terminal = Some(TerminalSignal::Error(error.clone()));
342                drop(state);
343                self.shared.available.notify_all();
344                Ok(QueueOfferResult::Failure(error))
345            }
346            OverflowStrategy::Backpressure => {
347                if state.pending_count >= 1 {
348                    return Err(StreamError::Failed(
349                        "Too many concurrent offers. Specified maximum is 1. \
350                         You have to wait for the previous offer to resolve to send another request"
351                            .into(),
352                    ));
353                }
354                state.pending_count += 1;
355                loop {
356                    if state.terminal.is_some() || state.terminating {
357                        state.pending_count -= 1;
358                        return Ok(QueueOfferResult::QueueClosed);
359                    }
360                    if state.buffer.len() < capacity {
361                        state.pending_count -= 1;
362                        state.buffer.push_back(elem);
363                        drop(state);
364                        self.shared.available.notify_all();
365                        return Ok(QueueOfferResult::Enqueued);
366                    }
367                    state = self
368                        .shared
369                        .available
370                        .wait(state)
371                        .unwrap_or_else(|poison| poison.into_inner());
372                }
373            }
374        }
375    }
376
377    pub fn complete(&self) {
378        let mut state = self
379            .shared
380            .state
381            .lock()
382            .unwrap_or_else(|poison| poison.into_inner());
383        if state.buffer.is_empty() && state.pending_count == 0 {
384            if state.terminal.is_none() {
385                state.terminal = Some(TerminalSignal::Complete);
386            }
387        } else {
388            state.terminating = true;
389        }
390        drop(state);
391        self.shared.available.notify_all();
392    }
393
394    pub fn fail(&self, error: StreamError) {
395        let mut state = self
396            .shared
397            .state
398            .lock()
399            .unwrap_or_else(|poison| poison.into_inner());
400        if state.terminal.is_none() {
401            state.terminal = Some(TerminalSignal::Error(error));
402        }
403        drop(state);
404        self.shared.available.notify_all();
405    }
406}
407
408impl<T> Drop for SourceQueue<T> {
409    fn drop(&mut self) {
410        if Arc::strong_count(&self.shared) != 2 {
411            return;
412        }
413
414        let mut state = self
415            .shared
416            .state
417            .lock()
418            .unwrap_or_else(|poison| poison.into_inner());
419        if state.terminal.is_none() && !state.terminating {
420            state.terminal = Some(TerminalSignal::Complete);
421        }
422        drop(state);
423        self.shared.available.notify_all();
424    }
425}
426
427struct SourceQueueStream<T: Send + 'static> {
428    shared: Arc<SourceQueueShared<T>>,
429    completion_sender: Option<oneshot::Sender<StreamResult<NotUsed>>>,
430}
431
432impl<T: Send + 'static> Iterator for SourceQueueStream<T> {
433    type Item = StreamResult<T>;
434
435    fn next(&mut self) -> Option<Self::Item> {
436        let mut state = self
437            .shared
438            .state
439            .lock()
440            .unwrap_or_else(|poison| poison.into_inner());
441        loop {
442            if let Some(TerminalSignal::Error(error)) = &state.terminal {
443                let error = error.clone();
444                drop(state);
445                self.signal_completion(Err(error.clone()));
446                self.shared.available.notify_all();
447                return Some(Err(error));
448            }
449
450            if let Some(item) = state.buffer.pop_front() {
451                drop(state);
452                self.shared.available.notify_all();
453                return Some(Ok(item));
454            }
455
456            if let Some(terminal) = state.terminal.clone() {
457                if state.terminating {
458                    state.terminating = false;
459                }
460                drop(state);
461                self.signal_completion(match &terminal {
462                    TerminalSignal::Complete => Ok(NotUsed),
463                    TerminalSignal::Error(error) => Err(error.clone()),
464                });
465                self.shared.available.notify_all();
466                return match terminal {
467                    TerminalSignal::Complete => None,
468                    TerminalSignal::Error(error) => Some(Err(error)),
469                };
470            }
471
472            if state.terminating && state.buffer.is_empty() && state.pending_count == 0 {
473                state.terminal = Some(TerminalSignal::Complete);
474                state.terminating = false;
475                drop(state);
476                self.signal_completion(Ok(NotUsed));
477                self.shared.available.notify_all();
478                return None;
479            }
480
481            state = self
482                .shared
483                .available
484                .wait(state)
485                .unwrap_or_else(|poison| poison.into_inner());
486        }
487    }
488}
489
490impl<T: Send + 'static> SourceQueueStream<T> {
491    fn signal_completion(&mut self, result: StreamResult<NotUsed>) {
492        if let Some(sender) = self.completion_sender.take() {
493            let _ = sender.send(result);
494        }
495    }
496}
497
498impl<T: Send + 'static> Drop for SourceQueueStream<T> {
499    fn drop(&mut self) {
500        let mut state = self
501            .shared
502            .state
503            .lock()
504            .unwrap_or_else(|poison| poison.into_inner());
505        if state.terminal.is_none() {
506            state.terminal = Some(TerminalSignal::Complete);
507        }
508        state.terminating = false;
509        drop(state);
510        self.signal_completion(Ok(NotUsed));
511        self.shared.available.notify_all();
512    }
513}
514
515// ── SinkQueue (blocking pull from sink) ────────────────────────────────────
516
517struct SinkQueueShared<T> {
518    state: Mutex<SinkQueueState<T>>,
519    available: Condvar,
520}
521
522struct SinkQueueState<T> {
523    buffer: VecDeque<T>,
524    error: Option<StreamError>,
525    completed: bool,
526}
527
528impl<T> SinkQueueShared<T> {
529    fn new() -> Arc<Self> {
530        Arc::new(Self {
531            state: Mutex::new(SinkQueueState {
532                buffer: VecDeque::new(),
533                error: None,
534                completed: false,
535            }),
536            available: Condvar::new(),
537        })
538    }
539}
540
541pub struct SinkQueue<T> {
542    shared: Arc<SinkQueueShared<T>>,
543    _completion: StreamCompletion<NotUsed>,
544}
545
546impl<T> SinkQueue<T> {
547    pub fn pull(&self) -> StreamResult<Option<T>> {
548        let mut state = self
549            .shared
550            .state
551            .lock()
552            .unwrap_or_else(|poison| poison.into_inner());
553        loop {
554            if let Some(item) = state.buffer.pop_front() {
555                return Ok(Some(item));
556            }
557            if let Some(error) = state.error.clone() {
558                return Err(error);
559            }
560            if state.completed {
561                return Ok(None);
562            }
563            state = self
564                .shared
565                .available
566                .wait(state)
567                .unwrap_or_else(|poison| poison.into_inner());
568        }
569    }
570}
571
572// ── Source factory methods ──────────────────────────────────────────────────
573
574impl<T: Send + 'static> Source<T, NotUsed> {
575    #[must_use]
576    pub fn queue_bounded(capacity: usize) -> Source<T, BoundedSourceQueue<T>> {
577        assert!(capacity > 0, "queue capacity must be greater than zero");
578        Source::from_materialized_factory(move |_materializer| {
579            let shared = BoundedQueueShared::new(capacity);
580            let stream: BoxStream<T> = Box::new(BoundedQueueStream {
581                shared: Arc::clone(&shared),
582            });
583            let handle = BoundedSourceQueue { shared };
584            Ok((stream, handle))
585        })
586    }
587
588    #[must_use]
589    pub fn queue(capacity: usize, strategy: OverflowStrategy) -> Source<T, SourceQueue<T>> {
590        assert!(capacity > 0, "queue capacity must be greater than zero");
591        Source::from_materialized_factory(move |_materializer| {
592            let shared = SourceQueueShared::new(capacity, strategy);
593            let (completion_sender, completion_receiver) = oneshot::channel();
594            let stream: BoxStream<T> = Box::new(SourceQueueStream {
595                shared: Arc::clone(&shared),
596                completion_sender: Some(completion_sender),
597            });
598            let handle = SourceQueue {
599                shared,
600                completion: Some(StreamCompletion::from_receiver(completion_receiver, None)),
601            };
602            Ok((stream, handle))
603        })
604    }
605}
606
607// ── Sink factory methods ────────────────────────────────────────────────────
608
609impl<T: Send + 'static> Sink<T, SinkQueue<T>> {
610    #[must_use]
611    pub fn queue() -> Self {
612        Sink::from_runner(move |mut input, materializer| {
613            let shared = SinkQueueShared::new();
614            let worker_shared = Arc::clone(&shared);
615
616            let completion = materializer.spawn_stream(move |stream_cancelled| {
617                loop {
618                    if stream_cancelled.load(Ordering::SeqCst) {
619                        return Ok(NotUsed);
620                    }
621                    match input.next() {
622                        Some(Ok(item)) => {
623                            let mut state = worker_shared
624                                .state
625                                .lock()
626                                .unwrap_or_else(|poison| poison.into_inner());
627                            state.buffer.push_back(item);
628                            drop(state);
629                            worker_shared.available.notify_all();
630                        }
631                        Some(Err(error)) => {
632                            let mut state = worker_shared
633                                .state
634                                .lock()
635                                .unwrap_or_else(|poison| poison.into_inner());
636                            if state.error.is_none() {
637                                state.error = Some(error);
638                            }
639                            drop(state);
640                            worker_shared.available.notify_all();
641                            return Ok(NotUsed);
642                        }
643                        None => {
644                            let mut state = worker_shared
645                                .state
646                                .lock()
647                                .unwrap_or_else(|poison| poison.into_inner());
648                            state.completed = true;
649                            drop(state);
650                            worker_shared.available.notify_all();
651                            return Ok(NotUsed);
652                        }
653                    }
654                }
655            });
656
657            Ok(SinkQueue {
658                shared: Arc::clone(&shared),
659                _completion: completion,
660            })
661        })
662    }
663}
664
665// ── Tests ───────────────────────────────────────────────────────────────────
666
667#[cfg(test)]
668mod tests {
669    use super::*;
670    use crate::stream::Materializer;
671    use std::sync::atomic::AtomicBool;
672    use std::sync::atomic::Ordering;
673    use std::sync::mpsc;
674    use std::thread;
675    use std::time::{Duration, Instant};
676
677    // ── BoundedSourceQueue tests ────────────────────────────────────────
678
679    #[test]
680    fn bounded_offer_accepted_vs_processed_distinct() {
681        let (queue, mut stream) = materialize_bounded_queue(2);
682        // offer accepted means enqueued into buffer, not processed downstream
683        assert_eq!(queue.offer(1), QueueOfferResult::Enqueued);
684        assert_eq!(queue.offer(2), QueueOfferResult::Enqueued);
685        // proceed to process one element
686        assert_eq!(stream.next(), Some(Ok(1)));
687        // accepted is distinct from processed — element 2 still in buffer
688        assert_eq!(queue.offer(3), QueueOfferResult::Enqueued);
689        assert_eq!(stream.next(), Some(Ok(2)));
690        assert_eq!(stream.next(), Some(Ok(3)));
691    }
692
693    #[test]
694    fn bounded_offer_dropped_when_full() {
695        let (queue, mut stream) = materialize_bounded_queue(1);
696        assert_eq!(queue.offer(1), QueueOfferResult::Enqueued);
697        assert_eq!(queue.offer(2), QueueOfferResult::Dropped);
698        assert_eq!(queue.offer(3), QueueOfferResult::Dropped);
699        queue.complete();
700        assert_eq!(stream.next(), Some(Ok(1)));
701        assert_eq!(stream.next(), None);
702    }
703
704    #[test]
705    fn bounded_queue_closed_after_complete() {
706        let (queue, mut stream) = materialize_bounded_queue(2);
707        assert_eq!(queue.offer(1), QueueOfferResult::Enqueued);
708        queue.complete();
709        // element still drainable
710        assert_eq!(stream.next(), Some(Ok(1)));
711        assert_eq!(stream.next(), None);
712        // offers after complete return QueueClosed
713        assert_eq!(queue.offer(2), QueueOfferResult::QueueClosed);
714    }
715
716    #[test]
717    fn bounded_queue_closed_after_fail() {
718        let (queue, mut stream) = materialize_bounded_queue(2);
719        assert_eq!(queue.offer(1), QueueOfferResult::Enqueued);
720        queue.fail(StreamError::Failed("boom".into()));
721        // offers after fail return QueueClosed
722        assert_eq!(queue.offer(2), QueueOfferResult::QueueClosed);
723        // buffered elements drain before failure surfaces
724        assert_eq!(stream.next(), Some(Ok(1)));
725        assert_eq!(stream.next(), Some(Err(StreamError::Failed("boom".into()))));
726    }
727
728    #[test]
729    fn bounded_drop_handle_completes_stream() {
730        let queue = {
731            let (queue, mut stream) = materialize_bounded_queue::<i32>(2);
732            queue.offer(1);
733            queue.offer(2);
734            assert_eq!(stream.next(), Some(Ok(1)));
735            queue
736        };
737        // stream dropped, handle dropped
738        assert_eq!(queue.offer(3), QueueOfferResult::QueueClosed);
739    }
740
741    #[test]
742    fn bounded_drop_last_producer_completes_stream() {
743        let (queue, mut stream) = materialize_bounded_queue::<i32>(1);
744        let producer = queue.clone();
745        let consumer = thread::spawn(move || {
746            assert_eq!(stream.next(), None);
747        });
748        drop(producer);
749        drop(queue);
750        consumer.join().unwrap();
751    }
752
753    #[test]
754    fn bounded_drain_before_complete() {
755        let (queue, mut stream) = materialize_bounded_queue(3);
756        assert_eq!(queue.offer(1), QueueOfferResult::Enqueued);
757        assert_eq!(queue.offer(2), QueueOfferResult::Enqueued);
758        assert_eq!(queue.offer(3), QueueOfferResult::Enqueued);
759        assert_eq!(queue.offer(4), QueueOfferResult::Dropped);
760        assert_eq!(queue.offer(5), QueueOfferResult::Dropped);
761        queue.complete();
762        assert_eq!(stream.next(), Some(Ok(1)));
763        assert_eq!(stream.next(), Some(Ok(2)));
764        assert_eq!(stream.next(), Some(Ok(3)));
765        assert_eq!(stream.next(), None);
766    }
767
768    #[test]
769    fn bounded_terminal_completion_is_sticky() {
770        let (queue, mut stream) = materialize_bounded_queue(2);
771        queue.offer(1);
772        queue.complete();
773        assert_eq!(stream.next(), Some(Ok(1)));
774        assert_eq!(stream.next(), None);
775        assert_eq!(stream.next(), None);
776        assert_eq!(stream.next(), None);
777    }
778
779    #[test]
780    fn bounded_terminal_failure_is_sticky() {
781        let (queue, mut stream) = materialize_bounded_queue::<i32>(2);
782        queue.fail(StreamError::Failed("boom".into()));
783        assert_eq!(stream.next(), Some(Err(StreamError::Failed("boom".into()))));
784        assert_eq!(stream.next(), Some(Err(StreamError::Failed("boom".into()))));
785    }
786
787    #[test]
788    fn bounded_producer_consumer_across_threads() {
789        let (queue, mut stream) = materialize_bounded_queue::<i32>(16);
790        // start consumer first so it drains as producer fills
791        let consumer = thread::spawn(move || {
792            let mut collected = Vec::new();
793            while let Some(Ok(item)) = stream.next() {
794                collected.push(item);
795            }
796            collected
797        });
798        let producer = thread::spawn({
799            let queue = queue.clone();
800            move || {
801                for i in 0..10 {
802                    assert_eq!(queue.offer(i), QueueOfferResult::Enqueued);
803                }
804                queue.complete();
805            }
806        });
807        producer.join().unwrap();
808        let collected = consumer.join().unwrap();
809        assert_eq!(collected, (0..10).collect::<Vec<_>>());
810    }
811
812    #[test]
813    fn bounded_multi_producer_single_consumer() {
814        let n = 50_i32;
815        let producer_count = 4;
816        let total = (producer_count * n) as usize;
817        let (queue, mut stream) = materialize_bounded_queue::<i32>(total);
818
819        // start consumer first
820        let consumer = thread::spawn(move || {
821            let mut collected = Vec::new();
822            while let Some(Ok(item)) = stream.next() {
823                collected.push(item);
824            }
825            collected
826        });
827
828        let mut handles = Vec::new();
829        for p in 0..producer_count {
830            let q = queue.clone();
831            handles.push(thread::spawn(move || {
832                for i in 0..n {
833                    assert_eq!(q.offer(p * n + i), QueueOfferResult::Enqueued);
834                }
835            }));
836        }
837        for h in handles {
838            h.join().unwrap();
839        }
840        queue.complete();
841        let mut collected = consumer.join().unwrap();
842        collected.sort_unstable();
843        assert_eq!(collected.len(), total);
844        collected.sort_unstable();
845        assert_eq!(collected.len(), (producer_count * n) as usize);
846    }
847
848    #[test]
849    fn bounded_offer_poll_stress_no_lost_wakeup() {
850        // A small capacity makes the buffer oscillate between empty and full, so
851        // the consumer repeatedly drains it dry and parks while producers race
852        // to refill it. Every element that reports `Enqueued` must be delivered
853        // exactly once and the stream must terminate — a lost wakeup or a
854        // stranded element would hang the join or break the count.
855        const ROUNDS: usize = 40;
856        const PRODUCERS: i64 = 8;
857        const PER_PRODUCER: i64 = 500;
858
859        for _ in 0..ROUNDS {
860            let (queue, stream) = materialize_bounded_queue::<i64>(4);
861
862            let consumer = thread::spawn(move || {
863                let mut count = 0_u64;
864                for item in stream {
865                    assert!(item.is_ok(), "unexpected stream failure: {item:?}");
866                    count += 1;
867                }
868                count
869            });
870
871            let mut handles = Vec::new();
872            for p in 0..PRODUCERS {
873                let q = queue.clone();
874                handles.push(thread::spawn(move || {
875                    let mut enqueued = 0_u64;
876                    for i in 0..PER_PRODUCER {
877                        if q.offer(p * PER_PRODUCER + i) == QueueOfferResult::Enqueued {
878                            enqueued += 1;
879                        }
880                    }
881                    enqueued
882                }));
883            }
884
885            let mut total_enqueued = 0_u64;
886            for h in handles {
887                total_enqueued += h.join().unwrap();
888            }
889            // Every offer has now happened-before `complete`, so every accepted
890            // element is guaranteed to drain before the stream ends.
891            queue.complete();
892            let delivered = consumer.join().unwrap();
893            assert_eq!(
894                delivered, total_enqueued,
895                "every accepted element must be delivered exactly once"
896            );
897        }
898    }
899
900    #[test]
901    fn bounded_pingpong_in_order_stress() {
902        // Capacity 1 forces the producer to wait for the consumer to drain (and
903        // re-park) before nearly every element, exercising the park/wake
904        // handshake on the tightest possible loop. Elements must arrive exactly
905        // once, in order; a genuine deadlock trips the deadline rather than
906        // hanging the suite forever.
907        const ROUNDS: usize = 10;
908        const ITEMS: i64 = 1_000;
909
910        for _ in 0..ROUNDS {
911            let (queue, stream) = materialize_bounded_queue::<i64>(1);
912            let consumer = thread::spawn(move || {
913                let mut got = Vec::new();
914                for item in stream {
915                    got.push(item.expect("no spurious failure"));
916                }
917                got
918            });
919
920            for i in 0..ITEMS {
921                let deadline = Instant::now() + Duration::from_secs(5);
922                loop {
923                    match queue.offer(i) {
924                        QueueOfferResult::Enqueued => break,
925                        QueueOfferResult::Dropped => {
926                            assert!(
927                                Instant::now() < deadline,
928                                "offer never accepted within deadline — consumer stuck?"
929                            );
930                            thread::yield_now();
931                        }
932                        other => panic!("unexpected offer result: {other:?}"),
933                    }
934                }
935            }
936            queue.complete();
937            let got = consumer.join().unwrap();
938            assert_eq!(got, (0..ITEMS).collect::<Vec<_>>());
939        }
940    }
941
942    // ── SourceQueue tests ───────────────────────────────────────────────
943
944    fn materialize_source_queue<T: Send + 'static>(
945        capacity: usize,
946        strategy: OverflowStrategy,
947    ) -> (SourceQueue<T>, BoxStream<T>) {
948        let materializer = Materializer::new();
949        let (stream, queue) = Source::<T>::queue(capacity, strategy)
950            .factory
951            .create(&materializer)
952            .unwrap();
953        (queue, stream)
954    }
955
956    #[test]
957    fn source_queue_offer_enqueued() {
958        let (queue, mut stream) =
959            materialize_source_queue::<i32>(2, OverflowStrategy::Backpressure);
960        assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
961        assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::Enqueued);
962        assert_eq!(stream.next(), Some(Ok(1)));
963        assert_eq!(stream.next(), Some(Ok(2)));
964    }
965
966    #[test]
967    fn source_queue_drop_head() {
968        let (queue, mut stream) = materialize_source_queue::<i32>(2, OverflowStrategy::DropHead);
969        assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
970        assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::Enqueued);
971        // buffer full, drop head
972        assert_eq!(queue.offer(3).unwrap(), QueueOfferResult::Enqueued);
973        queue.complete();
974        assert_eq!(stream.next(), Some(Ok(2)));
975        assert_eq!(stream.next(), Some(Ok(3)));
976        assert_eq!(stream.next(), None);
977    }
978
979    #[test]
980    fn source_queue_drop_tail() {
981        let (queue, mut stream) = materialize_source_queue::<i32>(2, OverflowStrategy::DropTail);
982        assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
983        assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::Enqueued);
984        assert_eq!(queue.offer(3).unwrap(), QueueOfferResult::Enqueued);
985        queue.complete();
986        assert_eq!(stream.next(), Some(Ok(1)));
987        assert_eq!(stream.next(), Some(Ok(3)));
988        assert_eq!(stream.next(), None);
989    }
990
991    #[test]
992    fn source_queue_drop_buffer() {
993        let (queue, mut stream) = materialize_source_queue::<i32>(2, OverflowStrategy::DropBuffer);
994        assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
995        assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::Enqueued);
996        assert_eq!(queue.offer(3).unwrap(), QueueOfferResult::Enqueued);
997        queue.complete();
998        // DropBuffer clears the buffer, then enqueues the new element
999        assert_eq!(stream.next(), Some(Ok(3)));
1000        assert_eq!(stream.next(), None);
1001    }
1002
1003    #[test]
1004    fn source_queue_drop_new() {
1005        let (queue, mut stream) = materialize_source_queue::<i32>(2, OverflowStrategy::DropNew);
1006        assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
1007        assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::Enqueued);
1008        assert_eq!(queue.offer(3).unwrap(), QueueOfferResult::Dropped);
1009        queue.complete();
1010        assert_eq!(stream.next(), Some(Ok(1)));
1011        assert_eq!(stream.next(), Some(Ok(2)));
1012        assert_eq!(stream.next(), None);
1013    }
1014
1015    #[test]
1016    fn source_queue_fail_strategy() {
1017        let (queue, _stream) = materialize_source_queue::<i32>(2, OverflowStrategy::Fail);
1018        assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
1019        assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::Enqueued);
1020        match queue.offer(3).unwrap() {
1021            QueueOfferResult::Failure(e) => {
1022                assert!(format!("{e:?}").contains("Buffer overflow"));
1023            }
1024            other => panic!("expected Failure, got {other:?}"),
1025        }
1026    }
1027
1028    #[test]
1029    fn source_queue_backpressure_blocks() {
1030        let (queue, mut stream) =
1031            materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
1032        assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
1033
1034        // Keep the stream alive until the blocked offer has a chance to observe
1035        // the newly freed slot.
1036        let consumed = Arc::new(AtomicBool::new(false));
1037        let c = Arc::clone(&consumed);
1038        let (release_tx, release_rx) = mpsc::channel();
1039        let consumer = thread::spawn(move || {
1040            assert_eq!(stream.next(), Some(Ok(1)));
1041            c.store(true, Ordering::SeqCst);
1042            release_rx.recv().unwrap();
1043        });
1044
1045        wait_until(Duration::from_secs(1), || consumed.load(Ordering::SeqCst));
1046        assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::Enqueued);
1047        release_tx.send(()).unwrap();
1048        consumer.join().unwrap();
1049        assert!(consumed.load(Ordering::SeqCst));
1050    }
1051
1052    #[test]
1053    fn source_queue_concurrent_offer_violation() {
1054        let (queue, mut stream) =
1055            materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
1056        assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
1057
1058        let q = queue.clone();
1059        let started = Arc::new(AtomicBool::new(false));
1060        let s = Arc::clone(&started);
1061        let blocker = thread::spawn(move || {
1062            s.store(true, Ordering::SeqCst);
1063            // this blocks until drained
1064            let _ = q.offer(2);
1065        });
1066
1067        // wait for blocker to start
1068        while !started.load(Ordering::SeqCst) {
1069            thread::yield_now();
1070        }
1071        wait_until(Duration::from_secs(1), || {
1072            queue
1073                .shared
1074                .state
1075                .lock()
1076                .unwrap_or_else(|poison| poison.into_inner())
1077                .pending_count
1078                == 1
1079        });
1080
1081        // second concurrent offer should fail
1082        let result = queue.offer(3);
1083        assert!(result.is_err());
1084        assert!(format!("{result:?}").contains("Too many concurrent offers"));
1085
1086        // drain to unblock
1087        assert_eq!(stream.next(), Some(Ok(1)));
1088        blocker.join().unwrap();
1089    }
1090
1091    #[test]
1092    fn source_queue_watch_completion() {
1093        let (queue, mut stream) =
1094            materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
1095        queue.offer(1).unwrap();
1096        queue.complete();
1097        assert_eq!(stream.next(), Some(Ok(1)));
1098        assert_eq!(stream.next(), None);
1099        assert_eq!(queue.watch_completion().wait(), Ok(NotUsed));
1100    }
1101
1102    #[test]
1103    fn source_queue_watch_completion_on_failure() {
1104        let (queue, mut stream) =
1105            materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
1106        queue.fail(StreamError::Failed("boom".into()));
1107        assert_eq!(stream.next(), Some(Err(StreamError::Failed("boom".into()))));
1108        assert_eq!(
1109            queue.watch_completion().wait(),
1110            Err(StreamError::Failed("boom".into()))
1111        );
1112    }
1113
1114    #[test]
1115    fn source_queue_terminal_completion_is_sticky() {
1116        let (queue, mut stream) =
1117            materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
1118        queue.complete();
1119        assert_eq!(stream.next(), None);
1120        assert_eq!(stream.next(), None);
1121        assert_eq!(stream.next(), None);
1122    }
1123
1124    #[test]
1125    fn source_queue_offer_after_complete_returns_queue_closed() {
1126        let (queue, _stream) = materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
1127        queue.complete();
1128        assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::QueueClosed);
1129    }
1130
1131    #[test]
1132    fn source_queue_drop_stream_closes_queue() {
1133        let (queue, stream) = materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
1134        queue.offer(1).unwrap();
1135        drop(stream);
1136        // handle still alive, stream dropped → terminal set
1137        assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::QueueClosed);
1138    }
1139
1140    #[test]
1141    fn source_queue_drop_last_producer_completes_stream() {
1142        let (queue, mut stream) =
1143            materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
1144        let producer = queue.clone();
1145        let consumer = thread::spawn(move || {
1146            assert_eq!(stream.next(), None);
1147        });
1148        drop(producer);
1149        drop(queue);
1150        consumer.join().unwrap();
1151    }
1152
1153    #[test]
1154    fn source_queue_backpressure_unblocks_on_complete() {
1155        let (queue, mut stream) =
1156            materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
1157        assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
1158
1159        let q = queue.clone();
1160        let blocker = thread::spawn(move || {
1161            // blocks waiting for space
1162            q.offer(2)
1163        });
1164
1165        wait_until(Duration::from_secs(1), || {
1166            queue
1167                .shared
1168                .state
1169                .lock()
1170                .unwrap_or_else(|poison| poison.into_inner())
1171                .pending_count
1172                == 1
1173        });
1174        // complete wakes the backpressured offerer
1175        queue.complete();
1176        let result = blocker.join().unwrap();
1177        assert_eq!(result.unwrap(), QueueOfferResult::QueueClosed);
1178
1179        // buffer still has item 1
1180        assert_eq!(stream.next(), Some(Ok(1)));
1181        assert_eq!(stream.next(), None);
1182    }
1183
1184    // ── SinkQueue tests ─────────────────────────────────────────────────
1185
1186    fn materialize_sink_queue<T: Send + 'static>(source: Source<T>) -> SinkQueue<T> {
1187        source.run_with(Sink::queue()).unwrap()
1188    }
1189
1190    #[test]
1191    fn sink_queue_pull_elements() {
1192        let queue = materialize_sink_queue(Source::from_iter([1, 2, 3]));
1193        assert_eq!(queue.pull().unwrap(), Some(1));
1194        assert_eq!(queue.pull().unwrap(), Some(2));
1195        assert_eq!(queue.pull().unwrap(), Some(3));
1196        assert_eq!(queue.pull().unwrap(), None);
1197    }
1198
1199    #[test]
1200    fn sink_queue_pull_none_after_upstream_completion() {
1201        let queue = materialize_sink_queue(Source::from_iter([42]));
1202        assert_eq!(queue.pull().unwrap(), Some(42));
1203        assert_eq!(queue.pull().unwrap(), None);
1204        assert_eq!(queue.pull().unwrap(), None);
1205    }
1206
1207    #[test]
1208    fn sink_queue_pull_error_from_upstream_failure() {
1209        let queue =
1210            materialize_sink_queue(Source::<i32>::failed(StreamError::Failed("boom".into())));
1211        assert_eq!(
1212            queue.pull().unwrap_err(),
1213            StreamError::Failed("boom".into())
1214        );
1215        assert_eq!(
1216            queue.pull().unwrap_err(),
1217            StreamError::Failed("boom".into())
1218        );
1219    }
1220
1221    #[test]
1222    fn sink_queue_terminal_stickiness() {
1223        let queue = materialize_sink_queue(Source::<i32>::empty());
1224        assert_eq!(queue.pull().unwrap(), None);
1225        assert_eq!(queue.pull().unwrap(), None);
1226        assert_eq!(queue.pull().unwrap(), None);
1227    }
1228
1229    #[test]
1230    fn sink_queue_terminal_failure_stickiness() {
1231        let queue =
1232            materialize_sink_queue(Source::<i32>::failed(StreamError::Failed("boom".into())));
1233        assert_eq!(
1234            queue.pull().unwrap_err(),
1235            StreamError::Failed("boom".into())
1236        );
1237        assert_eq!(
1238            queue.pull().unwrap_err(),
1239            StreamError::Failed("boom".into())
1240        );
1241    }
1242
1243    #[test]
1244    fn sink_queue_drop_cancels_upstream() {
1245        let queue = materialize_sink_queue(Source::repeat(42));
1246        // dropping queue cancels worker
1247        drop(queue);
1248        // if this doesn't deadlock, the worker was cancelled
1249    }
1250
1251    #[test]
1252    fn sink_queue_drain_multiple_items() {
1253        let queue = materialize_sink_queue(Source::from_iter(0..100));
1254        for i in 0..100 {
1255            assert_eq!(queue.pull().unwrap(), Some(i));
1256        }
1257        assert_eq!(queue.pull().unwrap(), None);
1258    }
1259
1260    // ── Helpers ─────────────────────────────────────────────────────────
1261
1262    fn materialize_bounded_queue<T: Send + 'static>(
1263        capacity: usize,
1264    ) -> (BoundedSourceQueue<T>, BoxStream<T>) {
1265        let materializer = Materializer::new();
1266        let (stream, queue) = Source::<T>::queue_bounded(capacity)
1267            .factory
1268            .create(&materializer)
1269            .unwrap();
1270        (queue, stream)
1271    }
1272
1273    fn wait_until(timeout: Duration, condition: impl Fn() -> bool) {
1274        let deadline = Instant::now() + timeout;
1275        while Instant::now() < deadline {
1276            if condition() {
1277                return;
1278            }
1279            thread::yield_now();
1280            thread::sleep(Duration::from_millis(1));
1281        }
1282        assert!(condition(), "condition was not met within {timeout:?}");
1283    }
1284}