1use crate::shim::atomic::{AtomicBool, Ordering};
39use crate::shim::cell::UnsafeCell;
40use crate::shim::notify::SingleWaiterNotify;
41use crate::shim::sync::Arc;
42use core::num::NonZeroUsize;
43use smallring::spsc::{Consumer, PopError, Producer, PushError, new};
44
45pub fn channel<T, const N: usize>(capacity: NonZeroUsize) -> (Sender<T, N>, Receiver<T, N>) {
96 let (producer, consumer) = new::<T, N>(capacity);
97
98 let inner = Arc::new(Inner::<T, N> {
99 producer: UnsafeCell::new(producer),
100 consumer: UnsafeCell::new(consumer),
101 closed: AtomicBool::new(false),
102 recv_notify: SingleWaiterNotify::new(),
103 send_notify: SingleWaiterNotify::new(),
104 });
105
106 let sender = Sender {
107 inner: inner.clone(),
108 };
109
110 let receiver = Receiver { inner };
111
112 (sender, receiver)
113}
114
115struct Inner<T, const N: usize = 32> {
125 producer: UnsafeCell<Producer<T, N>>,
129
130 consumer: UnsafeCell<Consumer<T, N>>,
134
135 closed: AtomicBool,
139
140 recv_notify: SingleWaiterNotify,
144
145 send_notify: SingleWaiterNotify,
149}
150
151unsafe impl<T: Send, const N: usize> Sync for Inner<T, N> {}
158
159impl<T, const N: usize> core::fmt::Debug for Inner<T, N> {
160 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
161 f.debug_struct("Inner")
162 .field("closed", &self.closed.load(Ordering::Acquire))
163 .finish()
164 }
165}
166
167pub struct Sender<T, const N: usize> {
171 inner: Arc<Inner<T, N>>,
172}
173
174impl<T, const N: usize> core::fmt::Debug for Sender<T, N> {
175 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
176 f.debug_struct("Sender")
177 .field("closed", &self.is_closed())
178 .field("len", &self.len())
179 .field("capacity", &self.capacity())
180 .finish()
181 }
182}
183
184pub struct Receiver<T, const N: usize> {
188 inner: Arc<Inner<T, N>>,
189}
190
191impl<T, const N: usize> core::fmt::Debug for Receiver<T, N> {
192 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
193 f.debug_struct("Receiver")
194 .field("is_empty", &self.is_empty())
195 .field("len", &self.len())
196 .field("capacity", &self.capacity())
197 .finish()
198 }
199}
200
201pub struct Drain<'a, T, const N: usize> {
217 receiver: &'a mut Receiver<T, N>,
218}
219
220impl<'a, T, const N: usize> core::fmt::Debug for Drain<'a, T, N> {
221 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
222 f.debug_struct("Drain")
223 .field("len", &self.receiver.len())
224 .field("is_empty", &self.receiver.is_empty())
225 .finish()
226 }
227}
228
229impl<'a, T, const N: usize> Iterator for Drain<'a, T, N> {
230 type Item = T;
231
232 #[inline]
233 fn next(&mut self) -> Option<Self::Item> {
234 self.receiver.try_recv().ok()
235 }
236
237 #[inline]
238 fn size_hint(&self) -> (usize, Option<usize>) {
239 let len = self.receiver.len();
240 (len, Some(len))
241 }
242}
243
244#[derive(Debug, Clone, Copy, PartialEq, Eq)]
248pub enum SendError<T> {
249 Closed(T),
253}
254
255#[derive(Debug, Clone, Copy, PartialEq, Eq)]
259pub enum TryRecvError {
260 Empty,
264
265 Closed,
269}
270
271#[derive(Debug, Clone, Copy, PartialEq, Eq)]
275pub enum TrySendError<T> {
276 Full(T),
280
281 Closed(T),
285}
286
287impl<T, const N: usize> Sender<T, N> {
288 pub async fn send(&self, mut value: T) -> Result<(), SendError<T>> {
298 loop {
299 match self.try_send(value) {
300 Ok(()) => return Ok(()),
301 Err(TrySendError::Closed(v)) => return Err(SendError::Closed(v)),
302 Err(TrySendError::Full(v)) => {
303 value = v;
306
307 self.inner.send_notify.notified().await;
310
311 if self.inner.closed.load(Ordering::Acquire) {
314 return Err(SendError::Closed(value));
315 }
316
317 }
320 }
321 }
322 }
323
324 pub fn try_send(&self, value: T) -> Result<(), TrySendError<T>> {
336 if self.inner.closed.load(Ordering::Acquire) {
339 return Err(TrySendError::Closed(value));
340 }
341
342 self.inner.producer.with_mut(|producer| unsafe {
345 match (*producer).push(value) {
346 Ok(()) => {
347 self.inner.recv_notify.notify_one();
350 Ok(())
351 }
352 Err(PushError::Full(v)) => Err(TrySendError::Full(v)),
353 }
354 })
355 }
356
357 #[inline]
361 pub fn is_closed(&self) -> bool {
362 self.inner.closed.load(Ordering::Acquire)
363 }
364
365 #[inline]
369 pub fn capacity(&self) -> usize {
370 self.inner
373 .producer
374 .with(|producer| unsafe { (*producer).capacity() })
375 }
376
377 #[inline]
381 pub fn len(&self) -> usize {
382 self.inner
385 .producer
386 .with(|producer| unsafe { (*producer).slots() })
387 }
388
389 #[inline]
390 pub fn is_empty(&self) -> bool {
391 self.inner
394 .producer
395 .with(|producer| unsafe { (*producer).is_empty() })
396 }
397
398 #[inline]
402 pub fn free_slots(&self) -> usize {
403 self.inner
406 .producer
407 .with(|producer| unsafe { (*producer).free_slots() })
408 }
409
410 #[inline]
414 pub fn is_full(&self) -> bool {
415 self.inner
418 .producer
419 .with(|producer| unsafe { (*producer).is_full() })
420 }
421}
422
423impl<T: Copy, const N: usize> Sender<T, N> {
424 pub fn try_send_slice(&self, values: &[T]) -> usize {
446 if self.inner.closed.load(Ordering::Acquire) {
449 return 0;
450 }
451
452 self.inner.producer.with_mut(|producer| unsafe {
455 let sent = (*producer).push_slice(values);
456
457 if sent > 0 {
458 self.inner.recv_notify.notify_one();
461 }
462
463 sent
464 })
465 }
466
467 pub async fn send_slice(&self, values: &[T]) -> Result<usize, SendError<()>> {
492 let mut total_sent = 0;
493
494 while total_sent < values.len() {
495 if self.inner.closed.load(Ordering::Acquire) {
498 return Err(SendError::Closed(()));
499 }
500
501 let sent = self.try_send_slice(&values[total_sent..]);
502 total_sent += sent;
503
504 if total_sent < values.len() {
505 self.inner.send_notify.notified().await;
508
509 if self.inner.closed.load(Ordering::Acquire) {
512 return Err(SendError::Closed(()));
513 }
514 }
515 }
516
517 Ok(total_sent)
518 }
519}
520
521impl<T, const N: usize> Receiver<T, N> {
522 pub async fn recv(&self) -> Option<T> {
530 loop {
531 match self.try_recv() {
532 Ok(value) => return Some(value),
533 Err(TryRecvError::Closed) => return None,
534 Err(TryRecvError::Empty) => {
535 if self.inner.closed.load(Ordering::Acquire) {
538 if let Ok(value) = self.try_recv() {
541 return Some(value);
542 }
543 return None;
544 }
545
546 self.inner.recv_notify.notified().await;
549 }
550 }
551 }
552 }
553
554 pub fn try_recv(&self) -> Result<T, TryRecvError> {
566 self.inner.consumer.with_mut(|consumer| unsafe {
569 match (*consumer).pop() {
570 Ok(value) => {
571 self.inner.send_notify.notify_one();
574 Ok(value)
575 }
576 Err(PopError::Empty) => {
577 if self.inner.closed.load(Ordering::Acquire) {
580 match (*consumer).pop() {
584 Ok(value) => {
585 self.inner.send_notify.notify_one();
586 Ok(value)
587 }
588 Err(PopError::Empty) => Err(TryRecvError::Closed),
589 }
590 } else {
591 Err(TryRecvError::Empty)
592 }
593 }
594 }
595 })
596 }
597
598 #[inline]
602 pub fn is_empty(&self) -> bool {
603 self.inner
606 .consumer
607 .with(|consumer| unsafe { (*consumer).is_empty() })
608 }
609
610 #[inline]
614 pub fn len(&self) -> usize {
615 self.inner
618 .consumer
619 .with(|consumer| unsafe { (*consumer).slots() })
620 }
621
622 #[inline]
626 pub fn capacity(&self) -> usize {
627 self.inner
630 .consumer
631 .with(|consumer| unsafe { (*consumer).buffer().capacity() })
632 }
633
634 #[inline]
651 pub fn peek(&self) -> Option<&T> {
652 self.inner
655 .consumer
656 .with(|consumer| unsafe { core::mem::transmute((*consumer).peek()) })
657 }
658
659 pub fn clear(&mut self) {
667 self.inner
670 .consumer
671 .with_mut(|consumer| unsafe { (*consumer).clear() });
672
673 self.inner.send_notify.notify_one();
676 }
677
678 #[inline]
710 pub fn drain(&mut self) -> Drain<'_, T, N> {
711 Drain { receiver: self }
712 }
713}
714
715impl<T: Copy, const N: usize> Receiver<T, N> {
716 pub fn try_recv_slice(&mut self, dest: &mut [T]) -> usize {
738 self.inner.consumer.with_mut(|consumer| unsafe {
741 let received = (*consumer).pop_slice(dest);
742
743 if received > 0 {
744 self.inner.send_notify.notify_one();
747 }
748
749 received
750 })
751 }
752
753 pub async fn recv_slice(&mut self, dest: &mut [T]) -> usize {
777 let mut total_received = 0;
778
779 while total_received < dest.len() {
780 let received = self.try_recv_slice(&mut dest[total_received..]);
781 total_received += received;
782
783 if total_received < dest.len() {
784 if self.inner.closed.load(Ordering::Acquire) {
787 return total_received;
790 }
791
792 self.inner.recv_notify.notified().await;
795
796 if self.inner.closed.load(Ordering::Acquire) {
799 let final_received = self.try_recv_slice(&mut dest[total_received..]);
802 total_received += final_received;
803 return total_received;
804 }
805 }
806 }
807
808 total_received
809 }
810}
811
812impl<T, const N: usize> Drop for Receiver<T, N> {
813 fn drop(&mut self) {
814 self.inner.closed.store(true, Ordering::Release);
817
818 self.inner.send_notify.notify_one();
821 }
822}
823
824impl<T, const N: usize> Drop for Sender<T, N> {
825 fn drop(&mut self) {
826 self.inner.closed.store(true, Ordering::Release);
829
830 self.inner.recv_notify.notify_one();
833 }
834}
835
836#[cfg(all(test, not(feature = "loom")))]
837mod tests {
838 use super::*;
839
840 #[tokio::test]
841 async fn test_basic_send_recv() {
842 let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
843
844 tx.send(1).await.unwrap();
845 tx.send(2).await.unwrap();
846 tx.send(3).await.unwrap();
847
848 assert_eq!(rx.recv().await, Some(1));
849 assert_eq!(rx.recv().await, Some(2));
850 assert_eq!(rx.recv().await, Some(3));
851 }
852
853 #[tokio::test]
854 async fn test_try_send_recv() {
855 let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
856
857 tx.try_send(1).unwrap();
858 tx.try_send(2).unwrap();
859
860 assert_eq!(rx.try_recv().unwrap(), 1);
861 assert_eq!(rx.try_recv().unwrap(), 2);
862 assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
863 }
864
865 #[tokio::test]
866 async fn test_channel_closed_on_sender_drop() {
867 let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
868
869 tx.send(1).await.unwrap();
870 drop(tx);
871
872 assert_eq!(rx.recv().await, Some(1));
873 assert_eq!(rx.recv().await, None);
874 }
875
876 #[tokio::test]
877 async fn test_channel_closed_on_receiver_drop() {
878 let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
879
880 drop(rx);
881
882 assert!(matches!(tx.send(1).await, Err(SendError::Closed(1))));
883 }
884
885 #[tokio::test]
886 async fn test_cross_task_communication() {
887 let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
888
889 let sender_handle = tokio::spawn(async move {
890 for i in 0..10 {
891 tx.send(i).await.unwrap();
892 }
893 });
894
895 let receiver_handle = tokio::spawn(async move {
896 let mut sum = 0;
897 while let Some(value) = rx.recv().await {
898 sum += value;
899 }
900 sum
901 });
902
903 sender_handle.await.unwrap();
904 let sum = receiver_handle.await.unwrap();
905 assert_eq!(sum, 45); }
907
908 #[tokio::test]
909 async fn test_backpressure() {
910 let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
911
912 tx.try_send(1).unwrap();
914 tx.try_send(2).unwrap();
915 tx.try_send(3).unwrap();
916 tx.try_send(4).unwrap();
917
918 assert!(matches!(tx.try_send(5), Err(TrySendError::Full(5))));
920
921 let send_handle = tokio::spawn(async move {
923 tx.send(5).await.unwrap();
924 tx.send(6).await.unwrap();
925 });
926
927 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
928
929 assert_eq!(rx.recv().await, Some(1));
930 assert_eq!(rx.recv().await, Some(2));
931 assert_eq!(rx.recv().await, Some(3));
932 assert_eq!(rx.recv().await, Some(4));
933 assert_eq!(rx.recv().await, Some(5));
934 assert_eq!(rx.recv().await, Some(6));
935
936 send_handle.await.unwrap();
937 }
938
939 #[tokio::test]
940 async fn test_capacity_and_len() {
941 let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(8).unwrap());
942
943 assert_eq!(rx.capacity(), 8);
944 assert_eq!(rx.len(), 0);
945 assert!(rx.is_empty());
946
947 tx.try_send(1).unwrap();
948 tx.try_send(2).unwrap();
949
950 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
951 assert_eq!(rx.len(), 2);
952 assert!(!rx.is_empty());
953 }
954
955 #[tokio::test]
958 async fn test_sender_capacity_queries() {
959 let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(8).unwrap());
960
961 assert_eq!(tx.capacity(), 8);
962 assert_eq!(tx.len(), 0);
963 assert_eq!(tx.free_slots(), 8);
964 assert!(!tx.is_full());
965
966 tx.try_send(1).unwrap();
967 tx.try_send(2).unwrap();
968 tx.try_send(3).unwrap();
969
970 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
971 assert_eq!(tx.len(), 3);
972 assert_eq!(tx.free_slots(), 5);
973 assert!(!tx.is_full());
974
975 tx.try_send(4).unwrap();
977 tx.try_send(5).unwrap();
978 tx.try_send(6).unwrap();
979 tx.try_send(7).unwrap();
980 tx.try_send(8).unwrap();
981
982 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
983 assert_eq!(tx.len(), 8);
984 assert_eq!(tx.free_slots(), 0);
985 assert!(tx.is_full());
986
987 rx.recv().await;
989 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
990
991 assert_eq!(tx.len(), 7);
992 assert_eq!(tx.free_slots(), 1);
993 assert!(!tx.is_full());
994 }
995
996 #[tokio::test]
997 async fn test_try_send_slice() {
998 let (tx, rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
999
1000 let data = [1, 2, 3, 4, 5];
1001 let sent = tx.try_send_slice(&data);
1002
1003 assert_eq!(sent, 5);
1004 assert_eq!(rx.len(), 5);
1005
1006 for i in 0..5 {
1007 assert_eq!(rx.recv().await.unwrap(), data[i]);
1008 }
1009 }
1010
1011 #[tokio::test]
1012 async fn test_try_send_slice_partial() {
1013 let (tx, rx) = channel::<u32, 32>(NonZeroUsize::new(8).unwrap());
1014
1015 let initial = [1, 2, 3, 4, 5];
1017 tx.try_send_slice(&initial);
1018
1019 let more = [6, 7, 8, 9, 10, 11, 12, 13, 14, 15];
1021 let sent = tx.try_send_slice(&more);
1022
1023 assert_eq!(sent, 3);
1024 assert_eq!(rx.len(), 8);
1025 assert!(tx.is_full());
1026
1027 for i in 1..=8 {
1029 assert_eq!(rx.recv().await.unwrap(), i);
1030 }
1031 }
1032
1033 #[tokio::test]
1034 async fn test_send_slice() {
1035 let (tx, rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
1036
1037 let data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
1038 let result = tx.send_slice(&data).await;
1039
1040 assert_eq!(result.unwrap(), 10);
1041 assert_eq!(rx.len(), 10);
1042
1043 for i in 0..10 {
1044 assert_eq!(rx.recv().await.unwrap(), data[i]);
1045 }
1046 }
1047
1048 #[tokio::test]
1049 async fn test_send_slice_with_backpressure() {
1050 let (tx, rx) = channel::<u32, 32>(NonZeroUsize::new(4).unwrap());
1051
1052 let data = [1, 2, 3, 4, 5, 6, 7, 8];
1053
1054 let send_handle = tokio::spawn(async move { tx.send_slice(&data).await.unwrap() });
1055
1056 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1057
1058 for i in 1..=4 {
1060 assert_eq!(rx.recv().await.unwrap(), i);
1061 }
1062
1063 let sent = send_handle.await.unwrap();
1064 assert_eq!(sent, 8);
1065
1066 for i in 5..=8 {
1068 assert_eq!(rx.recv().await.unwrap(), i);
1069 }
1070 }
1071
1072 #[tokio::test]
1073 async fn test_peek() {
1074 let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(8).unwrap());
1075
1076 assert!(rx.peek().is_none());
1078
1079 tx.try_send(42).unwrap();
1080 tx.try_send(100).unwrap();
1081 tx.try_send(200).unwrap();
1082
1083 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1084
1085 assert_eq!(rx.peek(), Some(&42));
1087 assert_eq!(rx.peek(), Some(&42)); assert_eq!(rx.len(), 3); }
1090
1091 #[tokio::test]
1092 async fn test_peek_after_recv() {
1093 let (tx, rx) = channel::<String, 32>(NonZeroUsize::new(8).unwrap());
1094
1095 tx.try_send("first".to_string()).unwrap();
1096 tx.try_send("second".to_string()).unwrap();
1097 tx.try_send("third".to_string()).unwrap();
1098
1099 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1100
1101 assert_eq!(rx.peek(), Some(&"first".to_string()));
1102 rx.recv().await.unwrap();
1103
1104 assert_eq!(rx.peek(), Some(&"second".to_string()));
1105 rx.recv().await.unwrap();
1106
1107 assert_eq!(rx.peek(), Some(&"third".to_string()));
1108 rx.recv().await.unwrap();
1109
1110 assert!(rx.peek().is_none());
1111 }
1112
1113 #[tokio::test]
1114 async fn test_clear() {
1115 let (tx, mut rx) = channel::<i32, 32>(NonZeroUsize::new(16).unwrap());
1116
1117 for i in 0..10 {
1118 tx.try_send(i).unwrap();
1119 }
1120
1121 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1122 assert_eq!(rx.len(), 10);
1123
1124 rx.clear();
1125
1126 assert_eq!(rx.len(), 0);
1127 assert!(rx.is_empty());
1128 }
1129
1130 #[tokio::test]
1131 async fn test_clear_with_drop() {
1132 use std::sync::Arc;
1133 use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
1134
1135 #[derive(Debug)]
1136 struct DropCounter {
1137 counter: Arc<AtomicUsize>,
1138 }
1139
1140 impl Drop for DropCounter {
1141 fn drop(&mut self) {
1142 self.counter.fetch_add(1, AtomicOrdering::SeqCst);
1143 }
1144 }
1145
1146 let counter = Arc::new(AtomicUsize::new(0));
1147
1148 {
1149 let (tx, mut rx) = channel::<DropCounter, 32>(NonZeroUsize::new(16).unwrap());
1150
1151 for _ in 0..8 {
1152 tx.try_send(DropCounter {
1153 counter: counter.clone(),
1154 })
1155 .unwrap();
1156 }
1157
1158 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1159 assert_eq!(counter.load(AtomicOrdering::SeqCst), 0);
1160
1161 rx.clear();
1162
1163 assert_eq!(counter.load(AtomicOrdering::SeqCst), 8);
1164 }
1165 }
1166
1167 #[tokio::test]
1168 async fn test_drain() {
1169 let (tx, mut rx) = channel::<i32, 32>(NonZeroUsize::new(16).unwrap());
1170
1171 for i in 0..10 {
1172 tx.try_send(i).unwrap();
1173 }
1174
1175 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1176
1177 let collected: Vec<i32> = rx.drain().collect();
1178
1179 assert_eq!(collected, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
1180 assert!(rx.is_empty());
1181 }
1182
1183 #[tokio::test]
1184 async fn test_drain_empty() {
1185 let (_tx, mut rx) = channel::<i32, 32>(NonZeroUsize::new(8).unwrap());
1186
1187 let collected: Vec<i32> = rx.drain().collect();
1188
1189 assert!(collected.is_empty());
1190 }
1191
1192 #[tokio::test]
1193 async fn test_drain_size_hint() {
1194 let (tx, mut rx) = channel::<i32, 32>(NonZeroUsize::new(16).unwrap());
1195
1196 for i in 0..5 {
1197 tx.try_send(i).unwrap();
1198 }
1199
1200 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1201
1202 let mut drain = rx.drain();
1203
1204 assert_eq!(drain.size_hint(), (5, Some(5)));
1205
1206 drain.next();
1207 assert_eq!(drain.size_hint(), (4, Some(4)));
1208
1209 drain.next();
1210 assert_eq!(drain.size_hint(), (3, Some(3)));
1211 }
1212
1213 #[tokio::test]
1214 async fn test_try_recv_slice() {
1215 let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
1216
1217 for i in 0..10 {
1219 tx.try_send(i).unwrap();
1220 }
1221
1222 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1223
1224 let mut dest = [0u32; 5];
1225 let received = rx.try_recv_slice(&mut dest);
1226
1227 assert_eq!(received, 5);
1228 assert_eq!(dest, [0, 1, 2, 3, 4]);
1229 assert_eq!(rx.len(), 5);
1230 }
1231
1232 #[tokio::test]
1233 async fn test_try_recv_slice_partial() {
1234 let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
1235
1236 tx.try_send(1).unwrap();
1237 tx.try_send(2).unwrap();
1238 tx.try_send(3).unwrap();
1239
1240 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1241
1242 let mut dest = [0u32; 10];
1243 let received = rx.try_recv_slice(&mut dest);
1244
1245 assert_eq!(received, 3);
1246 assert_eq!(&dest[0..3], &[1, 2, 3]);
1247 assert!(rx.is_empty());
1248 }
1249
1250 #[tokio::test]
1251 async fn test_recv_slice() {
1252 let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
1253
1254 for i in 1..=10 {
1255 tx.try_send(i).unwrap();
1256 }
1257
1258 let mut dest = [0u32; 10];
1259 let received = rx.recv_slice(&mut dest).await;
1260
1261 assert_eq!(received, 10);
1262 assert_eq!(dest, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1263 assert!(rx.is_empty());
1264 }
1265
1266 #[tokio::test]
1267 async fn test_recv_slice_with_wait() {
1268 let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(4).unwrap());
1269
1270 let recv_handle = tokio::spawn(async move {
1271 let mut dest = [0u32; 8];
1272 let received = rx.recv_slice(&mut dest).await;
1273 (received, dest)
1274 });
1275
1276 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1277
1278 for i in 1..=8 {
1280 tx.send(i).await.unwrap();
1281 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1282 }
1283
1284 let (received, dest) = recv_handle.await.unwrap();
1285 assert_eq!(received, 8);
1286 assert_eq!(dest, [1, 2, 3, 4, 5, 6, 7, 8]);
1287 }
1288
1289 #[tokio::test]
1290 async fn test_recv_slice_channel_closed() {
1291 let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(8).unwrap());
1292
1293 tx.try_send(1).unwrap();
1294 tx.try_send(2).unwrap();
1295 tx.try_send(3).unwrap();
1296
1297 drop(tx); let mut dest = [0u32; 10];
1300 let received = rx.recv_slice(&mut dest).await;
1301
1302 assert_eq!(received, 3);
1304 assert_eq!(&dest[0..3], &[1, 2, 3]);
1305 }
1306
1307 #[tokio::test]
1308 async fn test_combined_new_apis() {
1309 let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
1310
1311 let data = [1, 2, 3, 4, 5];
1313 tx.try_send_slice(&data);
1314
1315 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1316
1317 assert_eq!(tx.len(), 5);
1318 assert_eq!(rx.len(), 5);
1319 assert_eq!(rx.capacity(), 16);
1320
1321 assert_eq!(rx.peek(), Some(&1));
1323
1324 let mut dest = [0u32; 3];
1326 rx.try_recv_slice(&mut dest);
1327 assert_eq!(dest, [1, 2, 3]);
1328
1329 assert_eq!(rx.len(), 2);
1330 assert_eq!(tx.free_slots(), 14);
1331
1332 let remaining: Vec<u32> = rx.drain().collect();
1334 assert_eq!(remaining, vec![4, 5]);
1335
1336 assert!(rx.is_empty());
1337 assert!(!tx.is_full());
1338 }
1339}