Skip to main content

datum/
queue.rs

1//! Bounded and unbounded queues that bridge external producers into a Datum stream.
2//!
3//! [`SourceQueue`] / [`BoundedSourceQueue`] let external code feed elements into a running stream,
4//! while [`SinkQueue`] lets external code pull materialized results out on demand.
5
6use std::collections::VecDeque;
7use std::sync::atomic::{AtomicBool, AtomicU8, Ordering, fence};
8use std::sync::{Arc, Condvar, Mutex, OnceLock};
9use std::time::Duration;
10
11use crossbeam_queue::ArrayQueue;
12
13use crate::stream::{BoxStream, NotUsed, OverflowStrategy, Sink, Source, StreamCompletion};
14use crate::{StreamError, StreamResult};
15use futures::channel::oneshot;
16
17/// Outcome of offering one element to a source queue.
18#[derive(Debug, Clone, PartialEq, Eq)]
19pub enum QueueOfferResult {
20    /// The element was accepted into the buffer.
21    Enqueued,
22    /// The element was dropped (buffer full under a drop overflow strategy).
23    Dropped,
24    /// The queue is completed/failed and no longer accepts elements.
25    QueueClosed,
26    /// The offer failed the stream (the `Fail` overflow strategy on overflow).
27    Failure(StreamError),
28}
29
30#[derive(Clone)]
31enum TerminalSignal {
32    Complete,
33    Error(StreamError),
34}
35
36// ── BoundedSourceQueue (synchronous, bounded, no overflow strategy) ─────────
37//
38// The element buffer is a lock-free bounded MPMC queue (`crossbeam_queue::
39// ArrayQueue`), so producers and the draining consumer never contend on a
40// shared mutex on the hot path and no per-element allocation occurs (the
41// backing array is allocated once). Terminal state (complete/fail) is a small
42// atomic checked with a single load per offer. The consumer parks on a condvar
43// only when it observes the buffer empty; producers wake it via a `parked` flag
44// guarded by a `SeqCst` fence that closes the classic store/load (Dekker) race,
45// and each park is additionally bounded by `PARK_BACKSTOP` so any
46// theoretically-missed wakeup self-heals rather than deadlocking.
47
48const TERM_NONE: u8 = 0;
49const TERM_COMPLETE: u8 = 1;
50const TERM_ERROR: u8 = 2;
51
52/// Upper bound on a single consumer park. Wakeups are normally delivered
53/// promptly by `wake_consumer`; this bound is a defensive backstop so that a
54/// lost wakeup (should one ever slip past the fence protocol) costs at most this
55/// much latency instead of deadlocking the stream.
56const PARK_BACKSTOP: Duration = Duration::from_millis(10);
57
58struct BoundedQueueShared<T> {
59    buffer: ArrayQueue<T>,
60    /// `TERM_NONE` / `TERM_COMPLETE` / `TERM_ERROR`; sticky once non-`NONE`.
61    terminal: AtomicU8,
62    /// Failure payload, published before `terminal` flips to `TERM_ERROR`.
63    error: OnceLock<StreamError>,
64    /// Guards only the condvar wait; touched solely on the slow park/wake path.
65    park: Mutex<()>,
66    available: Condvar,
67    /// True while the consumer is (about to be) blocked in `wait_timeout`.
68    parked: AtomicBool,
69}
70
71impl<T> BoundedQueueShared<T> {
72    fn new(capacity: usize) -> Arc<Self> {
73        Arc::new(Self {
74            buffer: ArrayQueue::new(capacity),
75            terminal: AtomicU8::new(TERM_NONE),
76            error: OnceLock::new(),
77            park: Mutex::new(()),
78            available: Condvar::new(),
79            parked: AtomicBool::new(false),
80        })
81    }
82
83    /// Mark the queue completed (sticky: the first terminal signal wins).
84    fn set_complete(&self) {
85        let _ = self.terminal.compare_exchange(
86            TERM_NONE,
87            TERM_COMPLETE,
88            Ordering::AcqRel,
89            Ordering::Relaxed,
90        );
91        self.wake_consumer();
92    }
93
94    /// Mark the queue failed (sticky: the first terminal signal wins).
95    fn set_error(&self, error: StreamError) {
96        // Publish the payload before flipping the state so a consumer that
97        // observes `TERM_ERROR` can always read it back.
98        let _ = self.error.set(error);
99        let _ = self.terminal.compare_exchange(
100            TERM_NONE,
101            TERM_ERROR,
102            Ordering::AcqRel,
103            Ordering::Relaxed,
104        );
105        self.wake_consumer();
106    }
107
108    /// Wake the consumer if it is (or is about to be) parked. In steady state —
109    /// the consumer actively draining — this is a single fenced atomic load and
110    /// touches no lock.
111    fn wake_consumer(&self) {
112        // Ordered after the producer's enqueue / terminal store. Pairs with the
113        // fence on the consumer's park path: the two `SeqCst` fences cannot be
114        // mutually reordered, so the producer's enqueue and the consumer's
115        // `parked` store cannot both be missed.
116        fence(Ordering::SeqCst);
117        if self.parked.load(Ordering::Relaxed) {
118            let _guard = self
119                .park
120                .lock()
121                .unwrap_or_else(|poison| poison.into_inner());
122            self.available.notify_one();
123        }
124    }
125}
126
127/// Materialized handle of [`Source::queue_bounded`]: a synchronous, bounded, lock-free queue with
128/// no overflow strategy (a full buffer drops). `Clone`; the stream completes when the last handle
129/// is dropped.
130#[derive(Clone)]
131pub struct BoundedSourceQueue<T> {
132    shared: Arc<BoundedQueueShared<T>>,
133}
134
135impl<T> BoundedSourceQueue<T> {
136    /// Offer one element. Returns immediately (never blocks): [`QueueOfferResult::Enqueued`] on
137    /// success, [`QueueOfferResult::Dropped`] if the buffer is full, or
138    /// [`QueueOfferResult::QueueClosed`] after completion/failure.
139    pub fn offer(&self, elem: T) -> QueueOfferResult {
140        if self.shared.terminal.load(Ordering::Acquire) != TERM_NONE {
141            return QueueOfferResult::QueueClosed;
142        }
143        match self.shared.buffer.push(elem) {
144            Ok(()) => {
145                self.shared.wake_consumer();
146                QueueOfferResult::Enqueued
147            }
148            Err(_full) => QueueOfferResult::Dropped,
149        }
150    }
151
152    /// Complete the stream after buffered elements drain (idempotent; first terminal signal wins).
153    pub fn complete(&self) {
154        self.shared.set_complete();
155    }
156
157    /// Fail the stream; buffered elements drain before the error surfaces downstream.
158    pub fn fail(&self, error: StreamError) {
159        self.shared.set_error(error);
160    }
161}
162
163impl<T> Drop for BoundedSourceQueue<T> {
164    fn drop(&mut self) {
165        // Only the last live producer handle (handle + stream == 2) completes
166        // the stream; earlier clones leave it running.
167        if Arc::strong_count(&self.shared) != 2 {
168            return;
169        }
170        self.shared.set_complete();
171    }
172}
173
174struct BoundedQueueStream<T> {
175    shared: Arc<BoundedQueueShared<T>>,
176}
177
178impl<T> Iterator for BoundedQueueStream<T> {
179    type Item = StreamResult<T>;
180
181    fn next(&mut self) -> Option<Self::Item> {
182        let shared = &*self.shared;
183        loop {
184            // Drain buffered elements before honoring any terminal signal.
185            if let Some(item) = shared.buffer.pop() {
186                return Some(Ok(item));
187            }
188            match shared.terminal.load(Ordering::Acquire) {
189                TERM_COMPLETE => {
190                    // Synchronizing with `complete()` can make a producer's
191                    // prior push visible only after the first empty pop above.
192                    // Re-drain once before honoring the terminal signal so a
193                    // last accepted element is not lost at the boundary.
194                    if let Some(item) = shared.buffer.pop() {
195                        return Some(Ok(item));
196                    }
197                    return None;
198                }
199                TERM_ERROR => {
200                    if let Some(item) = shared.buffer.pop() {
201                        return Some(Ok(item));
202                    }
203                    let error = shared
204                        .error
205                        .get()
206                        .cloned()
207                        .unwrap_or_else(|| StreamError::Failed("queue failed".into()));
208                    return Some(Err(error));
209                }
210                _ => {}
211            }
212
213            // Empty and live: park until a producer enqueues or signals.
214            let guard = shared
215                .park
216                .lock()
217                .unwrap_or_else(|poison| poison.into_inner());
218            shared.parked.store(true, Ordering::Relaxed);
219            // Re-check after publishing `parked`. The fence pairs with the one
220            // in `wake_consumer`, so an enqueue that landed just before we set
221            // `parked` is observed here rather than slept through.
222            fence(Ordering::SeqCst);
223            if !shared.buffer.is_empty() || shared.terminal.load(Ordering::Acquire) != TERM_NONE {
224                shared.parked.store(false, Ordering::Relaxed);
225                drop(guard);
226                continue;
227            }
228            let (guard, _timeout) = shared
229                .available
230                .wait_timeout(guard, PARK_BACKSTOP)
231                .unwrap_or_else(|poison| poison.into_inner());
232            shared.parked.store(false, Ordering::Relaxed);
233            drop(guard);
234        }
235    }
236}
237
238impl<T> Drop for BoundedQueueStream<T> {
239    fn drop(&mut self) {
240        // Stream gone: close the queue so later offers see `QueueClosed`.
241        self.shared.set_complete();
242    }
243}
244
245// ── SourceQueue (with overflow strategy, maxConcurrentOffers = 1) ──────────
246
247struct SourceQueueShared<T> {
248    state: Mutex<SourceQueueState<T>>,
249    available: Condvar,
250    capacity: usize,
251    strategy: OverflowStrategy,
252}
253
254struct SourceQueueState<T> {
255    buffer: VecDeque<T>,
256    terminal: Option<TerminalSignal>,
257    terminating: bool,
258    pending_count: usize,
259}
260
261impl<T> SourceQueueShared<T> {
262    fn new(capacity: usize, strategy: OverflowStrategy) -> Arc<Self> {
263        Arc::new(Self {
264            state: Mutex::new(SourceQueueState {
265                buffer: VecDeque::with_capacity(capacity),
266                terminal: None,
267                terminating: false,
268                pending_count: 0,
269            }),
270            available: Condvar::new(),
271            capacity,
272            strategy,
273        })
274    }
275}
276
277/// Materialized handle of [`Source::queue`]: a bounded queue with a configurable
278/// [`OverflowStrategy`]. Under `Backpressure`, `maxConcurrentOffers` is 1 — `offer` blocks until
279/// buffer space is free and a second concurrent offer errors.
280pub struct SourceQueue<T> {
281    shared: Arc<SourceQueueShared<T>>,
282    completion: Option<StreamCompletion<NotUsed>>,
283}
284
285impl<T> Clone for SourceQueue<T> {
286    fn clone(&self) -> Self {
287        Self {
288            shared: Arc::clone(&self.shared),
289            completion: Some(StreamCompletion::ready(Err(StreamError::Failed(
290                "cannot clone queue completion handle; use watch_completion() on the original handle"
291                    .into(),
292            )))),
293        }
294    }
295}
296
297impl<T: Send + 'static> SourceQueue<T> {
298    /// Take the one-shot completion handle for this queue (resolves when the stream ends). Only the
299    /// original handle holds it; a cloned `SourceQueue` returns an error completion.
300    pub fn watch_completion(mut self) -> StreamCompletion<NotUsed> {
301        self.completion.take().unwrap_or_else(|| {
302            StreamCompletion::ready(Err(StreamError::Failed(
303                "queue completion handle already taken".into(),
304            )))
305        })
306    }
307
308    /// Offer one element, applying the configured [`OverflowStrategy`]. Returns `Err` only when a
309    /// second concurrent offer races a pending `Backpressure` offer; otherwise the outcome is in
310    /// the [`QueueOfferResult`].
311    pub fn offer(&self, elem: T) -> StreamResult<QueueOfferResult> {
312        let strategy = self.shared.strategy;
313        let capacity = self.shared.capacity;
314        let mut state = self
315            .shared
316            .state
317            .lock()
318            .unwrap_or_else(|poison| poison.into_inner());
319
320        if state.terminal.is_some() {
321            return Ok(QueueOfferResult::QueueClosed);
322        }
323
324        if state.terminating {
325            return Ok(QueueOfferResult::QueueClosed);
326        }
327
328        if state.buffer.len() < capacity {
329            state.buffer.push_back(elem);
330            drop(state);
331            self.shared.available.notify_all();
332            return Ok(QueueOfferResult::Enqueued);
333        }
334
335        match strategy {
336            OverflowStrategy::DropHead => {
337                let _ = state.buffer.pop_front();
338                state.buffer.push_back(elem);
339                drop(state);
340                self.shared.available.notify_all();
341                Ok(QueueOfferResult::Enqueued)
342            }
343            OverflowStrategy::DropTail => {
344                let _ = state.buffer.pop_back();
345                state.buffer.push_back(elem);
346                drop(state);
347                self.shared.available.notify_all();
348                Ok(QueueOfferResult::Enqueued)
349            }
350            OverflowStrategy::DropBuffer => {
351                state.buffer.clear();
352                state.buffer.push_back(elem);
353                drop(state);
354                self.shared.available.notify_all();
355                Ok(QueueOfferResult::Enqueued)
356            }
357            OverflowStrategy::DropNew => Ok(QueueOfferResult::Dropped),
358            OverflowStrategy::Fail => {
359                state.buffer.clear();
360                let error =
361                    StreamError::Failed(format!("Buffer overflow (max capacity was: {capacity})!"));
362                state.terminal = Some(TerminalSignal::Error(error.clone()));
363                drop(state);
364                self.shared.available.notify_all();
365                Ok(QueueOfferResult::Failure(error))
366            }
367            OverflowStrategy::Backpressure => {
368                if state.pending_count >= 1 {
369                    return Err(StreamError::Failed(
370                        "Too many concurrent offers. Specified maximum is 1. \
371                         You have to wait for the previous offer to resolve to send another request"
372                            .into(),
373                    ));
374                }
375                state.pending_count += 1;
376                loop {
377                    if state.terminal.is_some() || state.terminating {
378                        state.pending_count -= 1;
379                        return Ok(QueueOfferResult::QueueClosed);
380                    }
381                    if state.buffer.len() < capacity {
382                        state.pending_count -= 1;
383                        state.buffer.push_back(elem);
384                        drop(state);
385                        self.shared.available.notify_all();
386                        return Ok(QueueOfferResult::Enqueued);
387                    }
388                    state = self
389                        .shared
390                        .available
391                        .wait(state)
392                        .unwrap_or_else(|poison| poison.into_inner());
393                }
394            }
395        }
396    }
397
398    /// Complete the stream once buffered/pending elements drain (idempotent).
399    pub fn complete(&self) {
400        let mut state = self
401            .shared
402            .state
403            .lock()
404            .unwrap_or_else(|poison| poison.into_inner());
405        if state.buffer.is_empty() && state.pending_count == 0 {
406            if state.terminal.is_none() {
407                state.terminal = Some(TerminalSignal::Complete);
408            }
409        } else {
410            state.terminating = true;
411        }
412        drop(state);
413        self.shared.available.notify_all();
414    }
415
416    /// Fail the stream with `error` (first terminal signal wins).
417    pub fn fail(&self, error: StreamError) {
418        let mut state = self
419            .shared
420            .state
421            .lock()
422            .unwrap_or_else(|poison| poison.into_inner());
423        if state.terminal.is_none() {
424            state.terminal = Some(TerminalSignal::Error(error));
425        }
426        drop(state);
427        self.shared.available.notify_all();
428    }
429}
430
431impl<T> Drop for SourceQueue<T> {
432    fn drop(&mut self) {
433        if Arc::strong_count(&self.shared) != 2 {
434            return;
435        }
436
437        let mut state = self
438            .shared
439            .state
440            .lock()
441            .unwrap_or_else(|poison| poison.into_inner());
442        if state.terminal.is_none() && !state.terminating {
443            state.terminal = Some(TerminalSignal::Complete);
444        }
445        drop(state);
446        self.shared.available.notify_all();
447    }
448}
449
450struct SourceQueueStream<T: Send + 'static> {
451    shared: Arc<SourceQueueShared<T>>,
452    completion_sender: Option<oneshot::Sender<StreamResult<NotUsed>>>,
453}
454
455impl<T: Send + 'static> Iterator for SourceQueueStream<T> {
456    type Item = StreamResult<T>;
457
458    fn next(&mut self) -> Option<Self::Item> {
459        let mut state = self
460            .shared
461            .state
462            .lock()
463            .unwrap_or_else(|poison| poison.into_inner());
464        loop {
465            if let Some(TerminalSignal::Error(error)) = &state.terminal {
466                let error = error.clone();
467                drop(state);
468                self.signal_completion(Err(error.clone()));
469                self.shared.available.notify_all();
470                return Some(Err(error));
471            }
472
473            if let Some(item) = state.buffer.pop_front() {
474                drop(state);
475                self.shared.available.notify_all();
476                return Some(Ok(item));
477            }
478
479            if let Some(terminal) = state.terminal.clone() {
480                if state.terminating {
481                    state.terminating = false;
482                }
483                drop(state);
484                self.signal_completion(match &terminal {
485                    TerminalSignal::Complete => Ok(NotUsed),
486                    TerminalSignal::Error(error) => Err(error.clone()),
487                });
488                self.shared.available.notify_all();
489                return match terminal {
490                    TerminalSignal::Complete => None,
491                    TerminalSignal::Error(error) => Some(Err(error)),
492                };
493            }
494
495            if state.terminating && state.buffer.is_empty() && state.pending_count == 0 {
496                state.terminal = Some(TerminalSignal::Complete);
497                state.terminating = false;
498                drop(state);
499                self.signal_completion(Ok(NotUsed));
500                self.shared.available.notify_all();
501                return None;
502            }
503
504            state = self
505                .shared
506                .available
507                .wait(state)
508                .unwrap_or_else(|poison| poison.into_inner());
509        }
510    }
511}
512
513impl<T: Send + 'static> SourceQueueStream<T> {
514    fn signal_completion(&mut self, result: StreamResult<NotUsed>) {
515        if let Some(sender) = self.completion_sender.take() {
516            let _ = sender.send(result);
517        }
518    }
519}
520
521impl<T: Send + 'static> Drop for SourceQueueStream<T> {
522    fn drop(&mut self) {
523        let mut state = self
524            .shared
525            .state
526            .lock()
527            .unwrap_or_else(|poison| poison.into_inner());
528        if state.terminal.is_none() {
529            state.terminal = Some(TerminalSignal::Complete);
530        }
531        state.terminating = false;
532        drop(state);
533        self.signal_completion(Ok(NotUsed));
534        self.shared.available.notify_all();
535    }
536}
537
538// ── SinkQueue (blocking pull from sink) ────────────────────────────────────
539
540struct SinkQueueShared<T> {
541    state: Mutex<SinkQueueState<T>>,
542    available: Condvar,
543}
544
545struct SinkQueueState<T> {
546    buffer: VecDeque<T>,
547    error: Option<StreamError>,
548    completed: bool,
549}
550
551impl<T> SinkQueueShared<T> {
552    fn new() -> Arc<Self> {
553        Arc::new(Self {
554            state: Mutex::new(SinkQueueState {
555                buffer: VecDeque::new(),
556                error: None,
557                completed: false,
558            }),
559            available: Condvar::new(),
560        })
561    }
562}
563
564/// Materialized handle of [`Sink::queue`]: lets external code pull materialized elements out of a
565/// running stream on demand.
566pub struct SinkQueue<T> {
567    shared: Arc<SinkQueueShared<T>>,
568    _completion: StreamCompletion<NotUsed>,
569}
570
571impl<T> SinkQueue<T> {
572    /// Block until the next element is available: `Ok(Some(elem))` for an element, `Ok(None)` at
573    /// completion, or `Err` if the upstream failed.
574    pub fn pull(&self) -> StreamResult<Option<T>> {
575        let mut state = self
576            .shared
577            .state
578            .lock()
579            .unwrap_or_else(|poison| poison.into_inner());
580        loop {
581            if let Some(item) = state.buffer.pop_front() {
582                return Ok(Some(item));
583            }
584            if let Some(error) = state.error.clone() {
585                return Err(error);
586            }
587            if state.completed {
588                return Ok(None);
589            }
590            state = self
591                .shared
592                .available
593                .wait(state)
594                .unwrap_or_else(|poison| poison.into_inner());
595        }
596    }
597}
598
599// ── Source factory methods ──────────────────────────────────────────────────
600
601impl<T: Send + 'static> Source<T, NotUsed> {
602    /// A source fed by an external [`BoundedSourceQueue`] handle (lock-free, no overflow strategy;
603    /// a full buffer drops). Panics if `capacity == 0`.
604    #[must_use]
605    pub fn queue_bounded(capacity: usize) -> Source<T, BoundedSourceQueue<T>> {
606        assert!(capacity > 0, "queue capacity must be greater than zero");
607        Source::from_materialized_factory(move |_materializer| {
608            let shared = BoundedQueueShared::new(capacity);
609            let stream: BoxStream<T> = Box::new(BoundedQueueStream {
610                shared: Arc::clone(&shared),
611            });
612            let handle = BoundedSourceQueue { shared };
613            Ok((stream, handle))
614        })
615    }
616
617    /// A source fed by an external [`SourceQueue`] handle that applies `strategy` on overflow.
618    /// Panics if `capacity == 0`.
619    #[must_use]
620    pub fn queue(capacity: usize, strategy: OverflowStrategy) -> Source<T, SourceQueue<T>> {
621        assert!(capacity > 0, "queue capacity must be greater than zero");
622        Source::from_materialized_factory(move |_materializer| {
623            let shared = SourceQueueShared::new(capacity, strategy);
624            let (completion_sender, completion_receiver) = oneshot::channel();
625            let stream: BoxStream<T> = Box::new(SourceQueueStream {
626                shared: Arc::clone(&shared),
627                completion_sender: Some(completion_sender),
628            });
629            let handle = SourceQueue {
630                shared,
631                completion: Some(StreamCompletion::from_receiver(completion_receiver, None)),
632            };
633            Ok((stream, handle))
634        })
635    }
636}
637
638// ── Sink factory methods ────────────────────────────────────────────────────
639
640impl<T: Send + 'static> Sink<T, SinkQueue<T>> {
641    /// A sink whose materialized [`SinkQueue`] handle lets external code `pull` elements on demand.
642    #[must_use]
643    pub fn queue() -> Self {
644        Sink::from_runner(move |mut input, materializer| {
645            let shared = SinkQueueShared::new();
646            let worker_shared = Arc::clone(&shared);
647
648            let completion = materializer.spawn_stream(move |stream_cancelled| {
649                loop {
650                    if stream_cancelled.load(Ordering::SeqCst) {
651                        return Ok(NotUsed);
652                    }
653                    match input.next() {
654                        Some(Ok(item)) => {
655                            let mut state = worker_shared
656                                .state
657                                .lock()
658                                .unwrap_or_else(|poison| poison.into_inner());
659                            state.buffer.push_back(item);
660                            drop(state);
661                            worker_shared.available.notify_all();
662                        }
663                        Some(Err(error)) => {
664                            let mut state = worker_shared
665                                .state
666                                .lock()
667                                .unwrap_or_else(|poison| poison.into_inner());
668                            if state.error.is_none() {
669                                state.error = Some(error);
670                            }
671                            drop(state);
672                            worker_shared.available.notify_all();
673                            return Ok(NotUsed);
674                        }
675                        None => {
676                            let mut state = worker_shared
677                                .state
678                                .lock()
679                                .unwrap_or_else(|poison| poison.into_inner());
680                            state.completed = true;
681                            drop(state);
682                            worker_shared.available.notify_all();
683                            return Ok(NotUsed);
684                        }
685                    }
686                }
687            });
688
689            Ok(SinkQueue {
690                shared: Arc::clone(&shared),
691                _completion: completion,
692            })
693        })
694    }
695}
696
697// ── Tests ───────────────────────────────────────────────────────────────────
698
699#[cfg(test)]
700mod tests {
701    use super::*;
702    use crate::stream::Materializer;
703    use std::sync::atomic::AtomicBool;
704    use std::sync::atomic::Ordering;
705    use std::sync::mpsc;
706    use std::thread;
707    use std::time::{Duration, Instant};
708
709    // ── BoundedSourceQueue tests ────────────────────────────────────────
710
711    #[test]
712    fn bounded_offer_accepted_vs_processed_distinct() {
713        let (queue, mut stream) = materialize_bounded_queue(2);
714        // offer accepted means enqueued into buffer, not processed downstream
715        assert_eq!(queue.offer(1), QueueOfferResult::Enqueued);
716        assert_eq!(queue.offer(2), QueueOfferResult::Enqueued);
717        // proceed to process one element
718        assert_eq!(stream.next(), Some(Ok(1)));
719        // accepted is distinct from processed — element 2 still in buffer
720        assert_eq!(queue.offer(3), QueueOfferResult::Enqueued);
721        assert_eq!(stream.next(), Some(Ok(2)));
722        assert_eq!(stream.next(), Some(Ok(3)));
723    }
724
725    #[test]
726    fn bounded_offer_dropped_when_full() {
727        let (queue, mut stream) = materialize_bounded_queue(1);
728        assert_eq!(queue.offer(1), QueueOfferResult::Enqueued);
729        assert_eq!(queue.offer(2), QueueOfferResult::Dropped);
730        assert_eq!(queue.offer(3), QueueOfferResult::Dropped);
731        queue.complete();
732        assert_eq!(stream.next(), Some(Ok(1)));
733        assert_eq!(stream.next(), None);
734    }
735
736    #[test]
737    fn bounded_queue_closed_after_complete() {
738        let (queue, mut stream) = materialize_bounded_queue(2);
739        assert_eq!(queue.offer(1), QueueOfferResult::Enqueued);
740        queue.complete();
741        // element still drainable
742        assert_eq!(stream.next(), Some(Ok(1)));
743        assert_eq!(stream.next(), None);
744        // offers after complete return QueueClosed
745        assert_eq!(queue.offer(2), QueueOfferResult::QueueClosed);
746    }
747
748    #[test]
749    fn bounded_queue_closed_after_fail() {
750        let (queue, mut stream) = materialize_bounded_queue(2);
751        assert_eq!(queue.offer(1), QueueOfferResult::Enqueued);
752        queue.fail(StreamError::Failed("boom".into()));
753        // offers after fail return QueueClosed
754        assert_eq!(queue.offer(2), QueueOfferResult::QueueClosed);
755        // buffered elements drain before failure surfaces
756        assert_eq!(stream.next(), Some(Ok(1)));
757        assert_eq!(stream.next(), Some(Err(StreamError::Failed("boom".into()))));
758    }
759
760    #[test]
761    fn bounded_drop_handle_completes_stream() {
762        let queue = {
763            let (queue, mut stream) = materialize_bounded_queue::<i32>(2);
764            queue.offer(1);
765            queue.offer(2);
766            assert_eq!(stream.next(), Some(Ok(1)));
767            queue
768        };
769        // stream dropped, handle dropped
770        assert_eq!(queue.offer(3), QueueOfferResult::QueueClosed);
771    }
772
773    #[test]
774    fn bounded_drop_last_producer_completes_stream() {
775        let (queue, mut stream) = materialize_bounded_queue::<i32>(1);
776        let producer = queue.clone();
777        let consumer = thread::spawn(move || {
778            assert_eq!(stream.next(), None);
779        });
780        drop(producer);
781        drop(queue);
782        consumer.join().unwrap();
783    }
784
785    #[test]
786    fn bounded_drain_before_complete() {
787        let (queue, mut stream) = materialize_bounded_queue(3);
788        assert_eq!(queue.offer(1), QueueOfferResult::Enqueued);
789        assert_eq!(queue.offer(2), QueueOfferResult::Enqueued);
790        assert_eq!(queue.offer(3), QueueOfferResult::Enqueued);
791        assert_eq!(queue.offer(4), QueueOfferResult::Dropped);
792        assert_eq!(queue.offer(5), QueueOfferResult::Dropped);
793        queue.complete();
794        assert_eq!(stream.next(), Some(Ok(1)));
795        assert_eq!(stream.next(), Some(Ok(2)));
796        assert_eq!(stream.next(), Some(Ok(3)));
797        assert_eq!(stream.next(), None);
798    }
799
800    #[test]
801    fn bounded_terminal_completion_is_sticky() {
802        let (queue, mut stream) = materialize_bounded_queue(2);
803        queue.offer(1);
804        queue.complete();
805        assert_eq!(stream.next(), Some(Ok(1)));
806        assert_eq!(stream.next(), None);
807        assert_eq!(stream.next(), None);
808        assert_eq!(stream.next(), None);
809    }
810
811    #[test]
812    fn bounded_terminal_failure_is_sticky() {
813        let (queue, mut stream) = materialize_bounded_queue::<i32>(2);
814        queue.fail(StreamError::Failed("boom".into()));
815        assert_eq!(stream.next(), Some(Err(StreamError::Failed("boom".into()))));
816        assert_eq!(stream.next(), Some(Err(StreamError::Failed("boom".into()))));
817    }
818
819    #[test]
820    fn bounded_producer_consumer_across_threads() {
821        let (queue, mut stream) = materialize_bounded_queue::<i32>(16);
822        // start consumer first so it drains as producer fills
823        let consumer = thread::spawn(move || {
824            let mut collected = Vec::new();
825            while let Some(Ok(item)) = stream.next() {
826                collected.push(item);
827            }
828            collected
829        });
830        let producer = thread::spawn({
831            let queue = queue.clone();
832            move || {
833                for i in 0..10 {
834                    assert_eq!(queue.offer(i), QueueOfferResult::Enqueued);
835                }
836                queue.complete();
837            }
838        });
839        producer.join().unwrap();
840        let collected = consumer.join().unwrap();
841        assert_eq!(collected, (0..10).collect::<Vec<_>>());
842    }
843
844    #[test]
845    fn bounded_multi_producer_single_consumer() {
846        let n = 50_i32;
847        let producer_count = 4;
848        let total = (producer_count * n) as usize;
849        let (queue, mut stream) = materialize_bounded_queue::<i32>(total);
850
851        // start consumer first
852        let consumer = thread::spawn(move || {
853            let mut collected = Vec::new();
854            while let Some(Ok(item)) = stream.next() {
855                collected.push(item);
856            }
857            collected
858        });
859
860        let mut handles = Vec::new();
861        for p in 0..producer_count {
862            let q = queue.clone();
863            handles.push(thread::spawn(move || {
864                for i in 0..n {
865                    assert_eq!(q.offer(p * n + i), QueueOfferResult::Enqueued);
866                }
867            }));
868        }
869        for h in handles {
870            h.join().unwrap();
871        }
872        queue.complete();
873        let mut collected = consumer.join().unwrap();
874        collected.sort_unstable();
875        assert_eq!(collected.len(), total);
876        collected.sort_unstable();
877        assert_eq!(collected.len(), (producer_count * n) as usize);
878    }
879
880    #[test]
881    fn bounded_offer_poll_stress_no_lost_wakeup() {
882        // A small capacity makes the buffer oscillate between empty and full, so
883        // the consumer repeatedly drains it dry and parks while producers race
884        // to refill it. Every element that reports `Enqueued` must be delivered
885        // exactly once and the stream must terminate — a lost wakeup or a
886        // stranded element would hang the join or break the count.
887        const ROUNDS: usize = 40;
888        const PRODUCERS: i64 = 8;
889        const PER_PRODUCER: i64 = 500;
890
891        for _ in 0..ROUNDS {
892            let (queue, stream) = materialize_bounded_queue::<i64>(4);
893
894            let consumer = thread::spawn(move || {
895                let mut count = 0_u64;
896                for item in stream {
897                    assert!(item.is_ok(), "unexpected stream failure: {item:?}");
898                    count += 1;
899                }
900                count
901            });
902
903            let mut handles = Vec::new();
904            for p in 0..PRODUCERS {
905                let q = queue.clone();
906                handles.push(thread::spawn(move || {
907                    let mut enqueued = 0_u64;
908                    for i in 0..PER_PRODUCER {
909                        if q.offer(p * PER_PRODUCER + i) == QueueOfferResult::Enqueued {
910                            enqueued += 1;
911                        }
912                    }
913                    enqueued
914                }));
915            }
916
917            let mut total_enqueued = 0_u64;
918            for h in handles {
919                total_enqueued += h.join().unwrap();
920            }
921            // Every offer has now happened-before `complete`, so every accepted
922            // element is guaranteed to drain before the stream ends.
923            queue.complete();
924            let delivered = consumer.join().unwrap();
925            assert_eq!(
926                delivered, total_enqueued,
927                "every accepted element must be delivered exactly once"
928            );
929        }
930    }
931
932    #[test]
933    fn bounded_pingpong_in_order_stress() {
934        // Capacity 1 forces the producer to wait for the consumer to drain (and
935        // re-park) before nearly every element, exercising the park/wake
936        // handshake on the tightest possible loop. Elements must arrive exactly
937        // once, in order; a genuine deadlock trips the deadline rather than
938        // hanging the suite forever.
939        const ROUNDS: usize = 10;
940        const ITEMS: i64 = 1_000;
941
942        for _ in 0..ROUNDS {
943            let (queue, stream) = materialize_bounded_queue::<i64>(1);
944            let consumer = thread::spawn(move || {
945                let mut got = Vec::new();
946                for item in stream {
947                    got.push(item.expect("no spurious failure"));
948                }
949                got
950            });
951
952            for i in 0..ITEMS {
953                let deadline = Instant::now() + Duration::from_secs(5);
954                loop {
955                    match queue.offer(i) {
956                        QueueOfferResult::Enqueued => break,
957                        QueueOfferResult::Dropped => {
958                            assert!(
959                                Instant::now() < deadline,
960                                "offer never accepted within deadline — consumer stuck?"
961                            );
962                            thread::yield_now();
963                        }
964                        other => panic!("unexpected offer result: {other:?}"),
965                    }
966                }
967            }
968            queue.complete();
969            let got = consumer.join().unwrap();
970            assert_eq!(got, (0..ITEMS).collect::<Vec<_>>());
971        }
972    }
973
974    // ── SourceQueue tests ───────────────────────────────────────────────
975
976    fn materialize_source_queue<T: Send + 'static>(
977        capacity: usize,
978        strategy: OverflowStrategy,
979    ) -> (SourceQueue<T>, BoxStream<T>) {
980        let materializer = Materializer::new();
981        let (stream, queue) = Source::<T>::queue(capacity, strategy)
982            .factory
983            .create(&materializer)
984            .unwrap();
985        (queue, stream)
986    }
987
988    #[test]
989    fn source_queue_offer_enqueued() {
990        let (queue, mut stream) =
991            materialize_source_queue::<i32>(2, OverflowStrategy::Backpressure);
992        assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
993        assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::Enqueued);
994        assert_eq!(stream.next(), Some(Ok(1)));
995        assert_eq!(stream.next(), Some(Ok(2)));
996    }
997
998    #[test]
999    fn source_queue_drop_head() {
1000        let (queue, mut stream) = materialize_source_queue::<i32>(2, OverflowStrategy::DropHead);
1001        assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
1002        assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::Enqueued);
1003        // buffer full, drop head
1004        assert_eq!(queue.offer(3).unwrap(), QueueOfferResult::Enqueued);
1005        queue.complete();
1006        assert_eq!(stream.next(), Some(Ok(2)));
1007        assert_eq!(stream.next(), Some(Ok(3)));
1008        assert_eq!(stream.next(), None);
1009    }
1010
1011    #[test]
1012    fn source_queue_drop_tail() {
1013        let (queue, mut stream) = materialize_source_queue::<i32>(2, OverflowStrategy::DropTail);
1014        assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
1015        assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::Enqueued);
1016        assert_eq!(queue.offer(3).unwrap(), QueueOfferResult::Enqueued);
1017        queue.complete();
1018        assert_eq!(stream.next(), Some(Ok(1)));
1019        assert_eq!(stream.next(), Some(Ok(3)));
1020        assert_eq!(stream.next(), None);
1021    }
1022
1023    #[test]
1024    fn source_queue_drop_buffer() {
1025        let (queue, mut stream) = materialize_source_queue::<i32>(2, OverflowStrategy::DropBuffer);
1026        assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
1027        assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::Enqueued);
1028        assert_eq!(queue.offer(3).unwrap(), QueueOfferResult::Enqueued);
1029        queue.complete();
1030        // DropBuffer clears the buffer, then enqueues the new element
1031        assert_eq!(stream.next(), Some(Ok(3)));
1032        assert_eq!(stream.next(), None);
1033    }
1034
1035    #[test]
1036    fn source_queue_drop_new() {
1037        let (queue, mut stream) = materialize_source_queue::<i32>(2, OverflowStrategy::DropNew);
1038        assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
1039        assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::Enqueued);
1040        assert_eq!(queue.offer(3).unwrap(), QueueOfferResult::Dropped);
1041        queue.complete();
1042        assert_eq!(stream.next(), Some(Ok(1)));
1043        assert_eq!(stream.next(), Some(Ok(2)));
1044        assert_eq!(stream.next(), None);
1045    }
1046
1047    #[test]
1048    fn source_queue_fail_strategy() {
1049        let (queue, _stream) = materialize_source_queue::<i32>(2, OverflowStrategy::Fail);
1050        assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
1051        assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::Enqueued);
1052        match queue.offer(3).unwrap() {
1053            QueueOfferResult::Failure(e) => {
1054                assert!(format!("{e:?}").contains("Buffer overflow"));
1055            }
1056            other => panic!("expected Failure, got {other:?}"),
1057        }
1058    }
1059
1060    #[test]
1061    fn source_queue_backpressure_blocks() {
1062        let (queue, mut stream) =
1063            materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
1064        assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
1065
1066        // Keep the stream alive until the blocked offer has a chance to observe
1067        // the newly freed slot.
1068        let consumed = Arc::new(AtomicBool::new(false));
1069        let c = Arc::clone(&consumed);
1070        let (release_tx, release_rx) = mpsc::channel();
1071        let consumer = thread::spawn(move || {
1072            assert_eq!(stream.next(), Some(Ok(1)));
1073            c.store(true, Ordering::SeqCst);
1074            release_rx.recv().unwrap();
1075        });
1076
1077        wait_until(Duration::from_secs(1), || consumed.load(Ordering::SeqCst));
1078        assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::Enqueued);
1079        release_tx.send(()).unwrap();
1080        consumer.join().unwrap();
1081        assert!(consumed.load(Ordering::SeqCst));
1082    }
1083
1084    #[test]
1085    fn source_queue_concurrent_offer_violation() {
1086        let (queue, mut stream) =
1087            materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
1088        assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
1089
1090        let q = queue.clone();
1091        let started = Arc::new(AtomicBool::new(false));
1092        let s = Arc::clone(&started);
1093        let blocker = thread::spawn(move || {
1094            s.store(true, Ordering::SeqCst);
1095            // this blocks until drained
1096            let _ = q.offer(2);
1097        });
1098
1099        // wait for blocker to start
1100        while !started.load(Ordering::SeqCst) {
1101            thread::yield_now();
1102        }
1103        wait_until(Duration::from_secs(1), || {
1104            queue
1105                .shared
1106                .state
1107                .lock()
1108                .unwrap_or_else(|poison| poison.into_inner())
1109                .pending_count
1110                == 1
1111        });
1112
1113        // second concurrent offer should fail
1114        let result = queue.offer(3);
1115        assert!(result.is_err());
1116        assert!(format!("{result:?}").contains("Too many concurrent offers"));
1117
1118        // drain to unblock
1119        assert_eq!(stream.next(), Some(Ok(1)));
1120        blocker.join().unwrap();
1121    }
1122
1123    #[test]
1124    fn source_queue_watch_completion() {
1125        let (queue, mut stream) =
1126            materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
1127        queue.offer(1).unwrap();
1128        queue.complete();
1129        assert_eq!(stream.next(), Some(Ok(1)));
1130        assert_eq!(stream.next(), None);
1131        assert_eq!(queue.watch_completion().wait(), Ok(NotUsed));
1132    }
1133
1134    #[test]
1135    fn source_queue_watch_completion_on_failure() {
1136        let (queue, mut stream) =
1137            materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
1138        queue.fail(StreamError::Failed("boom".into()));
1139        assert_eq!(stream.next(), Some(Err(StreamError::Failed("boom".into()))));
1140        assert_eq!(
1141            queue.watch_completion().wait(),
1142            Err(StreamError::Failed("boom".into()))
1143        );
1144    }
1145
1146    #[test]
1147    fn source_queue_terminal_completion_is_sticky() {
1148        let (queue, mut stream) =
1149            materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
1150        queue.complete();
1151        assert_eq!(stream.next(), None);
1152        assert_eq!(stream.next(), None);
1153        assert_eq!(stream.next(), None);
1154    }
1155
1156    #[test]
1157    fn source_queue_offer_after_complete_returns_queue_closed() {
1158        let (queue, _stream) = materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
1159        queue.complete();
1160        assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::QueueClosed);
1161    }
1162
1163    #[test]
1164    fn source_queue_drop_stream_closes_queue() {
1165        let (queue, stream) = materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
1166        queue.offer(1).unwrap();
1167        drop(stream);
1168        // handle still alive, stream dropped → terminal set
1169        assert_eq!(queue.offer(2).unwrap(), QueueOfferResult::QueueClosed);
1170    }
1171
1172    #[test]
1173    fn source_queue_drop_last_producer_completes_stream() {
1174        let (queue, mut stream) =
1175            materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
1176        let producer = queue.clone();
1177        let consumer = thread::spawn(move || {
1178            assert_eq!(stream.next(), None);
1179        });
1180        drop(producer);
1181        drop(queue);
1182        consumer.join().unwrap();
1183    }
1184
1185    #[test]
1186    fn source_queue_backpressure_unblocks_on_complete() {
1187        let (queue, mut stream) =
1188            materialize_source_queue::<i32>(1, OverflowStrategy::Backpressure);
1189        assert_eq!(queue.offer(1).unwrap(), QueueOfferResult::Enqueued);
1190
1191        let q = queue.clone();
1192        let blocker = thread::spawn(move || {
1193            // blocks waiting for space
1194            q.offer(2)
1195        });
1196
1197        wait_until(Duration::from_secs(1), || {
1198            queue
1199                .shared
1200                .state
1201                .lock()
1202                .unwrap_or_else(|poison| poison.into_inner())
1203                .pending_count
1204                == 1
1205        });
1206        // complete wakes the backpressured offerer
1207        queue.complete();
1208        let result = blocker.join().unwrap();
1209        assert_eq!(result.unwrap(), QueueOfferResult::QueueClosed);
1210
1211        // buffer still has item 1
1212        assert_eq!(stream.next(), Some(Ok(1)));
1213        assert_eq!(stream.next(), None);
1214    }
1215
1216    // ── SinkQueue tests ─────────────────────────────────────────────────
1217
1218    fn materialize_sink_queue<T: Send + 'static>(source: Source<T>) -> SinkQueue<T> {
1219        source.run_with(Sink::queue()).unwrap()
1220    }
1221
1222    #[test]
1223    fn sink_queue_pull_elements() {
1224        let queue = materialize_sink_queue(Source::from_iter([1, 2, 3]));
1225        assert_eq!(queue.pull().unwrap(), Some(1));
1226        assert_eq!(queue.pull().unwrap(), Some(2));
1227        assert_eq!(queue.pull().unwrap(), Some(3));
1228        assert_eq!(queue.pull().unwrap(), None);
1229    }
1230
1231    #[test]
1232    fn sink_queue_pull_none_after_upstream_completion() {
1233        let queue = materialize_sink_queue(Source::from_iter([42]));
1234        assert_eq!(queue.pull().unwrap(), Some(42));
1235        assert_eq!(queue.pull().unwrap(), None);
1236        assert_eq!(queue.pull().unwrap(), None);
1237    }
1238
1239    #[test]
1240    fn sink_queue_pull_error_from_upstream_failure() {
1241        let queue =
1242            materialize_sink_queue(Source::<i32>::failed(StreamError::Failed("boom".into())));
1243        assert_eq!(
1244            queue.pull().unwrap_err(),
1245            StreamError::Failed("boom".into())
1246        );
1247        assert_eq!(
1248            queue.pull().unwrap_err(),
1249            StreamError::Failed("boom".into())
1250        );
1251    }
1252
1253    #[test]
1254    fn sink_queue_terminal_stickiness() {
1255        let queue = materialize_sink_queue(Source::<i32>::empty());
1256        assert_eq!(queue.pull().unwrap(), None);
1257        assert_eq!(queue.pull().unwrap(), None);
1258        assert_eq!(queue.pull().unwrap(), None);
1259    }
1260
1261    #[test]
1262    fn sink_queue_terminal_failure_stickiness() {
1263        let queue =
1264            materialize_sink_queue(Source::<i32>::failed(StreamError::Failed("boom".into())));
1265        assert_eq!(
1266            queue.pull().unwrap_err(),
1267            StreamError::Failed("boom".into())
1268        );
1269        assert_eq!(
1270            queue.pull().unwrap_err(),
1271            StreamError::Failed("boom".into())
1272        );
1273    }
1274
1275    #[test]
1276    fn sink_queue_drop_cancels_upstream() {
1277        let queue = materialize_sink_queue(Source::repeat(42));
1278        // dropping queue cancels worker
1279        drop(queue);
1280        // if this doesn't deadlock, the worker was cancelled
1281    }
1282
1283    #[test]
1284    fn sink_queue_drain_multiple_items() {
1285        let queue = materialize_sink_queue(Source::from_iter(0..100));
1286        for i in 0..100 {
1287            assert_eq!(queue.pull().unwrap(), Some(i));
1288        }
1289        assert_eq!(queue.pull().unwrap(), None);
1290    }
1291
1292    // ── Helpers ─────────────────────────────────────────────────────────
1293
1294    fn materialize_bounded_queue<T: Send + 'static>(
1295        capacity: usize,
1296    ) -> (BoundedSourceQueue<T>, BoxStream<T>) {
1297        let materializer = Materializer::new();
1298        let (stream, queue) = Source::<T>::queue_bounded(capacity)
1299            .factory
1300            .create(&materializer)
1301            .unwrap();
1302        (queue, stream)
1303    }
1304
1305    fn wait_until(timeout: Duration, condition: impl Fn() -> bool) {
1306        let deadline = Instant::now() + timeout;
1307        while Instant::now() < deadline {
1308            if condition() {
1309                return;
1310            }
1311            thread::yield_now();
1312            thread::sleep(Duration::from_millis(1));
1313        }
1314        assert!(condition(), "condition was not met within {timeout:?}");
1315    }
1316}