1use std::{
16 collections::VecDeque,
17 fmt,
18 sync::{
19 Arc, Condvar, Mutex,
20 atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering, fence},
21 },
22 time::Duration,
23};
24
25use crossbeam_queue::ArrayQueue;
26use tokio::sync::Notify;
27
28use crate::stream::{BoxStream, NotUsed, Source};
29use crate::{StreamError, StreamResult};
30
31const CHANNEL_OPEN: u8 = 0;
32const CHANNEL_CLOSED: u8 = 1;
33
34const PARK_BACKSTOP: Duration = Duration::from_millis(10);
37
38const CONSUMER_DRAIN_BATCH: usize = 256;
41
42const PRODUCER_WAKE_BATCH: usize = 256;
45
46pub struct Channel<T> {
56 shared: Arc<ChannelShared<T>>,
57 local: Arc<ProducerLocal>,
58}
59
60#[derive(Debug, Clone, PartialEq, Eq)]
63pub enum SendError<T> {
64 Closed(T),
66}
67
68#[derive(Debug, Clone, PartialEq, Eq)]
70pub enum TrySendError<T> {
71 Full(T),
73 Closed(T),
75}
76
77struct ChannelShared<T> {
78 buffer: ArrayQueue<T>,
79 capacity: usize,
80 available_slots: AtomicUsize,
81 closed: AtomicU8,
82 in_flight_senders: AtomicUsize,
83 consumer_active: AtomicBool,
84 consumer_park: Mutex<()>,
85 consumer_available: Condvar,
86 consumer_parked: AtomicBool,
87 producer_waiters: Mutex<VecDeque<Arc<ProducerLocal>>>,
88 space_waiters: AtomicUsize,
89 closed_notified: Notify,
90}
91
92struct ProducerLocal {
93 reserved_slots: AtomicUsize,
94 queued: AtomicBool,
95 active: AtomicBool,
96 available: Notify,
97}
98
99struct ChannelStream<T> {
100 shared: Arc<ChannelShared<T>>,
101 pending: VecDeque<T>,
102 active: bool,
103}
104
105impl<T> Channel<T> {
106 #[must_use]
108 pub fn bounded(capacity: usize) -> Self {
109 assert!(capacity > 0, "channel capacity must be greater than zero");
110 Self {
111 shared: ChannelShared::new(capacity),
112 local: ProducerLocal::new(),
113 }
114 }
115
116 #[must_use]
121 pub fn source(&self) -> Source<T>
122 where
123 T: Send + 'static,
124 {
125 let shared = Arc::clone(&self.shared);
126 Source::from_materialized_factory(move |_materializer| {
127 let stream = ChannelShared::new_stream(Arc::clone(&shared))?;
128 Ok((stream, NotUsed))
129 })
130 }
131
132 pub async fn send(&self, mut value: T) -> Result<(), SendError<T>> {
136 loop {
137 match self.try_send(value) {
138 Ok(()) => return Ok(()),
139 Err(TrySendError::Closed(value)) => return Err(SendError::Closed(value)),
140 Err(TrySendError::Full(returned)) => {
141 value = returned;
142 }
143 }
144
145 self.shared.wait_for_space_or_close(&self.local).await;
146 }
147 }
148
149 pub fn try_send(&self, value: T) -> Result<(), TrySendError<T>> {
151 self.shared.try_send_value(&self.local, value)
152 }
153
154 pub fn close(&self) {
159 self.shared.close();
160 }
161
162 #[must_use]
164 pub fn is_closed(&self) -> bool {
165 self.shared.is_closed()
166 }
167
168 pub async fn closed(&self) {
170 loop {
171 if self.is_closed() {
172 return;
173 }
174
175 let notified = self.shared.closed_notified.notified();
176 let mut notified = std::pin::pin!(notified);
177 notified.as_mut().enable();
178 if self.is_closed() {
179 return;
180 }
181 notified.as_mut().await;
182 }
183 }
184}
185
186impl<T> Clone for Channel<T> {
187 fn clone(&self) -> Self {
188 Self {
189 shared: Arc::clone(&self.shared),
190 local: ProducerLocal::new(),
191 }
192 }
193}
194
195impl<T> Drop for Channel<T> {
196 fn drop(&mut self) {
197 self.local.active.store(false, Ordering::Release);
198 self.shared.cancel_waiter(&self.local);
199 self.local.available.notify_waiters();
200 let reserved = self.local.reserved_slots.swap(0, Ordering::AcqRel);
201 if reserved > 0 && !self.shared.is_closed() {
202 self.shared.release_slots(reserved);
203 }
204 }
205}
206
207impl ProducerLocal {
208 fn new() -> Arc<Self> {
209 Arc::new(Self {
210 reserved_slots: AtomicUsize::new(0),
211 queued: AtomicBool::new(false),
212 active: AtomicBool::new(true),
213 available: Notify::new(),
214 })
215 }
216}
217
218impl<T: Send + 'static> Source<T, NotUsed> {
219 #[must_use]
225 pub fn channel(capacity: usize) -> Source<T, Channel<T>> {
226 assert!(capacity > 0, "channel capacity must be greater than zero");
227 Source::from_materialized_factory(move |_materializer| {
228 let channel = Channel::bounded(capacity);
229 let stream = ChannelShared::new_stream(Arc::clone(&channel.shared))?;
230 Ok((stream, channel))
231 })
232 }
233}
234
235impl<T> ChannelShared<T> {
236 fn new(capacity: usize) -> Arc<Self> {
237 Arc::new(Self {
238 buffer: ArrayQueue::new(capacity),
239 capacity,
240 available_slots: AtomicUsize::new(capacity),
241 closed: AtomicU8::new(CHANNEL_OPEN),
242 in_flight_senders: AtomicUsize::new(0),
243 consumer_active: AtomicBool::new(false),
244 consumer_park: Mutex::new(()),
245 consumer_available: Condvar::new(),
246 consumer_parked: AtomicBool::new(false),
247 producer_waiters: Mutex::new(VecDeque::new()),
248 space_waiters: AtomicUsize::new(0),
249 closed_notified: Notify::new(),
250 })
251 }
252
253 fn new_stream(shared: Arc<Self>) -> StreamResult<BoxStream<T>>
254 where
255 T: Send + 'static,
256 {
257 shared.acquire_consumer()?;
258 Ok(Box::new(ChannelStream {
259 shared,
260 pending: VecDeque::new(),
261 active: true,
262 }))
263 }
264
265 fn acquire_consumer(&self) -> StreamResult<()> {
266 self.consumer_active
267 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
268 .map(|_| ())
269 .map_err(|_| {
270 StreamError::Failed("channel source already has an active consumer".into())
271 })
272 }
273
274 fn release_consumer(&self) {
275 self.consumer_active.store(false, Ordering::Release);
276 }
277
278 fn is_closed(&self) -> bool {
279 self.closed.load(Ordering::Acquire) == CHANNEL_CLOSED
280 }
281
282 fn close(&self) -> bool {
283 let closed = self
284 .closed
285 .compare_exchange(
286 CHANNEL_OPEN,
287 CHANNEL_CLOSED,
288 Ordering::AcqRel,
289 Ordering::Acquire,
290 )
291 .is_ok();
292 if closed {
293 self.wake_all_senders();
294 self.closed_notified.notify_waiters();
295 self.wake_consumer();
296 }
297 closed
298 }
299
300 fn wake_consumer(&self) {
301 fence(Ordering::SeqCst);
302 if self.consumer_parked.load(Ordering::Relaxed) {
303 let _guard = self
304 .consumer_park
305 .lock()
306 .unwrap_or_else(|poison| poison.into_inner());
307 self.consumer_available.notify_one();
308 }
309 }
310
311 fn wake_all_senders(&self) {
312 fence(Ordering::SeqCst);
313 let mut waiters = self
314 .producer_waiters
315 .lock()
316 .unwrap_or_else(|poison| poison.into_inner());
317 while let Some(local) = waiters.pop_front() {
318 if local.queued.swap(false, Ordering::AcqRel) {
319 self.space_waiters.fetch_sub(1, Ordering::AcqRel);
320 local.available.notify_waiters();
321 }
322 }
323 }
324
325 async fn wait_for_space_or_close(&self, local: &Arc<ProducerLocal>) {
326 let notified = local.available.notified();
327 let mut notified = std::pin::pin!(notified);
328 notified.as_mut().enable();
329
330 let guard = self
331 .producer_waiters
332 .lock()
333 .unwrap_or_else(|poison| poison.into_inner());
334 if local.reserved_slots.load(Ordering::Acquire) > 0
335 || self.available_slots.load(Ordering::Acquire) > 0
336 || self.is_closed()
337 {
338 drop(guard);
339 return;
340 }
341 if !local.queued.swap(true, Ordering::AcqRel) {
342 self.space_waiters.fetch_add(1, Ordering::AcqRel);
343 let mut waiters = guard;
346 waiters.push_back(Arc::clone(local));
347 drop(waiters);
348 } else {
349 drop(guard);
350 }
351
352 fence(Ordering::SeqCst);
353 if local.reserved_slots.load(Ordering::Acquire) > 0
354 || self.available_slots.load(Ordering::Acquire) > 0
355 || self.is_closed()
356 {
357 self.cancel_waiter(local);
358 return;
359 }
360
361 notified.as_mut().await;
362 self.cancel_waiter(local);
363 }
364
365 fn cancel_waiter(&self, local: &ProducerLocal) {
366 if local.queued.swap(false, Ordering::AcqRel) {
367 self.space_waiters.fetch_sub(1, Ordering::AcqRel);
368 }
369 }
370
371 fn try_acquire_global_slot(&self) -> bool {
372 let mut slots = self.available_slots.load(Ordering::Acquire);
373 loop {
374 if slots == 0 {
375 return false;
376 }
377 match self.available_slots.compare_exchange_weak(
378 slots,
379 slots - 1,
380 Ordering::AcqRel,
381 Ordering::Acquire,
382 ) {
383 Ok(_) => return true,
384 Err(actual) => slots = actual,
385 }
386 }
387 }
388
389 fn try_acquire_local_slot(&self, local: &ProducerLocal) -> bool {
390 let mut slots = local.reserved_slots.load(Ordering::Acquire);
391 loop {
392 if slots == 0 {
393 return false;
394 }
395 match local.reserved_slots.compare_exchange_weak(
396 slots,
397 slots - 1,
398 Ordering::AcqRel,
399 Ordering::Acquire,
400 ) {
401 Ok(_) => return true,
402 Err(actual) => slots = actual,
403 }
404 }
405 }
406
407 fn release_slots(&self, count: usize) {
408 if count == 0 {
409 return;
410 }
411 let remaining = self.grant_slots_to_waiters(count);
412 if remaining > 0 {
413 let previous = self.available_slots.fetch_add(remaining, Ordering::AcqRel);
414 debug_assert!(
415 previous + remaining <= self.capacity,
416 "channel available slot count exceeded capacity"
417 );
418 }
419 }
420
421 fn handoff_available_slots(&self) {
422 if self.space_waiters.load(Ordering::Acquire) == 0 {
423 return;
424 }
425 let mut slots = self.available_slots.load(Ordering::Acquire);
426 loop {
427 if slots == 0 {
428 return;
429 }
430 let claimed = slots.min(PRODUCER_WAKE_BATCH);
431 match self.available_slots.compare_exchange_weak(
432 slots,
433 slots - claimed,
434 Ordering::AcqRel,
435 Ordering::Acquire,
436 ) {
437 Ok(_) => {
438 let remaining = self.grant_slots_to_waiters(claimed);
439 if remaining > 0 {
440 self.available_slots.fetch_add(remaining, Ordering::AcqRel);
441 }
442 return;
443 }
444 Err(actual) => slots = actual,
445 }
446 }
447 }
448
449 fn grant_slots_to_waiters(&self, mut slots: usize) -> usize {
450 if slots == 0 || self.space_waiters.load(Ordering::Acquire) == 0 {
451 return slots;
452 }
453 fence(Ordering::SeqCst);
454 let mut waiters = self
455 .producer_waiters
456 .lock()
457 .unwrap_or_else(|poison| poison.into_inner());
458 while slots > 0 {
459 let Some(local) = waiters.pop_front() else {
460 break;
461 };
462 if !local.queued.swap(false, Ordering::AcqRel) {
463 continue;
464 }
465 self.space_waiters.fetch_sub(1, Ordering::AcqRel);
466 if !local.active.load(Ordering::Acquire) || self.is_closed() {
467 local.available.notify_waiters();
468 continue;
469 }
470 let grant = slots.min(PRODUCER_WAKE_BATCH);
471 local.reserved_slots.fetch_add(grant, Ordering::AcqRel);
472 slots -= grant;
473 local.available.notify_waiters();
474 }
475 slots
476 }
477
478 fn finish_send(&self) {
479 let previous = self.in_flight_senders.fetch_sub(1, Ordering::AcqRel);
480 debug_assert!(previous > 0, "channel in-flight sender underflow");
481 if previous == 1 && self.is_closed() {
482 self.wake_consumer();
483 }
484 }
485
486 fn try_send_value(&self, local: &ProducerLocal, value: T) -> Result<(), TrySendError<T>> {
487 if self.is_closed() {
488 return Err(TrySendError::Closed(value));
489 }
490 self.in_flight_senders.fetch_add(1, Ordering::AcqRel);
491 if self.is_closed() {
492 self.finish_send();
493 return Err(TrySendError::Closed(value));
494 }
495 let used_local_slot = self.try_acquire_local_slot(local);
496 if !used_local_slot && !self.try_acquire_global_slot() {
497 self.finish_send();
498 return if self.is_closed() {
499 Err(TrySendError::Closed(value))
500 } else {
501 Err(TrySendError::Full(value))
502 };
503 }
504 match self.buffer.push(value) {
505 Ok(()) => {
506 self.finish_send();
507 self.wake_consumer();
508 Ok(())
509 }
510 Err(value) => {
511 if used_local_slot {
512 local.reserved_slots.fetch_add(1, Ordering::AcqRel);
513 } else {
514 self.release_slots(1);
515 }
516 self.finish_send();
517 if self.is_closed() {
518 Err(TrySendError::Closed(value))
519 } else {
520 Err(TrySendError::Full(value))
521 }
522 }
523 }
524 }
525}
526
527impl<T> Iterator for ChannelStream<T> {
528 type Item = StreamResult<T>;
529
530 fn next(&mut self) -> Option<Self::Item> {
531 loop {
532 if let Some(item) = self.pending.pop_front() {
533 return Some(Ok(item));
534 }
535
536 if let Some(item) = self.drain_batch() {
537 return Some(Ok(item));
538 }
539
540 if self.shared.is_closed() {
541 if let Some(item) = self.drain_batch() {
542 return Some(Ok(item));
543 }
544 if self.shared.in_flight_senders.load(Ordering::Acquire) == 0 {
545 self.finish();
546 return None;
547 }
548 }
549
550 let shared = &*self.shared;
551 let guard = shared
552 .consumer_park
553 .lock()
554 .unwrap_or_else(|poison| poison.into_inner());
555 shared.consumer_parked.store(true, Ordering::Relaxed);
556 fence(Ordering::SeqCst);
557 if !shared.buffer.is_empty()
558 || (shared.is_closed() && shared.in_flight_senders.load(Ordering::Acquire) == 0)
559 {
560 shared.consumer_parked.store(false, Ordering::Relaxed);
561 drop(guard);
562 continue;
563 }
564 if !shared.is_closed() && shared.space_waiters.load(Ordering::Acquire) > 0 {
565 shared.handoff_available_slots();
570 }
571 let (guard, _timeout) = shared
572 .consumer_available
573 .wait_timeout(guard, PARK_BACKSTOP)
574 .unwrap_or_else(|poison| poison.into_inner());
575 shared.consumer_parked.store(false, Ordering::Relaxed);
576 drop(guard);
577 }
578 }
579}
580
581impl<T> ChannelStream<T> {
582 fn drain_batch(&mut self) -> Option<T> {
583 let first = self.shared.buffer.pop()?;
584 let mut drained = 1;
585 while drained < CONSUMER_DRAIN_BATCH {
586 let Some(item) = self.shared.buffer.pop() else {
587 break;
588 };
589 self.pending.push_back(item);
590 drained += 1;
591 }
592 self.shared.release_slots(drained);
593 Some(first)
594 }
595
596 fn finish(&mut self) {
597 if self.active {
598 self.active = false;
599 self.shared.release_consumer();
600 }
601 }
602}
603
604impl<T> Drop for ChannelStream<T> {
605 fn drop(&mut self) {
606 if self.active {
607 self.shared.close();
608 self.shared.release_consumer();
609 self.active = false;
610 }
611 }
612}
613
614impl<T> fmt::Debug for Channel<T> {
615 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
616 f.debug_struct("Channel")
617 .field("closed", &self.is_closed())
618 .finish_non_exhaustive()
619 }
620}
621
622impl<T> fmt::Display for SendError<T> {
623 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
624 match self {
625 SendError::Closed(_) => f.write_str("channel is closed"),
626 }
627 }
628}
629
630impl<T> fmt::Display for TrySendError<T> {
631 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
632 match self {
633 TrySendError::Full(_) => f.write_str("channel is full"),
634 TrySendError::Closed(_) => f.write_str("channel is closed"),
635 }
636 }
637}
638
639#[cfg(test)]
640mod tests {
641 use super::*;
642 use crate::stream::Materializer;
643 use futures::executor::block_on;
644 use std::{
645 collections::HashMap,
646 sync::{
647 Arc,
648 atomic::{AtomicBool, AtomicUsize, Ordering},
649 },
650 thread,
651 time::{Duration, Instant},
652 };
653
654 #[test]
655 fn try_send_reports_full_and_closed() {
656 let channel = Channel::bounded(1);
657 let mut stream = materialize_channel(&channel);
658
659 assert_eq!(channel.try_send(1), Ok(()));
660 assert_eq!(channel.try_send(2), Err(TrySendError::Full(2)));
661 assert_eq!(stream.next(), Some(Ok(1)));
662 assert_eq!(channel.try_send(3), Ok(()));
663 channel.close();
664 assert_eq!(channel.try_send(4), Err(TrySendError::Closed(4)));
665 assert_eq!(stream.next(), Some(Ok(3)));
666 assert_eq!(stream.next(), None);
667 }
668
669 #[test]
670 fn send_many_producers_preserves_per_producer_order() {
671 const PRODUCERS: usize = 8;
672 const PER_PRODUCER: usize = 256;
673
674 let channel = Channel::bounded(32);
675 let stream = materialize_channel(&channel);
676 let consumer = thread::spawn(move || {
677 let mut collected = Vec::new();
678 for item in stream {
679 collected.push(item.expect("channel has no failure terminal"));
680 }
681 collected
682 });
683
684 let mut handles = Vec::new();
685 for producer in 0..PRODUCERS {
686 let channel = channel.clone();
687 handles.push(thread::spawn(move || {
688 for seq in 0..PER_PRODUCER {
689 block_on(channel.send((producer, seq))).unwrap();
690 }
691 }));
692 }
693 for handle in handles {
694 handle.join().unwrap();
695 }
696 channel.close();
697
698 let collected = consumer.join().unwrap();
699 assert_eq!(collected.len(), PRODUCERS * PER_PRODUCER);
700
701 let mut by_producer: HashMap<usize, Vec<usize>> = HashMap::new();
702 for (producer, seq) in collected {
703 by_producer.entry(producer).or_default().push(seq);
704 }
705 for producer in 0..PRODUCERS {
706 assert_eq!(
707 by_producer.remove(&producer).unwrap(),
708 (0..PER_PRODUCER).collect::<Vec<_>>()
709 );
710 }
711 }
712
713 #[test]
714 fn try_send_under_contention_counts_all_accepted_elements() {
715 const PRODUCERS: usize = 8;
716 const PER_PRODUCER: usize = 128;
717 let total = PRODUCERS * PER_PRODUCER;
718
719 let channel = Channel::bounded(total);
720 let mut handles = Vec::new();
721 for producer in 0..PRODUCERS {
722 let channel = channel.clone();
723 handles.push(thread::spawn(move || {
724 for seq in 0..PER_PRODUCER {
725 channel.try_send((producer, seq)).unwrap();
726 }
727 }));
728 }
729 for handle in handles {
730 handle.join().unwrap();
731 }
732 channel.close();
733
734 let stream = materialize_channel(&channel);
735 let mut count = 0;
736 for item in stream {
737 item.unwrap();
738 count += 1;
739 }
740 assert_eq!(count, total);
741 }
742
743 #[test]
744 fn send_backpressure_parks_and_resumes_on_consume() {
745 let channel = Channel::bounded(1);
746 let mut stream = materialize_channel(&channel);
747 block_on(channel.send(1)).unwrap();
748
749 let completed = Arc::new(AtomicBool::new(false));
750 let send_completed = Arc::clone(&completed);
751 let sender = {
752 let channel = channel.clone();
753 thread::spawn(move || {
754 let result = block_on(channel.send(2));
755 send_completed.store(true, Ordering::SeqCst);
756 result
757 })
758 };
759
760 wait_until(Duration::from_secs(1), || {
761 channel.shared.space_waiters.load(Ordering::SeqCst) >= 1
762 });
763 assert!(!completed.load(Ordering::SeqCst));
764
765 assert_eq!(stream.next(), Some(Ok(1)));
766 assert_eq!(sender.join().unwrap(), Ok(()));
767 channel.close();
768 assert_eq!(stream.next(), Some(Ok(2)));
769 assert_eq!(stream.next(), None);
770 }
771
772 #[test]
773 fn close_drains_buffer_before_completion() {
774 let channel = Channel::bounded(3);
775 let mut stream = materialize_channel(&channel);
776 assert_eq!(channel.try_send(1), Ok(()));
777 assert_eq!(channel.try_send(2), Ok(()));
778 assert_eq!(channel.try_send(3), Ok(()));
779 channel.close();
780
781 assert_eq!(stream.next(), Some(Ok(1)));
782 assert_eq!(stream.next(), Some(Ok(2)));
783 assert_eq!(stream.next(), Some(Ok(3)));
784 assert_eq!(stream.next(), None);
785 assert_eq!(stream.next(), None);
786 }
787
788 #[test]
789 fn concurrent_close_vs_send_never_loses_accepted_elements() {
790 const ROUNDS: usize = 20;
791 const PRODUCERS: usize = 8;
792 const PER_PRODUCER: usize = 200;
793
794 for _ in 0..ROUNDS {
795 let channel = Channel::bounded(4);
796 let stream = materialize_channel(&channel);
797 let consumer = thread::spawn(move || {
798 let mut count = 0_usize;
799 for item in stream {
800 item.unwrap();
801 count += 1;
802 }
803 count
804 });
805
806 let mut handles = Vec::new();
807 let started = Arc::new(AtomicUsize::new(0));
808 for producer in 0..PRODUCERS {
809 let channel = channel.clone();
810 let started = Arc::clone(&started);
811 handles.push(thread::spawn(move || {
812 let mut accepted = 0_usize;
813 started.fetch_add(1, Ordering::SeqCst);
814 for seq in 0..PER_PRODUCER {
815 if block_on(channel.send((producer, seq))).is_ok() {
816 accepted += 1;
817 } else {
818 break;
819 }
820 }
821 accepted
822 }));
823 }
824
825 wait_until(Duration::from_secs(1), || {
826 started.load(Ordering::SeqCst) == PRODUCERS
827 });
828 channel.close();
829
830 let accepted: usize = handles
831 .into_iter()
832 .map(|handle| handle.join().unwrap())
833 .sum();
834 let delivered = consumer.join().unwrap();
835 assert_eq!(delivered, accepted);
836 assert_eq!(
837 channel.try_send((usize::MAX, usize::MAX)),
838 Err(TrySendError::Closed((usize::MAX, usize::MAX)))
839 );
840 }
841 }
842
843 #[test]
844 fn closed_future_wakes_on_close() {
845 let channel = Channel::<u64>::bounded(1);
846 let waiting = Arc::new(AtomicBool::new(false));
847 let waiter_started = Arc::clone(&waiting);
848 let waiter = {
849 let channel = channel.clone();
850 thread::spawn(move || {
851 waiter_started.store(true, Ordering::SeqCst);
852 block_on(channel.closed());
853 })
854 };
855
856 wait_until(Duration::from_secs(1), || waiting.load(Ordering::SeqCst));
857 channel.close();
858 waiter.join().unwrap();
859 }
860
861 #[test]
862 fn consumer_drop_closes_channel_and_wakes_blocked_producers() {
863 let channel = Channel::bounded(1);
864 let stream = materialize_channel(&channel);
865 block_on(channel.send(1)).unwrap();
866
867 let sender = {
868 let channel = channel.clone();
869 thread::spawn(move || block_on(channel.send(2)))
870 };
871
872 wait_until(Duration::from_secs(1), || {
873 channel.shared.space_waiters.load(Ordering::SeqCst) >= 1
874 });
875 drop(stream);
876
877 assert_eq!(sender.join().unwrap(), Err(SendError::Closed(2)));
878 assert!(channel.is_closed());
879 assert_eq!(channel.try_send(3), Err(TrySendError::Closed(3)));
880
881 let replacement = Channel::bounded(1);
882 let mut replacement_stream = materialize_channel(&replacement);
883 block_on(replacement.send(10)).unwrap();
884 replacement.close();
885 assert_eq!(replacement_stream.next(), Some(Ok(10)));
886 assert_eq!(replacement_stream.next(), None);
887 }
888
889 #[test]
890 fn single_active_consumer_is_enforced() {
891 let materializer = Materializer::new();
892 let channel = Channel::<i32>::bounded(1);
893 let source = channel.source();
894 let (_first, _) = Arc::clone(&source.factory).create(&materializer).unwrap();
895
896 let second = Arc::clone(&source.factory).create(&materializer);
897 assert!(
898 matches!(second, Err(StreamError::Failed(message)) if message.contains("active consumer"))
899 );
900 }
901
902 #[test]
903 fn capacity_one_ping_pong_preserves_order() {
904 const ITEMS: usize = 500;
905 let channel = Channel::bounded(1);
906 let stream = materialize_channel(&channel);
907 let consumer = thread::spawn(move || {
908 let mut got = Vec::new();
909 for item in stream {
910 got.push(item.unwrap());
911 }
912 got
913 });
914
915 for item in 0..ITEMS {
916 block_on(channel.send(item)).unwrap();
917 }
918 channel.close();
919
920 assert_eq!(consumer.join().unwrap(), (0..ITEMS).collect::<Vec<_>>());
921 }
922
923 #[test]
924 #[should_panic(expected = "channel capacity must be greater than zero")]
925 fn zero_capacity_is_unsupported() {
926 let _ = Channel::<i32>::bounded(0);
927 }
928
929 fn materialize_channel<T: Send + 'static>(channel: &Channel<T>) -> BoxStream<T> {
930 let materializer = Materializer::new();
931 let (stream, _) = channel.source().factory.create(&materializer).unwrap();
932 stream
933 }
934
935 fn wait_until(timeout: Duration, condition: impl Fn() -> bool) {
936 let deadline = Instant::now() + timeout;
937 while Instant::now() < deadline {
938 if condition() {
939 return;
940 }
941 thread::yield_now();
942 thread::sleep(Duration::from_millis(1));
943 }
944 assert!(condition(), "condition was not met within {timeout:?}");
945 }
946}