Skip to main content

datum/concurrent/
channel.rs

1//! Closeable bounded MPSC channel.
2//!
3//! [`Channel`] mirrors FS2's bounded `Channel` shape for Datum: many producers
4//! feed exactly one active consumer stream. The element path is a
5//! `crossbeam_queue::ArrayQueue`; producers do not send actor messages per
6//! element and the successful `send` / `try_send` fast path takes no mutex.
7//! Waiting producers park through a waker protocol when the ring is full, and
8//! the synchronous consumer parks on the same fenced condvar pattern used by
9//! `BoundedSourceQueue`.
10//!
11//! This first version intentionally has a close-only terminal model. There is
12//! no typed `fail()` API: `close()` is graceful, buffered elements drain, and
13//! the source then completes.
14
15use std::{
16    collections::VecDeque,
17    fmt,
18    sync::{
19        Arc, Condvar, Mutex,
20        atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering, fence},
21    },
22    time::Duration,
23};
24
25use crossbeam_queue::ArrayQueue;
26use tokio::sync::Notify;
27
28use crate::stream::{BoxStream, NotUsed, Source};
29use crate::{StreamError, StreamResult};
30
31const CHANNEL_OPEN: u8 = 0;
32const CHANNEL_CLOSED: u8 = 1;
33
34/// Upper bound on a single consumer park. Normal wakeups are prompt; this is a
35/// defensive backstop for the same reason as `BoundedSourceQueue`'s park loop.
36const PARK_BACKSTOP: Duration = Duration::from_millis(10);
37
38/// Maximum number of elements the consumer pulls out of the lock-free ring on
39/// one stream poll before handing freed capacity back to parked producers.
40const CONSUMER_DRAIN_BATCH: usize = 256;
41
42/// Number of newly-freed ring slots one woken producer is expected to claim
43/// from its tight send loop before another parked producer is woken.
44const PRODUCER_WAKE_BATCH: usize = 256;
45
46/// A closeable bounded MPSC channel whose consumer side is a [`Source`].
47///
48/// Clone the handle for producers. At most one consumer stream may be active at
49/// a time; a second concurrent materialization of the same channel source
50/// returns a [`StreamError`]. Dropping the active consumer closes the channel so
51/// blocked producers wake and future sends fail with [`SendError::Closed`].
52///
53/// Capacity must be greater than zero. FS2's synchronous/rendezvous
54/// `bounded(0)` mode is deliberately unsupported in this v1 bounded channel.
55pub struct Channel<T> {
56    shared: Arc<ChannelShared<T>>,
57    local: Arc<ProducerLocal>,
58}
59
60/// Error returned by [`Channel::send`] when the channel closes before the value
61/// is accepted.
62#[derive(Debug, Clone, PartialEq, Eq)]
63pub enum SendError<T> {
64    /// The channel is closed; the unsent value is returned to the caller.
65    Closed(T),
66}
67
68/// Error returned by [`Channel::try_send`].
69#[derive(Debug, Clone, PartialEq, Eq)]
70pub enum TrySendError<T> {
71    /// The channel buffer is currently full; the unsent value is returned.
72    Full(T),
73    /// The channel is closed; the unsent value is returned.
74    Closed(T),
75}
76
77struct ChannelShared<T> {
78    buffer: ArrayQueue<T>,
79    capacity: usize,
80    available_slots: AtomicUsize,
81    closed: AtomicU8,
82    in_flight_senders: AtomicUsize,
83    consumer_active: AtomicBool,
84    consumer_park: Mutex<()>,
85    consumer_available: Condvar,
86    consumer_parked: AtomicBool,
87    producer_waiters: Mutex<VecDeque<Arc<ProducerLocal>>>,
88    space_waiters: AtomicUsize,
89    closed_notified: Notify,
90}
91
92struct ProducerLocal {
93    reserved_slots: AtomicUsize,
94    queued: AtomicBool,
95    active: AtomicBool,
96    available: Notify,
97}
98
99struct ChannelStream<T> {
100    shared: Arc<ChannelShared<T>>,
101    pending: VecDeque<T>,
102    active: bool,
103}
104
105impl<T> Channel<T> {
106    /// Create a bounded channel. Panics if `capacity == 0`.
107    #[must_use]
108    pub fn bounded(capacity: usize) -> Self {
109        assert!(capacity > 0, "channel capacity must be greater than zero");
110        Self {
111            shared: ChannelShared::new(capacity),
112            local: ProducerLocal::new(),
113        }
114    }
115
116    /// Return a source blueprint attached to this channel.
117    ///
118    /// Materialization claims the one active consumer slot. A concurrent second
119    /// materialization fails with [`StreamError::Failed`].
120    #[must_use]
121    pub fn source(&self) -> Source<T>
122    where
123        T: Send + 'static,
124    {
125        let shared = Arc::clone(&self.shared);
126        Source::from_materialized_factory(move |_materializer| {
127            let stream = ChannelShared::new_stream(Arc::clone(&shared))?;
128            Ok((stream, NotUsed))
129        })
130    }
131
132    /// Asynchronously send one value, waiting without spinning while the buffer
133    /// is full. Returns [`SendError::Closed`] if the channel closes before the
134    /// value is accepted.
135    pub async fn send(&self, mut value: T) -> Result<(), SendError<T>> {
136        loop {
137            match self.try_send(value) {
138                Ok(()) => return Ok(()),
139                Err(TrySendError::Closed(value)) => return Err(SendError::Closed(value)),
140                Err(TrySendError::Full(returned)) => {
141                    value = returned;
142                }
143            }
144
145            self.shared.wait_for_space_or_close(&self.local).await;
146        }
147    }
148
149    /// Try to enqueue one value without waiting.
150    pub fn try_send(&self, value: T) -> Result<(), TrySendError<T>> {
151        self.shared.try_send_value(&self.local, value)
152    }
153
154    /// Gracefully close the channel.
155    ///
156    /// Buffered elements remain visible to the consumer and drain before the
157    /// source completes. The operation is idempotent.
158    pub fn close(&self) {
159        self.shared.close();
160    }
161
162    /// Return true after the channel has been closed.
163    #[must_use]
164    pub fn is_closed(&self) -> bool {
165        self.shared.is_closed()
166    }
167
168    /// Wait until the channel is closed.
169    pub async fn closed(&self) {
170        loop {
171            if self.is_closed() {
172                return;
173            }
174
175            let notified = self.shared.closed_notified.notified();
176            let mut notified = std::pin::pin!(notified);
177            notified.as_mut().enable();
178            if self.is_closed() {
179                return;
180            }
181            notified.as_mut().await;
182        }
183    }
184}
185
186impl<T> Clone for Channel<T> {
187    fn clone(&self) -> Self {
188        Self {
189            shared: Arc::clone(&self.shared),
190            local: ProducerLocal::new(),
191        }
192    }
193}
194
195impl<T> Drop for Channel<T> {
196    fn drop(&mut self) {
197        self.local.active.store(false, Ordering::Release);
198        self.shared.cancel_waiter(&self.local);
199        self.local.available.notify_waiters();
200        let reserved = self.local.reserved_slots.swap(0, Ordering::AcqRel);
201        if reserved > 0 && !self.shared.is_closed() {
202            self.shared.release_slots(reserved);
203        }
204    }
205}
206
207impl ProducerLocal {
208    fn new() -> Arc<Self> {
209        Arc::new(Self {
210            reserved_slots: AtomicUsize::new(0),
211            queued: AtomicBool::new(false),
212            active: AtomicBool::new(true),
213            available: Notify::new(),
214        })
215    }
216}
217
218impl<T: Send + 'static> Source<T, NotUsed> {
219    /// A source fed by a materialized [`Channel`] handle.
220    ///
221    /// The source blueprint is inert until materialized. Each materialization
222    /// creates a fresh channel and claims its one consumer slot immediately.
223    /// Panics if `capacity == 0`.
224    #[must_use]
225    pub fn channel(capacity: usize) -> Source<T, Channel<T>> {
226        assert!(capacity > 0, "channel capacity must be greater than zero");
227        Source::from_materialized_factory(move |_materializer| {
228            let channel = Channel::bounded(capacity);
229            let stream = ChannelShared::new_stream(Arc::clone(&channel.shared))?;
230            Ok((stream, channel))
231        })
232    }
233}
234
235impl<T> ChannelShared<T> {
236    fn new(capacity: usize) -> Arc<Self> {
237        Arc::new(Self {
238            buffer: ArrayQueue::new(capacity),
239            capacity,
240            available_slots: AtomicUsize::new(capacity),
241            closed: AtomicU8::new(CHANNEL_OPEN),
242            in_flight_senders: AtomicUsize::new(0),
243            consumer_active: AtomicBool::new(false),
244            consumer_park: Mutex::new(()),
245            consumer_available: Condvar::new(),
246            consumer_parked: AtomicBool::new(false),
247            producer_waiters: Mutex::new(VecDeque::new()),
248            space_waiters: AtomicUsize::new(0),
249            closed_notified: Notify::new(),
250        })
251    }
252
253    fn new_stream(shared: Arc<Self>) -> StreamResult<BoxStream<T>>
254    where
255        T: Send + 'static,
256    {
257        shared.acquire_consumer()?;
258        Ok(Box::new(ChannelStream {
259            shared,
260            pending: VecDeque::new(),
261            active: true,
262        }))
263    }
264
265    fn acquire_consumer(&self) -> StreamResult<()> {
266        self.consumer_active
267            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
268            .map(|_| ())
269            .map_err(|_| {
270                StreamError::Failed("channel source already has an active consumer".into())
271            })
272    }
273
274    fn release_consumer(&self) {
275        self.consumer_active.store(false, Ordering::Release);
276    }
277
278    fn is_closed(&self) -> bool {
279        self.closed.load(Ordering::Acquire) == CHANNEL_CLOSED
280    }
281
282    fn close(&self) -> bool {
283        let closed = self
284            .closed
285            .compare_exchange(
286                CHANNEL_OPEN,
287                CHANNEL_CLOSED,
288                Ordering::AcqRel,
289                Ordering::Acquire,
290            )
291            .is_ok();
292        if closed {
293            self.wake_all_senders();
294            self.closed_notified.notify_waiters();
295            self.wake_consumer();
296        }
297        closed
298    }
299
300    fn wake_consumer(&self) {
301        fence(Ordering::SeqCst);
302        if self.consumer_parked.load(Ordering::Relaxed) {
303            let _guard = self
304                .consumer_park
305                .lock()
306                .unwrap_or_else(|poison| poison.into_inner());
307            self.consumer_available.notify_one();
308        }
309    }
310
311    fn wake_all_senders(&self) {
312        fence(Ordering::SeqCst);
313        let mut waiters = self
314            .producer_waiters
315            .lock()
316            .unwrap_or_else(|poison| poison.into_inner());
317        while let Some(local) = waiters.pop_front() {
318            if local.queued.swap(false, Ordering::AcqRel) {
319                self.space_waiters.fetch_sub(1, Ordering::AcqRel);
320                local.available.notify_waiters();
321            }
322        }
323    }
324
325    async fn wait_for_space_or_close(&self, local: &Arc<ProducerLocal>) {
326        let notified = local.available.notified();
327        let mut notified = std::pin::pin!(notified);
328        notified.as_mut().enable();
329
330        let guard = self
331            .producer_waiters
332            .lock()
333            .unwrap_or_else(|poison| poison.into_inner());
334        if local.reserved_slots.load(Ordering::Acquire) > 0
335            || self.available_slots.load(Ordering::Acquire) > 0
336            || self.is_closed()
337        {
338            drop(guard);
339            return;
340        }
341        if !local.queued.swap(true, Ordering::AcqRel) {
342            self.space_waiters.fetch_add(1, Ordering::AcqRel);
343            // The queue holds one Arc per parked producer handle; stale entries
344            // are discarded on grant/cancel if a send future gets unstuck first.
345            let mut waiters = guard;
346            waiters.push_back(Arc::clone(local));
347            drop(waiters);
348        } else {
349            drop(guard);
350        }
351
352        fence(Ordering::SeqCst);
353        if local.reserved_slots.load(Ordering::Acquire) > 0
354            || self.available_slots.load(Ordering::Acquire) > 0
355            || self.is_closed()
356        {
357            self.cancel_waiter(local);
358            return;
359        }
360
361        notified.as_mut().await;
362        self.cancel_waiter(local);
363    }
364
365    fn cancel_waiter(&self, local: &ProducerLocal) {
366        if local.queued.swap(false, Ordering::AcqRel) {
367            self.space_waiters.fetch_sub(1, Ordering::AcqRel);
368        }
369    }
370
371    fn try_acquire_global_slot(&self) -> bool {
372        let mut slots = self.available_slots.load(Ordering::Acquire);
373        loop {
374            if slots == 0 {
375                return false;
376            }
377            match self.available_slots.compare_exchange_weak(
378                slots,
379                slots - 1,
380                Ordering::AcqRel,
381                Ordering::Acquire,
382            ) {
383                Ok(_) => return true,
384                Err(actual) => slots = actual,
385            }
386        }
387    }
388
389    fn try_acquire_local_slot(&self, local: &ProducerLocal) -> bool {
390        let mut slots = local.reserved_slots.load(Ordering::Acquire);
391        loop {
392            if slots == 0 {
393                return false;
394            }
395            match local.reserved_slots.compare_exchange_weak(
396                slots,
397                slots - 1,
398                Ordering::AcqRel,
399                Ordering::Acquire,
400            ) {
401                Ok(_) => return true,
402                Err(actual) => slots = actual,
403            }
404        }
405    }
406
407    fn release_slots(&self, count: usize) {
408        if count == 0 {
409            return;
410        }
411        let remaining = self.grant_slots_to_waiters(count);
412        if remaining > 0 {
413            let previous = self.available_slots.fetch_add(remaining, Ordering::AcqRel);
414            debug_assert!(
415                previous + remaining <= self.capacity,
416                "channel available slot count exceeded capacity"
417            );
418        }
419    }
420
421    fn handoff_available_slots(&self) {
422        if self.space_waiters.load(Ordering::Acquire) == 0 {
423            return;
424        }
425        let mut slots = self.available_slots.load(Ordering::Acquire);
426        loop {
427            if slots == 0 {
428                return;
429            }
430            let claimed = slots.min(PRODUCER_WAKE_BATCH);
431            match self.available_slots.compare_exchange_weak(
432                slots,
433                slots - claimed,
434                Ordering::AcqRel,
435                Ordering::Acquire,
436            ) {
437                Ok(_) => {
438                    let remaining = self.grant_slots_to_waiters(claimed);
439                    if remaining > 0 {
440                        self.available_slots.fetch_add(remaining, Ordering::AcqRel);
441                    }
442                    return;
443                }
444                Err(actual) => slots = actual,
445            }
446        }
447    }
448
449    fn grant_slots_to_waiters(&self, mut slots: usize) -> usize {
450        if slots == 0 || self.space_waiters.load(Ordering::Acquire) == 0 {
451            return slots;
452        }
453        fence(Ordering::SeqCst);
454        let mut waiters = self
455            .producer_waiters
456            .lock()
457            .unwrap_or_else(|poison| poison.into_inner());
458        while slots > 0 {
459            let Some(local) = waiters.pop_front() else {
460                break;
461            };
462            if !local.queued.swap(false, Ordering::AcqRel) {
463                continue;
464            }
465            self.space_waiters.fetch_sub(1, Ordering::AcqRel);
466            if !local.active.load(Ordering::Acquire) || self.is_closed() {
467                local.available.notify_waiters();
468                continue;
469            }
470            let grant = slots.min(PRODUCER_WAKE_BATCH);
471            local.reserved_slots.fetch_add(grant, Ordering::AcqRel);
472            slots -= grant;
473            local.available.notify_waiters();
474        }
475        slots
476    }
477
478    fn finish_send(&self) {
479        let previous = self.in_flight_senders.fetch_sub(1, Ordering::AcqRel);
480        debug_assert!(previous > 0, "channel in-flight sender underflow");
481        if previous == 1 && self.is_closed() {
482            self.wake_consumer();
483        }
484    }
485
486    fn try_send_value(&self, local: &ProducerLocal, value: T) -> Result<(), TrySendError<T>> {
487        if self.is_closed() {
488            return Err(TrySendError::Closed(value));
489        }
490        self.in_flight_senders.fetch_add(1, Ordering::AcqRel);
491        if self.is_closed() {
492            self.finish_send();
493            return Err(TrySendError::Closed(value));
494        }
495        let used_local_slot = self.try_acquire_local_slot(local);
496        if !used_local_slot && !self.try_acquire_global_slot() {
497            self.finish_send();
498            return if self.is_closed() {
499                Err(TrySendError::Closed(value))
500            } else {
501                Err(TrySendError::Full(value))
502            };
503        }
504        match self.buffer.push(value) {
505            Ok(()) => {
506                self.finish_send();
507                self.wake_consumer();
508                Ok(())
509            }
510            Err(value) => {
511                if used_local_slot {
512                    local.reserved_slots.fetch_add(1, Ordering::AcqRel);
513                } else {
514                    self.release_slots(1);
515                }
516                self.finish_send();
517                if self.is_closed() {
518                    Err(TrySendError::Closed(value))
519                } else {
520                    Err(TrySendError::Full(value))
521                }
522            }
523        }
524    }
525}
526
527impl<T> Iterator for ChannelStream<T> {
528    type Item = StreamResult<T>;
529
530    fn next(&mut self) -> Option<Self::Item> {
531        loop {
532            if let Some(item) = self.pending.pop_front() {
533                return Some(Ok(item));
534            }
535
536            if let Some(item) = self.drain_batch() {
537                return Some(Ok(item));
538            }
539
540            if self.shared.is_closed() {
541                if let Some(item) = self.drain_batch() {
542                    return Some(Ok(item));
543                }
544                if self.shared.in_flight_senders.load(Ordering::Acquire) == 0 {
545                    self.finish();
546                    return None;
547                }
548            }
549
550            let shared = &*self.shared;
551            let guard = shared
552                .consumer_park
553                .lock()
554                .unwrap_or_else(|poison| poison.into_inner());
555            shared.consumer_parked.store(true, Ordering::Relaxed);
556            fence(Ordering::SeqCst);
557            if !shared.buffer.is_empty()
558                || (shared.is_closed() && shared.in_flight_senders.load(Ordering::Acquire) == 0)
559            {
560                shared.consumer_parked.store(false, Ordering::Relaxed);
561                drop(guard);
562                continue;
563            }
564            if !shared.is_closed() && shared.space_waiters.load(Ordering::Acquire) > 0 {
565                // The ring is empty but producers can still be parked from a
566                // previous full observation. Hand out one bounded wake batch
567                // before sleeping so a partially-used producer grant cannot
568                // strand the stream.
569                shared.handoff_available_slots();
570            }
571            let (guard, _timeout) = shared
572                .consumer_available
573                .wait_timeout(guard, PARK_BACKSTOP)
574                .unwrap_or_else(|poison| poison.into_inner());
575            shared.consumer_parked.store(false, Ordering::Relaxed);
576            drop(guard);
577        }
578    }
579}
580
581impl<T> ChannelStream<T> {
582    fn drain_batch(&mut self) -> Option<T> {
583        let first = self.shared.buffer.pop()?;
584        let mut drained = 1;
585        while drained < CONSUMER_DRAIN_BATCH {
586            let Some(item) = self.shared.buffer.pop() else {
587                break;
588            };
589            self.pending.push_back(item);
590            drained += 1;
591        }
592        self.shared.release_slots(drained);
593        Some(first)
594    }
595
596    fn finish(&mut self) {
597        if self.active {
598            self.active = false;
599            self.shared.release_consumer();
600        }
601    }
602}
603
604impl<T> Drop for ChannelStream<T> {
605    fn drop(&mut self) {
606        if self.active {
607            self.shared.close();
608            self.shared.release_consumer();
609            self.active = false;
610        }
611    }
612}
613
614impl<T> fmt::Debug for Channel<T> {
615    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
616        f.debug_struct("Channel")
617            .field("closed", &self.is_closed())
618            .finish_non_exhaustive()
619    }
620}
621
622impl<T> fmt::Display for SendError<T> {
623    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
624        match self {
625            SendError::Closed(_) => f.write_str("channel is closed"),
626        }
627    }
628}
629
630impl<T> fmt::Display for TrySendError<T> {
631    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
632        match self {
633            TrySendError::Full(_) => f.write_str("channel is full"),
634            TrySendError::Closed(_) => f.write_str("channel is closed"),
635        }
636    }
637}
638
639#[cfg(test)]
640mod tests {
641    use super::*;
642    use crate::stream::Materializer;
643    use futures::executor::block_on;
644    use std::{
645        collections::HashMap,
646        sync::{
647            Arc,
648            atomic::{AtomicBool, AtomicUsize, Ordering},
649        },
650        thread,
651        time::{Duration, Instant},
652    };
653
654    #[test]
655    fn try_send_reports_full_and_closed() {
656        let channel = Channel::bounded(1);
657        let mut stream = materialize_channel(&channel);
658
659        assert_eq!(channel.try_send(1), Ok(()));
660        assert_eq!(channel.try_send(2), Err(TrySendError::Full(2)));
661        assert_eq!(stream.next(), Some(Ok(1)));
662        assert_eq!(channel.try_send(3), Ok(()));
663        channel.close();
664        assert_eq!(channel.try_send(4), Err(TrySendError::Closed(4)));
665        assert_eq!(stream.next(), Some(Ok(3)));
666        assert_eq!(stream.next(), None);
667    }
668
669    #[test]
670    fn send_many_producers_preserves_per_producer_order() {
671        const PRODUCERS: usize = 8;
672        const PER_PRODUCER: usize = 256;
673
674        let channel = Channel::bounded(32);
675        let stream = materialize_channel(&channel);
676        let consumer = thread::spawn(move || {
677            let mut collected = Vec::new();
678            for item in stream {
679                collected.push(item.expect("channel has no failure terminal"));
680            }
681            collected
682        });
683
684        let mut handles = Vec::new();
685        for producer in 0..PRODUCERS {
686            let channel = channel.clone();
687            handles.push(thread::spawn(move || {
688                for seq in 0..PER_PRODUCER {
689                    block_on(channel.send((producer, seq))).unwrap();
690                }
691            }));
692        }
693        for handle in handles {
694            handle.join().unwrap();
695        }
696        channel.close();
697
698        let collected = consumer.join().unwrap();
699        assert_eq!(collected.len(), PRODUCERS * PER_PRODUCER);
700
701        let mut by_producer: HashMap<usize, Vec<usize>> = HashMap::new();
702        for (producer, seq) in collected {
703            by_producer.entry(producer).or_default().push(seq);
704        }
705        for producer in 0..PRODUCERS {
706            assert_eq!(
707                by_producer.remove(&producer).unwrap(),
708                (0..PER_PRODUCER).collect::<Vec<_>>()
709            );
710        }
711    }
712
713    #[test]
714    fn try_send_under_contention_counts_all_accepted_elements() {
715        const PRODUCERS: usize = 8;
716        const PER_PRODUCER: usize = 128;
717        let total = PRODUCERS * PER_PRODUCER;
718
719        let channel = Channel::bounded(total);
720        let mut handles = Vec::new();
721        for producer in 0..PRODUCERS {
722            let channel = channel.clone();
723            handles.push(thread::spawn(move || {
724                for seq in 0..PER_PRODUCER {
725                    channel.try_send((producer, seq)).unwrap();
726                }
727            }));
728        }
729        for handle in handles {
730            handle.join().unwrap();
731        }
732        channel.close();
733
734        let stream = materialize_channel(&channel);
735        let mut count = 0;
736        for item in stream {
737            item.unwrap();
738            count += 1;
739        }
740        assert_eq!(count, total);
741    }
742
743    #[test]
744    fn send_backpressure_parks_and_resumes_on_consume() {
745        let channel = Channel::bounded(1);
746        let mut stream = materialize_channel(&channel);
747        block_on(channel.send(1)).unwrap();
748
749        let completed = Arc::new(AtomicBool::new(false));
750        let send_completed = Arc::clone(&completed);
751        let sender = {
752            let channel = channel.clone();
753            thread::spawn(move || {
754                let result = block_on(channel.send(2));
755                send_completed.store(true, Ordering::SeqCst);
756                result
757            })
758        };
759
760        wait_until(Duration::from_secs(1), || {
761            channel.shared.space_waiters.load(Ordering::SeqCst) >= 1
762        });
763        assert!(!completed.load(Ordering::SeqCst));
764
765        assert_eq!(stream.next(), Some(Ok(1)));
766        assert_eq!(sender.join().unwrap(), Ok(()));
767        channel.close();
768        assert_eq!(stream.next(), Some(Ok(2)));
769        assert_eq!(stream.next(), None);
770    }
771
772    #[test]
773    fn close_drains_buffer_before_completion() {
774        let channel = Channel::bounded(3);
775        let mut stream = materialize_channel(&channel);
776        assert_eq!(channel.try_send(1), Ok(()));
777        assert_eq!(channel.try_send(2), Ok(()));
778        assert_eq!(channel.try_send(3), Ok(()));
779        channel.close();
780
781        assert_eq!(stream.next(), Some(Ok(1)));
782        assert_eq!(stream.next(), Some(Ok(2)));
783        assert_eq!(stream.next(), Some(Ok(3)));
784        assert_eq!(stream.next(), None);
785        assert_eq!(stream.next(), None);
786    }
787
788    #[test]
789    fn concurrent_close_vs_send_never_loses_accepted_elements() {
790        const ROUNDS: usize = 20;
791        const PRODUCERS: usize = 8;
792        const PER_PRODUCER: usize = 200;
793
794        for _ in 0..ROUNDS {
795            let channel = Channel::bounded(4);
796            let stream = materialize_channel(&channel);
797            let consumer = thread::spawn(move || {
798                let mut count = 0_usize;
799                for item in stream {
800                    item.unwrap();
801                    count += 1;
802                }
803                count
804            });
805
806            let mut handles = Vec::new();
807            let started = Arc::new(AtomicUsize::new(0));
808            for producer in 0..PRODUCERS {
809                let channel = channel.clone();
810                let started = Arc::clone(&started);
811                handles.push(thread::spawn(move || {
812                    let mut accepted = 0_usize;
813                    started.fetch_add(1, Ordering::SeqCst);
814                    for seq in 0..PER_PRODUCER {
815                        if block_on(channel.send((producer, seq))).is_ok() {
816                            accepted += 1;
817                        } else {
818                            break;
819                        }
820                    }
821                    accepted
822                }));
823            }
824
825            wait_until(Duration::from_secs(1), || {
826                started.load(Ordering::SeqCst) == PRODUCERS
827            });
828            channel.close();
829
830            let accepted: usize = handles
831                .into_iter()
832                .map(|handle| handle.join().unwrap())
833                .sum();
834            let delivered = consumer.join().unwrap();
835            assert_eq!(delivered, accepted);
836            assert_eq!(
837                channel.try_send((usize::MAX, usize::MAX)),
838                Err(TrySendError::Closed((usize::MAX, usize::MAX)))
839            );
840        }
841    }
842
843    #[test]
844    fn closed_future_wakes_on_close() {
845        let channel = Channel::<u64>::bounded(1);
846        let waiting = Arc::new(AtomicBool::new(false));
847        let waiter_started = Arc::clone(&waiting);
848        let waiter = {
849            let channel = channel.clone();
850            thread::spawn(move || {
851                waiter_started.store(true, Ordering::SeqCst);
852                block_on(channel.closed());
853            })
854        };
855
856        wait_until(Duration::from_secs(1), || waiting.load(Ordering::SeqCst));
857        channel.close();
858        waiter.join().unwrap();
859    }
860
861    #[test]
862    fn consumer_drop_closes_channel_and_wakes_blocked_producers() {
863        let channel = Channel::bounded(1);
864        let stream = materialize_channel(&channel);
865        block_on(channel.send(1)).unwrap();
866
867        let sender = {
868            let channel = channel.clone();
869            thread::spawn(move || block_on(channel.send(2)))
870        };
871
872        wait_until(Duration::from_secs(1), || {
873            channel.shared.space_waiters.load(Ordering::SeqCst) >= 1
874        });
875        drop(stream);
876
877        assert_eq!(sender.join().unwrap(), Err(SendError::Closed(2)));
878        assert!(channel.is_closed());
879        assert_eq!(channel.try_send(3), Err(TrySendError::Closed(3)));
880
881        let replacement = Channel::bounded(1);
882        let mut replacement_stream = materialize_channel(&replacement);
883        block_on(replacement.send(10)).unwrap();
884        replacement.close();
885        assert_eq!(replacement_stream.next(), Some(Ok(10)));
886        assert_eq!(replacement_stream.next(), None);
887    }
888
889    #[test]
890    fn single_active_consumer_is_enforced() {
891        let materializer = Materializer::new();
892        let channel = Channel::<i32>::bounded(1);
893        let source = channel.source();
894        let (_first, _) = Arc::clone(&source.factory).create(&materializer).unwrap();
895
896        let second = Arc::clone(&source.factory).create(&materializer);
897        assert!(
898            matches!(second, Err(StreamError::Failed(message)) if message.contains("active consumer"))
899        );
900    }
901
902    #[test]
903    fn capacity_one_ping_pong_preserves_order() {
904        const ITEMS: usize = 500;
905        let channel = Channel::bounded(1);
906        let stream = materialize_channel(&channel);
907        let consumer = thread::spawn(move || {
908            let mut got = Vec::new();
909            for item in stream {
910                got.push(item.unwrap());
911            }
912            got
913        });
914
915        for item in 0..ITEMS {
916            block_on(channel.send(item)).unwrap();
917        }
918        channel.close();
919
920        assert_eq!(consumer.join().unwrap(), (0..ITEMS).collect::<Vec<_>>());
921    }
922
923    #[test]
924    #[should_panic(expected = "channel capacity must be greater than zero")]
925    fn zero_capacity_is_unsupported() {
926        let _ = Channel::<i32>::bounded(0);
927    }
928
929    fn materialize_channel<T: Send + 'static>(channel: &Channel<T>) -> BoxStream<T> {
930        let materializer = Materializer::new();
931        let (stream, _) = channel.source().factory.create(&materializer).unwrap();
932        stream
933    }
934
935    fn wait_until(timeout: Duration, condition: impl Fn() -> bool) {
936        let deadline = Instant::now() + timeout;
937        while Instant::now() < deadline {
938            if condition() {
939                return;
940            }
941            thread::yield_now();
942            thread::sleep(Duration::from_millis(1));
943        }
944        assert!(condition(), "condition was not met within {timeout:?}");
945    }
946}