1use std::sync::atomic::{AtomicBool, AtomicU64, AtomicU8, AtomicUsize, Ordering};
23use std::sync::Arc;
24use std::thread;
25use std::time::{Duration, Instant};
26
27use crate::tpc::CachePadded;
28
29use super::config::{BackpressureStrategy, ChannelConfig, ChannelStats, WaitStrategy};
30use super::error::{RecvError, StreamingError, TryPushError};
31use super::ring_buffer::RingBuffer;
32
/// Operating mode of a channel.
///
/// The mode is stored inside the channel as a raw `u8` (see the explicit
/// discriminants below), which is why the enum is `#[repr(u8)]`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum ChannelMode {
    /// Single-producer mode; the mode every channel is created in.
    Spsc = 0,
    /// Multi-producer mode; entered when a `Producer` is cloned.
    Mpsc = 1,
}
42
43impl From<u8> for ChannelMode {
44 fn from(v: u8) -> Self {
45 match v {
46 0 => Self::Spsc,
47 _ => Self::Mpsc,
48 }
49 }
50}
51
/// State shared (behind an `Arc`) between every `Producer` and the `Consumer`
/// of one channel.
struct ChannelInner<T> {
    /// Ring buffer holding the queued items.
    buffer: RingBuffer<T>,

    /// Current `ChannelMode`, stored as its `u8` discriminant.
    mode: AtomicU8,

    /// Number of live `Producer` handles; starts at 1.
    producer_count: AtomicUsize,

    /// Set once the last producer or the consumer has been dropped.
    closed: AtomicBool,

    /// Configuration the channel was created with.
    config: ChannelConfig,

    /// Counters updated only when `config.track_stats` is enabled.
    stats: ChannelStatsInner,

    /// Spin lock serialising producers in MPSC mode: `0` = free, `1` = held.
    mpsc_lock: AtomicU8,
}
77
/// Atomic counters backing `ChannelStats` snapshots.
///
/// The two per-item counters are wrapped in `CachePadded` — presumably to keep
/// the producer-side and consumer-side counters on separate cache lines
/// (confirm against `crate::tpc::CachePadded`).
struct ChannelStatsInner {
    /// Items successfully pushed into the buffer.
    items_pushed: CachePadded<AtomicU64>,
    /// Items successfully popped from the buffer.
    items_popped: CachePadded<AtomicU64>,
    /// Times a blocking push found the buffer full and had to wait.
    push_blocked: AtomicU64,
    /// Items discarded by the drop-on-full backpressure path.
    items_dropped: AtomicU64,
    /// Poll attempts that found the buffer empty.
    pop_empty: AtomicU64,
}
93
94impl ChannelStatsInner {
95 fn new() -> Self {
96 Self {
97 items_pushed: CachePadded::new(AtomicU64::new(0)),
98 items_popped: CachePadded::new(AtomicU64::new(0)),
99 push_blocked: AtomicU64::new(0),
100 items_dropped: AtomicU64::new(0),
101 pop_empty: AtomicU64::new(0),
102 }
103 }
104
105 fn snapshot(&self) -> ChannelStats {
106 ChannelStats {
107 items_pushed: self.items_pushed.load(Ordering::Relaxed),
108 items_popped: self.items_popped.load(Ordering::Relaxed),
109 push_blocked: self.push_blocked.load(Ordering::Relaxed),
110 items_dropped: self.items_dropped.load(Ordering::Relaxed),
111 pop_empty: self.pop_empty.load(Ordering::Relaxed),
112 }
113 }
114}
115
116impl<T> ChannelInner<T> {
117 fn new(config: ChannelConfig) -> Self {
118 Self {
119 buffer: RingBuffer::new(config.effective_buffer_size()),
120 mode: AtomicU8::new(ChannelMode::Spsc as u8),
121 producer_count: AtomicUsize::new(1),
122 closed: AtomicBool::new(false),
123 config,
124 stats: ChannelStatsInner::new(),
125 mpsc_lock: AtomicU8::new(0),
126 }
127 }
128
129 #[inline]
136 fn acquire_mpsc_lock(&self) {
137 let mut attempts = 0u32;
138
139 while self
140 .mpsc_lock
141 .compare_exchange_weak(0, 1, Ordering::Acquire, Ordering::Relaxed)
142 .is_err()
143 {
144 attempts = attempts.saturating_add(1);
145
146 if attempts <= 4 {
147 std::hint::spin_loop();
149 } else if attempts <= 8 {
150 thread::yield_now();
152 } else {
153 let sleep_us = (1 << (attempts - 8).min(6)).min(100);
156 thread::sleep(Duration::from_micros(sleep_us));
157 }
158 }
159 }
160
161 #[inline]
163 fn release_mpsc_lock(&self) {
164 self.mpsc_lock.store(0, Ordering::Release);
165 }
166
167 #[inline]
168 fn mode(&self) -> ChannelMode {
169 ChannelMode::from(self.mode.load(Ordering::Acquire))
170 }
171
172 #[inline]
173 fn is_mpsc(&self) -> bool {
174 self.mode() == ChannelMode::Mpsc
175 }
176
177 #[inline]
178 fn upgrade_to_mpsc(&self) {
179 self.mode.store(ChannelMode::Mpsc as u8, Ordering::Release);
180 }
181
182 #[inline]
183 fn track_push(&self) {
184 if self.config.track_stats {
185 self.stats.items_pushed.fetch_add(1, Ordering::Relaxed);
186 }
187 }
188
189 #[inline]
190 fn track_push_blocked(&self) {
191 if self.config.track_stats {
192 self.stats.push_blocked.fetch_add(1, Ordering::Relaxed);
193 }
194 }
195
196 #[inline]
197 fn track_dropped(&self) {
198 if self.config.track_stats {
199 self.stats.items_dropped.fetch_add(1, Ordering::Relaxed);
200 }
201 }
202
203 #[inline]
204 fn track_pop(&self) {
205 if self.config.track_stats {
206 self.stats.items_popped.fetch_add(1, Ordering::Relaxed);
207 }
208 }
209
210 #[inline]
211 fn track_pop_empty(&self) {
212 if self.config.track_stats {
213 self.stats.pop_empty.fetch_add(1, Ordering::Relaxed);
214 }
215 }
216}
217
/// Sending half of a channel.
///
/// Cloning a producer switches the channel to MPSC mode; dropping the last
/// producer marks the channel as closed.
pub struct Producer<T> {
    /// State shared with the consumer and any cloned producers.
    inner: Arc<ChannelInner<T>>,
}
224
225impl<T> Producer<T> {
226 pub fn push(&self, item: T) -> Result<(), StreamingError> {
238 if self.inner.closed.load(Ordering::Acquire) {
239 return Err(StreamingError::ChannelClosed);
240 }
241
242 match self.inner.config.backpressure {
243 BackpressureStrategy::Block => self.push_blocking(item),
244 BackpressureStrategy::DropOldest => self.push_drop_oldest(item),
245 BackpressureStrategy::Reject => self.push_reject(item),
246 }
247 }
248
249 pub fn try_push(&self, item: T) -> Result<(), TryPushError<T>> {
258 if self.inner.closed.load(Ordering::Acquire) {
259 return Err(TryPushError::closed(item));
260 }
261
262 if self.inner.is_mpsc() {
263 self.try_push_mpsc(item)
264 } else {
265 self.try_push_spsc(item)
266 }
267 }
268
269 pub fn push_batch(&self, items: impl IntoIterator<Item = T>) -> usize {
273 let mut count = 0;
274 for item in items {
275 if self.try_push(item).is_err() {
276 break;
277 }
278 count += 1;
279 }
280 count
281 }
282
283 #[inline]
285 #[must_use]
286 pub fn is_mpsc(&self) -> bool {
287 self.inner.is_mpsc()
288 }
289
290 #[inline]
292 #[must_use]
293 pub fn mode(&self) -> ChannelMode {
294 self.inner.mode()
295 }
296
297 #[inline]
299 #[must_use]
300 pub fn is_closed(&self) -> bool {
301 self.inner.closed.load(Ordering::Acquire)
302 }
303
304 #[inline]
306 #[must_use]
307 pub fn len(&self) -> usize {
308 self.inner.buffer.len()
309 }
310
311 #[inline]
313 #[must_use]
314 pub fn is_empty(&self) -> bool {
315 self.inner.buffer.is_empty()
316 }
317
318 #[inline]
320 #[must_use]
321 pub fn capacity(&self) -> usize {
322 self.inner.buffer.capacity()
323 }
324
325 #[must_use]
327 pub fn stats(&self) -> ChannelStats {
328 self.inner.stats.snapshot()
329 }
330
331 fn push_blocking(&self, mut item: T) -> Result<(), StreamingError> {
332 loop {
333 match self.try_push(item) {
334 Ok(()) => return Ok(()),
335 Err(e) if e.is_closed() => return Err(StreamingError::ChannelClosed),
336 Err(e) => {
337 self.inner.track_push_blocked();
338 item = e.into_inner();
339 self.wait_for_space();
340 }
341 }
342 }
343 }
344
345 fn push_drop_oldest(&self, item: T) -> Result<(), StreamingError> {
346 match self.try_push(item) {
348 Ok(()) => Ok(()),
349 Err(e) if e.is_closed() => Err(StreamingError::ChannelClosed),
350 Err(e) => {
351 self.inner.track_dropped();
355 drop(e.into_inner());
358 Ok(())
359 }
360 }
361 }
362
363 fn push_reject(&self, item: T) -> Result<(), StreamingError> {
364 match self.try_push(item) {
365 Ok(()) => Ok(()),
366 Err(e) if e.is_closed() => Err(StreamingError::ChannelClosed),
367 Err(_) => Err(StreamingError::ChannelFull),
368 }
369 }
370
371 #[inline]
372 fn try_push_spsc(&self, item: T) -> Result<(), TryPushError<T>> {
373 match self.inner.buffer.push(item) {
374 Ok(()) => {
375 self.inner.track_push();
376 Ok(())
377 }
378 Err(item) => Err(TryPushError::full(item)),
379 }
380 }
381
382 fn try_push_mpsc(&self, item: T) -> Result<(), TryPushError<T>> {
383 self.inner.acquire_mpsc_lock();
386
387 let result = match self.inner.buffer.push(item) {
388 Ok(()) => {
389 self.inner.track_push();
390 Ok(())
391 }
392 Err(item) => Err(TryPushError::full(item)),
393 };
394
395 self.inner.release_mpsc_lock();
396 result
397 }
398
399 fn wait_for_space(&self) {
400 match self.inner.config.wait_strategy {
401 WaitStrategy::Spin => {
402 while self.inner.buffer.is_full() {
403 std::hint::spin_loop();
404 }
405 }
406 WaitStrategy::SpinYield => {
407 let mut spins = 0;
408 while self.inner.buffer.is_full() {
409 if spins < 100 {
410 std::hint::spin_loop();
411 spins += 1;
412 } else {
413 thread::yield_now();
414 spins = 0;
415 }
416 }
417 }
418 WaitStrategy::Park => {
419 while self.inner.buffer.is_full() {
421 thread::park_timeout(Duration::from_micros(100));
422 }
423 }
424 }
425 }
426}
427
428impl<T> Clone for Producer<T> {
429 fn clone(&self) -> Self {
430 self.inner.upgrade_to_mpsc();
432
433 self.inner.producer_count.fetch_add(1, Ordering::AcqRel);
435
436 Self {
437 inner: Arc::clone(&self.inner),
438 }
439 }
440}
441
442impl<T> Drop for Producer<T> {
443 fn drop(&mut self) {
444 let prev = self.inner.producer_count.fetch_sub(1, Ordering::AcqRel);
445 if prev == 1 {
446 self.inner.closed.store(true, Ordering::Release);
448 }
449 }
450}
451
452impl<T: std::fmt::Debug> std::fmt::Debug for Producer<T> {
453 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
454 f.debug_struct("Producer")
455 .field("mode", &self.mode())
456 .field("len", &self.len())
457 .field("capacity", &self.capacity())
458 .field("is_closed", &self.is_closed())
459 .finish()
460 }
461}
462
/// Receiving half of a channel.
///
/// Dropping the consumer marks the channel as closed.
pub struct Consumer<T> {
    /// State shared with the channel's producers.
    inner: Arc<ChannelInner<T>>,
}
467
468impl<T> Consumer<T> {
469 #[inline]
473 #[must_use]
474 pub fn poll(&self) -> Option<T> {
475 if self.inner.is_mpsc() {
476 self.poll_mpsc()
477 } else {
478 self.poll_spsc()
479 }
480 }
481
482 pub fn recv(&self) -> Result<T, RecvError> {
489 loop {
490 if let Some(item) = self.poll() {
491 return Ok(item);
492 }
493
494 if self.is_disconnected() {
495 return Err(RecvError::Disconnected);
496 }
497
498 self.wait_for_item();
499 }
500 }
501
502 pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvError> {
509 let deadline = Instant::now() + timeout;
510
511 loop {
512 if let Some(item) = self.poll() {
513 return Ok(item);
514 }
515
516 if self.is_disconnected() {
517 return Err(RecvError::Disconnected);
518 }
519
520 if Instant::now() >= deadline {
521 return Err(RecvError::Timeout);
522 }
523
524 self.wait_for_item_timeout(deadline);
525 }
526 }
527
528 #[cold]
538 #[must_use]
539 pub fn pop_batch(&self, max_count: usize) -> Vec<T> {
540 let mut items = Vec::with_capacity(max_count.min(self.len()));
541 for _ in 0..max_count {
542 if let Some(item) = self.poll() {
543 items.push(item);
544 } else {
545 break;
546 }
547 }
548 items
549 }
550
551 #[inline]
570 pub fn pop_batch_into(&self, buffer: &mut Vec<T>, max_count: usize) -> usize {
571 let mut count = 0;
572 for _ in 0..max_count {
573 if let Some(item) = self.poll() {
574 buffer.push(item);
575 count += 1;
576 } else {
577 break;
578 }
579 }
580 count
581 }
582
583 #[inline]
587 pub fn pop_each<F>(&self, max_count: usize, f: F) -> usize
588 where
589 F: FnMut(T) -> bool,
590 {
591 self.inner.buffer.pop_each(max_count, f)
592 }
593
594 #[inline]
596 #[must_use]
597 pub fn is_disconnected(&self) -> bool {
598 self.inner.closed.load(Ordering::Acquire) && self.inner.buffer.is_empty()
599 }
600
601 #[inline]
603 #[must_use]
604 pub fn len(&self) -> usize {
605 self.inner.buffer.len()
606 }
607
608 #[inline]
610 #[must_use]
611 pub fn is_empty(&self) -> bool {
612 self.inner.buffer.is_empty()
613 }
614
615 #[inline]
617 #[must_use]
618 pub fn capacity(&self) -> usize {
619 self.inner.buffer.capacity()
620 }
621
622 #[inline]
624 #[must_use]
625 pub fn mode(&self) -> ChannelMode {
626 self.inner.mode()
627 }
628
629 #[must_use]
631 pub fn stats(&self) -> ChannelStats {
632 self.inner.stats.snapshot()
633 }
634
635 #[inline]
636 fn poll_spsc(&self) -> Option<T> {
637 let item = self.inner.buffer.pop();
638 if item.is_some() {
639 self.inner.track_pop();
640 } else {
641 self.inner.track_pop_empty();
642 }
643 item
644 }
645
646 fn poll_mpsc(&self) -> Option<T> {
647 let item = self.inner.buffer.pop();
650 if item.is_some() {
651 self.inner.track_pop();
652 } else {
653 self.inner.track_pop_empty();
654 }
655 item
656 }
657
658 fn wait_for_item(&self) {
659 match self.inner.config.wait_strategy {
660 WaitStrategy::Spin => {
661 while self.inner.buffer.is_empty() && !self.is_disconnected() {
662 std::hint::spin_loop();
663 }
664 }
665 WaitStrategy::SpinYield => {
666 let mut spins = 0;
667 while self.inner.buffer.is_empty() && !self.is_disconnected() {
668 if spins < 100 {
669 std::hint::spin_loop();
670 spins += 1;
671 } else {
672 thread::yield_now();
673 spins = 0;
674 }
675 }
676 }
677 WaitStrategy::Park => {
678 while self.inner.buffer.is_empty() && !self.is_disconnected() {
679 thread::park_timeout(Duration::from_micros(10));
680 }
681 }
682 }
683 }
684
685 fn wait_for_item_timeout(&self, deadline: Instant) {
686 match self.inner.config.wait_strategy {
687 WaitStrategy::Spin | WaitStrategy::SpinYield => {
688 }
690 WaitStrategy::Park => {
691 let remaining = deadline.saturating_duration_since(Instant::now());
692 if !remaining.is_zero() {
693 thread::park_timeout(remaining.min(Duration::from_micros(100)));
694 }
695 }
696 }
697 }
698}
699
700impl<T> Drop for Consumer<T> {
701 fn drop(&mut self) {
702 self.inner.closed.store(true, Ordering::Release);
705 }
706}
707
708impl<T: std::fmt::Debug> std::fmt::Debug for Consumer<T> {
709 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
710 f.debug_struct("Consumer")
711 .field("mode", &self.mode())
712 .field("len", &self.len())
713 .field("capacity", &self.capacity())
714 .field("is_disconnected", &self.is_disconnected())
715 .finish()
716 }
717}
718
719impl<T> Iterator for Consumer<T> {
721 type Item = T;
722
723 fn next(&mut self) -> Option<Self::Item> {
724 self.recv().ok()
725 }
726}
727
728#[must_use]
733pub fn channel<T>(buffer_size: usize) -> (Producer<T>, Consumer<T>) {
734 channel_with_config(ChannelConfig::with_buffer_size(buffer_size))
735}
736
737#[must_use]
739pub fn channel_with_config<T>(config: ChannelConfig) -> (Producer<T>, Consumer<T>) {
740 let inner = Arc::new(ChannelInner::new(config));
741
742 let producer = Producer {
743 inner: Arc::clone(&inner),
744 };
745
746 let consumer = Consumer { inner };
747
748 (producer, consumer)
749}
750
#[cfg(test)]
mod tests {
    use super::*;

    /// Items come out in the order they were pushed, then the channel is empty.
    #[test]
    fn test_basic_channel() {
        let (tx, rx) = channel::<i32>(16);

        for value in 1..=3 {
            assert!(tx.try_push(value).is_ok());
        }

        for expected in 1..=3 {
            assert_eq!(rx.poll(), Some(expected));
        }
        assert_eq!(rx.poll(), None);
    }

    /// A fresh channel reports SPSC mode on both halves.
    #[test]
    fn test_channel_starts_spsc() {
        let (tx, rx) = channel::<i32>(16);

        assert!(!tx.is_mpsc());
        assert_eq!(tx.mode(), ChannelMode::Spsc);
        assert_eq!(rx.mode(), ChannelMode::Spsc);
    }

    /// Cloning the producer flips the whole channel to MPSC.
    #[test]
    fn test_clone_upgrades_to_mpsc() {
        let (original, rx) = channel::<i32>(16);
        assert!(!original.is_mpsc());

        let duplicate = original.clone();

        assert!(original.is_mpsc());
        assert!(duplicate.is_mpsc());
        assert_eq!(rx.mode(), ChannelMode::Mpsc);
    }

    /// A size-8 channel accepts 7 items, rejects the 8th, and drains in order.
    #[test]
    fn test_spsc_push_pop() {
        let (tx, rx) = channel::<i32>(8);

        (0..7).for_each(|value| assert!(tx.try_push(value).is_ok()));

        assert!(tx.try_push(100).is_err());

        (0..7).for_each(|expected| assert_eq!(rx.poll(), Some(expected)));

        assert_eq!(rx.poll(), None);
    }

    /// Items pushed through two producers all reach the consumer.
    #[test]
    fn test_mpsc_push_pop() {
        let (first, rx) = channel::<i32>(16);
        let second = first.clone();
        assert!(first.is_mpsc());

        first.try_push(1).unwrap();
        second.try_push(2).unwrap();
        first.try_push(3).unwrap();

        let drained: Vec<i32> = std::iter::from_fn(|| rx.poll()).collect();

        assert_eq!(drained.len(), 3);
        for wanted in 1..=3 {
            assert!(drained.contains(&wanted));
        }
    }

    /// `push_batch` reports the accepted count and `pop_batch` returns them.
    #[test]
    fn test_push_batch() {
        let (tx, rx) = channel::<i32>(16);

        let accepted = tx.push_batch(vec![1, 2, 3, 4, 5]);
        assert_eq!(accepted, 5);

        let drained = rx.pop_batch(10);
        assert_eq!(drained, vec![1, 2, 3, 4, 5]);
    }

    /// `pop_each` visits every queued item and reports how many it saw.
    #[test]
    fn test_pop_each() {
        let (tx, rx) = channel::<i32>(16);
        tx.push_batch(vec![1, 2, 3, 4, 5]);

        let mut total = 0;
        let visited = rx.pop_each(10, |value| {
            total += value;
            true
        });

        assert_eq!(visited, 5);
        assert_eq!(total, 15);
    }

    /// An empty channel times out; a non-empty one returns the item.
    #[test]
    fn test_recv_timeout() {
        let (tx, rx) = channel::<i32>(16);

        let empty_outcome = rx.recv_timeout(Duration::from_millis(10));
        assert!(matches!(empty_outcome, Err(RecvError::Timeout)));

        tx.try_push(42).unwrap();
        let filled_outcome = rx.recv_timeout(Duration::from_secs(1));
        assert_eq!(filled_outcome, Ok(42));
    }

    /// After the producer is dropped, queued items are still delivered and
    /// then the consumer sees a disconnect.
    #[test]
    fn test_disconnected_on_producer_drop() {
        let (tx, rx) = channel::<i32>(16);

        tx.try_push(1).unwrap();
        drop(tx);

        assert_eq!(rx.poll(), Some(1));

        assert!(rx.is_disconnected());
        assert!(matches!(rx.recv(), Err(RecvError::Disconnected)));
    }

    /// Dropping the consumer closes the channel for the producer.
    #[test]
    fn test_closed_on_consumer_drop() {
        let (tx, rx) = channel::<i32>(16);
        drop(rx);

        assert!(tx.is_closed());

        let outcome = tx.push(1);
        assert!(matches!(outcome, Err(StreamingError::ChannelClosed)));
    }

    /// `Reject` fails when full and succeeds again once space is freed.
    #[test]
    fn test_backpressure_reject() {
        let (tx, rx) = channel_with_config::<i32>(
            ChannelConfig::builder()
                .buffer_size(4)
                .backpressure(BackpressureStrategy::Reject)
                .build(),
        );

        for value in 1..=3 {
            assert!(tx.push(value).is_ok());
        }

        assert!(matches!(tx.push(4), Err(StreamingError::ChannelFull)));

        let _ = rx.poll();

        assert!(tx.push(4).is_ok());
    }

    /// `DropOldest` reports success on a full buffer and counts the drop.
    #[test]
    fn test_backpressure_drop_oldest() {
        let (tx, _rx) = channel_with_config::<i32>(
            ChannelConfig::builder()
                .buffer_size(4)
                .backpressure(BackpressureStrategy::DropOldest)
                .track_stats(true)
                .build(),
        );

        for value in 1..=3 {
            tx.push(value).unwrap();
        }

        let overflow = tx.push(4);
        assert!(overflow.is_ok());

        let snapshot = tx.stats();
        assert!(snapshot.items_dropped > 0);
    }

    /// Push, pop and empty-poll counters advance when stats are enabled.
    #[test]
    fn test_stats_tracking() {
        let (tx, rx) = channel_with_config::<i32>(
            ChannelConfig::builder()
                .buffer_size(16)
                .track_stats(true)
                .build(),
        );

        tx.push_batch(vec![1, 2, 3, 4, 5]);
        let _ = rx.pop_batch(3);
        // Two of these polls return the remaining items, two find it empty.
        for _ in 0..4 {
            let _ = rx.poll();
        }

        let snapshot = tx.stats();
        assert_eq!(snapshot.items_pushed, 5);
        assert_eq!(snapshot.items_popped, 5);
        assert!(snapshot.pop_empty >= 2);
    }

    /// One producer thread and one consumer thread preserve order.
    #[test]
    fn test_concurrent_spsc() {
        const ITEMS: i32 = 10_000;
        let (tx, rx) = channel::<i32>(1024);

        let sender = thread::spawn(move || {
            for value in 0..ITEMS {
                while tx.try_push(value).is_err() {
                    thread::yield_now();
                }
            }
        });

        let receiver = thread::spawn(move || {
            let wanted = ITEMS as usize;
            let mut got = Vec::with_capacity(wanted);
            while got.len() < wanted {
                match rx.poll() {
                    Some(value) => got.push(value),
                    None => thread::yield_now(),
                }
            }
            got
        });

        sender.join().unwrap();
        let got = receiver.join().unwrap();

        assert_eq!(got.len(), ITEMS as usize);
        let expected: Vec<i32> = (0..ITEMS).collect();
        assert_eq!(got, expected);
    }

    /// Four producer threads deliver every item to a single consumer.
    #[test]
    fn test_concurrent_mpsc() {
        const ITEMS_PER_PRODUCER: i32 = 1000;
        const NUM_PRODUCERS: usize = 4;

        let (seed, rx) = channel::<i32>(1024);

        let workers: Vec<_> = (0..NUM_PRODUCERS)
            .map(|id| {
                let tx = seed.clone();
                thread::spawn(move || {
                    let base = i32::try_from(id).unwrap() * ITEMS_PER_PRODUCER;
                    for offset in 0..ITEMS_PER_PRODUCER {
                        while tx.try_push(base + offset).is_err() {
                            thread::yield_now();
                        }
                    }
                })
            })
            .collect();

        // Only the clones keep the channel open from here on.
        drop(seed);

        let receiver = thread::spawn(move || {
            let wanted = NUM_PRODUCERS * ITEMS_PER_PRODUCER as usize;
            let mut got = Vec::new();
            while got.len() < wanted {
                match rx.poll() {
                    Some(value) => got.push(value),
                    None if rx.is_disconnected() => break,
                    None => thread::yield_now(),
                }
            }
            got
        });

        for worker in workers {
            worker.join().unwrap();
        }

        let got = receiver.join().unwrap();
        assert_eq!(got.len(), NUM_PRODUCERS * ITEMS_PER_PRODUCER as usize);
    }

    /// Length, emptiness and capacity agree between both halves.
    #[test]
    fn test_len_and_capacity() {
        let (tx, rx) = channel::<i32>(16);

        assert_eq!(tx.capacity(), 16);
        assert_eq!(rx.capacity(), 16);
        assert!(tx.is_empty());
        assert!(rx.is_empty());

        tx.push_batch(vec![1, 2, 3]);

        assert_eq!(tx.len(), 3);
        assert_eq!(rx.len(), 3);
        assert!(!tx.is_empty());
    }

    /// Iterating the consumer yields queued items and stops on disconnect.
    #[test]
    fn test_consumer_iterator() {
        let (tx, mut rx) = channel::<i32>(16);

        tx.push_batch(vec![1, 2, 3]);
        drop(tx);

        let collected: Vec<i32> = rx.by_ref().collect();
        assert_eq!(collected, vec![1, 2, 3]);
    }

    /// Debug output names the type and, for the producer, the mode.
    #[test]
    fn test_debug_formatting() {
        let (tx, rx) = channel::<i32>(16);

        let tx_text = format!("{tx:?}");
        assert!(tx_text.contains("Producer"));
        assert!(tx_text.contains("Spsc"));

        let rx_text = format!("{rx:?}");
        assert!(rx_text.contains("Consumer"));
    }
}