1#![deny(unsafe_op_in_unsafe_fn)]
62#![warn(missing_docs, missing_debug_implementations)]
63
64use core::fmt;
65use std::mem::ManuallyDrop;
66use std::sync::Arc;
67use std::sync::atomic::{AtomicBool, Ordering};
68use std::time::{Duration, Instant};
69
70use crossbeam_utils::sync::{Parker, Unparker};
71use crossbeam_utils::{Backoff, CachePadded};
72use nexus_queue::Full;
73use nexus_queue::spsc::{Consumer, Producer, ring_buffer};
74
75pub mod spsc {
77 pub use crate::{Receiver, Sender, channel, channel_with_config};
84}
85
86const DEFAULT_SNOOZE_ITERS: usize = 8;
92
93struct Shared {
95 sender_parked: CachePadded<AtomicBool>,
96 receiver_parked: CachePadded<AtomicBool>,
97}
98
99pub fn channel<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
119 channel_with_config(capacity, DEFAULT_SNOOZE_ITERS)
120}
121
122pub fn channel_with_config<T>(capacity: usize, snooze_iters: usize) -> (Sender<T>, Receiver<T>) {
134 let (producer, consumer) = ring_buffer(capacity);
135
136 let shared = Arc::new(Shared {
137 sender_parked: CachePadded::new(AtomicBool::new(false)),
138 receiver_parked: CachePadded::new(AtomicBool::new(false)),
139 });
140
141 let sender_parker = Parker::new();
142 let sender_unparker = sender_parker.unparker().clone();
143
144 let receiver_parker = Parker::new();
145 let receiver_unparker = receiver_parker.unparker().clone();
146
147 (
148 Sender {
149 producer: ManuallyDrop::new(producer),
150 shared: Arc::clone(&shared),
151 parker: sender_parker,
152 receiver_unparker,
153 snooze_iters,
154 },
155 Receiver {
156 consumer: ManuallyDrop::new(consumer),
157 shared,
158 parker: receiver_parker,
159 sender_unparker,
160 snooze_iters,
161 },
162 )
163}
164
165pub struct Sender<T> {
174 producer: ManuallyDrop<Producer<T>>,
175 shared: Arc<Shared>,
176 parker: Parker,
177 receiver_unparker: Unparker,
178 snooze_iters: usize,
179}
180
181impl<T> Sender<T> {
182 #[inline]
189 pub fn send(&self, value: T) -> Result<(), SendError<T>> {
190 if self.producer.is_disconnected() {
191 return cold_send_err(value);
192 }
193
194 let mut val = value;
195
196 match self.producer.push(val) {
198 Ok(()) => {
199 self.notify_receiver();
200 return Ok(());
201 }
202 Err(Full(v)) => val = v,
203 }
204
205 let backoff = Backoff::new();
207 for _ in 0..self.snooze_iters {
208 backoff.snooze();
209
210 if self.producer.is_disconnected() {
211 return cold_send_err(val);
212 }
213
214 match self.producer.push(val) {
215 Ok(()) => {
216 self.notify_receiver();
217 return Ok(());
218 }
219 Err(Full(v)) => val = v,
220 }
221 }
222
223 loop {
225 self.shared.sender_parked.store(true, Ordering::SeqCst);
226
227 if self.producer.is_disconnected() {
228 self.shared.sender_parked.store(false, Ordering::Relaxed);
229 return cold_send_err(val);
230 }
231
232 match self.producer.push(val) {
233 Ok(()) => {
234 self.shared.sender_parked.store(false, Ordering::Relaxed);
235 self.notify_receiver();
236 return Ok(());
237 }
238 Err(Full(v)) => val = v,
239 }
240
241 self.parker.park();
242 self.shared.sender_parked.store(false, Ordering::Relaxed);
243
244 if self.producer.is_disconnected() {
245 return cold_send_err(val);
246 }
247
248 match self.producer.push(val) {
249 Ok(()) => {
250 self.notify_receiver();
251 return Ok(());
252 }
253 Err(Full(v)) => val = v,
254 }
255 }
256 }
257
258 #[inline]
265 pub fn try_send(&self, value: T) -> Result<(), TrySendError<T>> {
266 if self.producer.is_disconnected() {
267 return cold_try_send_disconnected(value);
268 }
269
270 match self.producer.push(value) {
271 Ok(()) => {
272 self.notify_receiver();
273 Ok(())
274 }
275 Err(Full(v)) => Err(TrySendError::Full(v)),
276 }
277 }
278
279 #[inline]
280 fn notify_receiver(&self) {
281 if self.shared.receiver_parked.load(Ordering::SeqCst) {
282 self.receiver_unparker.unpark();
283 }
284 }
285
286 #[inline]
288 pub fn is_disconnected(&self) -> bool {
289 self.producer.is_disconnected()
290 }
291
292 #[inline]
294 pub fn capacity(&self) -> usize {
295 self.producer.capacity()
296 }
297}
298
299impl<T> fmt::Debug for Sender<T> {
300 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
301 f.debug_struct("Sender")
302 .field("capacity", &self.capacity())
303 .field("disconnected", &self.is_disconnected())
304 .finish_non_exhaustive()
305 }
306}
307
308impl<T> Drop for Sender<T> {
309 fn drop(&mut self) {
310 unsafe { ManuallyDrop::drop(&mut self.producer) };
314 self.receiver_unparker.unpark();
315 }
316}
317
318pub struct Receiver<T> {
328 consumer: ManuallyDrop<Consumer<T>>,
329 shared: Arc<Shared>,
330 parker: Parker,
331 sender_unparker: Unparker,
332 snooze_iters: usize,
333}
334
335impl<T> Receiver<T> {
336 #[inline]
344 pub fn recv(&self) -> Result<T, RecvError> {
345 if let Some(v) = self.consumer.pop() {
347 self.notify_sender();
348 return Ok(v);
349 }
350
351 let backoff = Backoff::new();
353 for _ in 0..self.snooze_iters {
354 backoff.snooze();
355
356 if let Some(v) = self.consumer.pop() {
357 self.notify_sender();
358 return Ok(v);
359 }
360
361 if self.consumer.is_disconnected() {
362 return self.consumer.pop().ok_or(RecvError);
363 }
364 }
365
366 loop {
368 self.shared.receiver_parked.store(true, Ordering::SeqCst);
369
370 if let Some(v) = self.consumer.pop() {
371 self.shared.receiver_parked.store(false, Ordering::Relaxed);
372 self.notify_sender();
373 return Ok(v);
374 }
375
376 if self.consumer.is_disconnected() {
377 self.shared.receiver_parked.store(false, Ordering::Relaxed);
378 return cold_recv_err();
379 }
380
381 self.parker.park();
382 self.shared.receiver_parked.store(false, Ordering::Relaxed);
383
384 if let Some(v) = self.consumer.pop() {
385 self.notify_sender();
386 return Ok(v);
387 }
388
389 if self.consumer.is_disconnected() {
390 return cold_recv_err();
391 }
392 }
393 }
394
395 pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
402 let deadline = Instant::now() + timeout;
403
404 if let Some(v) = self.consumer.pop() {
406 self.notify_sender();
407 return Ok(v);
408 }
409
410 let backoff = Backoff::new();
412 for _ in 0..self.snooze_iters {
413 if Instant::now() >= deadline {
414 return Err(RecvTimeoutError::Timeout);
415 }
416
417 backoff.snooze();
418
419 if let Some(v) = self.consumer.pop() {
420 self.notify_sender();
421 return Ok(v);
422 }
423
424 if self.consumer.is_disconnected() {
425 return self.consumer.pop().ok_or(RecvTimeoutError::Disconnected);
426 }
427 }
428
429 loop {
431 let now = Instant::now();
432 if now >= deadline {
433 return Err(RecvTimeoutError::Timeout);
434 }
435
436 self.shared.receiver_parked.store(true, Ordering::SeqCst);
437
438 if let Some(v) = self.consumer.pop() {
439 self.shared.receiver_parked.store(false, Ordering::Relaxed);
440 self.notify_sender();
441 return Ok(v);
442 }
443
444 if self.consumer.is_disconnected() {
445 self.shared.receiver_parked.store(false, Ordering::Relaxed);
446 return cold_recv_timeout_disconnected();
447 }
448
449 let remaining = deadline - now;
450 self.parker.park_timeout(remaining);
451 self.shared.receiver_parked.store(false, Ordering::Relaxed);
452
453 if let Some(v) = self.consumer.pop() {
454 self.notify_sender();
455 return Ok(v);
456 }
457
458 if self.consumer.is_disconnected() {
459 return cold_recv_timeout_disconnected();
460 }
461 }
462 }
463
464 #[inline]
471 #[allow(clippy::option_if_let_else)]
472 pub fn try_recv(&self) -> Result<T, TryRecvError> {
473 match self.consumer.pop() {
474 Some(v) => {
475 self.notify_sender();
476 Ok(v)
477 }
478 None => {
479 if self.consumer.is_disconnected() {
480 cold_try_recv_disconnected()
481 } else {
482 Err(TryRecvError::Empty)
483 }
484 }
485 }
486 }
487
488 #[inline]
489 fn notify_sender(&self) {
490 if self.shared.sender_parked.load(Ordering::SeqCst) {
491 self.sender_unparker.unpark();
492 }
493 }
494
495 #[inline]
497 pub fn is_disconnected(&self) -> bool {
498 self.consumer.is_disconnected()
499 }
500
501 #[inline]
503 pub fn capacity(&self) -> usize {
504 self.consumer.capacity()
505 }
506}
507
508impl<T> fmt::Debug for Receiver<T> {
509 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
510 f.debug_struct("Receiver")
511 .field("capacity", &self.capacity())
512 .field("disconnected", &self.is_disconnected())
513 .finish_non_exhaustive()
514 }
515}
516
517impl<T> Drop for Receiver<T> {
518 fn drop(&mut self) {
519 unsafe { ManuallyDrop::drop(&mut self.consumer) };
523 self.sender_unparker.unpark();
524 }
525}
526
527#[derive(Debug, Clone, Copy, PartialEq, Eq)]
535pub struct SendError<T>(pub T);
536
537impl<T> SendError<T> {
538 pub fn into_inner(self) -> T {
540 self.0
541 }
542}
543
544impl<T> fmt::Display for SendError<T> {
545 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
546 write!(f, "channel disconnected")
547 }
548}
549
550impl<T: fmt::Debug> std::error::Error for SendError<T> {}
551
552#[derive(Debug, Clone, Copy, PartialEq, Eq)]
557pub struct RecvError;
558
559impl fmt::Display for RecvError {
560 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
561 write!(f, "channel disconnected")
562 }
563}
564
565impl std::error::Error for RecvError {}
566
567#[derive(Debug, Clone, Copy, PartialEq, Eq)]
569pub enum TrySendError<T> {
570 Full(T),
572
573 Disconnected(T),
575}
576
577impl<T> TrySendError<T> {
578 pub fn into_inner(self) -> T {
580 match self {
581 TrySendError::Full(v) | TrySendError::Disconnected(v) => v,
582 }
583 }
584
585 pub fn is_full(&self) -> bool {
587 matches!(self, TrySendError::Full(_))
588 }
589
590 pub fn is_disconnected(&self) -> bool {
592 matches!(self, TrySendError::Disconnected(_))
593 }
594}
595
596impl<T> fmt::Display for TrySendError<T> {
597 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
598 match self {
599 TrySendError::Full(_) => write!(f, "channel full"),
600 TrySendError::Disconnected(_) => write!(f, "channel disconnected"),
601 }
602 }
603}
604
605impl<T: fmt::Debug> std::error::Error for TrySendError<T> {}
606
607#[derive(Debug, Clone, Copy, PartialEq, Eq)]
609pub enum TryRecvError {
610 Empty,
612
613 Disconnected,
615}
616
617impl TryRecvError {
618 pub fn is_empty(&self) -> bool {
620 matches!(self, TryRecvError::Empty)
621 }
622
623 pub fn is_disconnected(&self) -> bool {
625 matches!(self, TryRecvError::Disconnected)
626 }
627}
628
629impl fmt::Display for TryRecvError {
630 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
631 match self {
632 TryRecvError::Empty => write!(f, "channel empty"),
633 TryRecvError::Disconnected => write!(f, "channel disconnected"),
634 }
635 }
636}
637
638impl std::error::Error for TryRecvError {}
639
640#[derive(Debug, Clone, Copy, PartialEq, Eq)]
642pub enum RecvTimeoutError {
643 Timeout,
645
646 Disconnected,
648}
649
650impl RecvTimeoutError {
651 pub fn is_timeout(&self) -> bool {
653 matches!(self, RecvTimeoutError::Timeout)
654 }
655
656 pub fn is_disconnected(&self) -> bool {
658 matches!(self, RecvTimeoutError::Disconnected)
659 }
660}
661
662impl fmt::Display for RecvTimeoutError {
663 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
664 match self {
665 RecvTimeoutError::Timeout => write!(f, "timed out"),
666 RecvTimeoutError::Disconnected => write!(f, "channel disconnected"),
667 }
668 }
669}
670
671impl std::error::Error for RecvTimeoutError {}
672
673#[cold]
678fn cold_send_err<T>(val: T) -> Result<(), SendError<T>> {
679 Err(SendError(val))
680}
681
682#[cold]
683fn cold_try_send_disconnected<T>(val: T) -> Result<(), TrySendError<T>> {
684 Err(TrySendError::Disconnected(val))
685}
686
687#[cold]
688fn cold_recv_err<T>() -> Result<T, RecvError> {
689 Err(RecvError)
690}
691
692#[cold]
693fn cold_try_recv_disconnected<T>() -> Result<T, TryRecvError> {
694 Err(TryRecvError::Disconnected)
695}
696
697#[cold]
698fn cold_recv_timeout_disconnected<T>() -> Result<T, RecvTimeoutError> {
699 Err(RecvTimeoutError::Disconnected)
700}
701
702#[cfg(test)]
707mod tests {
708 use super::*;
709 use std::sync::atomic::AtomicUsize;
710 use std::thread;
711
712 #[test]
717 fn basic_send_recv() {
718 let (tx, rx) = channel::<u64>(4);
719
720 tx.send(1).unwrap();
721 tx.send(2).unwrap();
722 tx.send(3).unwrap();
723
724 assert_eq!(rx.recv().unwrap(), 1);
725 assert_eq!(rx.recv().unwrap(), 2);
726 assert_eq!(rx.recv().unwrap(), 3);
727 }
728
729 #[test]
730 fn try_send_try_recv() {
731 let (tx, rx) = channel::<u64>(2);
732
733 assert!(tx.try_send(1).is_ok());
734 assert!(tx.try_send(2).is_ok());
735 assert!(matches!(tx.try_send(3), Err(TrySendError::Full(3))));
736
737 assert_eq!(rx.try_recv().unwrap(), 1);
738 assert_eq!(rx.try_recv().unwrap(), 2);
739 assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
740 }
741
742 #[test]
743 fn send_fills_then_recv_drains() {
744 let (tx, rx) = channel::<u64>(4);
745
746 for i in 0..4 {
747 tx.try_send(i).unwrap();
748 }
749 assert!(matches!(tx.try_send(99), Err(TrySendError::Full(99))));
750
751 for i in 0..4 {
752 assert_eq!(rx.recv().unwrap(), i);
753 }
754 assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
755 }
756
757 #[test]
762 fn recv_timeout_success() {
763 let (tx, rx) = channel::<u64>(4);
764
765 tx.send(42).unwrap();
766
767 let result = rx.recv_timeout(Duration::from_millis(100));
768 assert_eq!(result.unwrap(), 42);
769 }
770
771 #[test]
772 fn recv_timeout_expires() {
773 let (_tx, rx) = channel::<u64>(4);
774
775 let start = Instant::now();
776 let result = rx.recv_timeout(Duration::from_millis(50));
777
778 assert!(matches!(result, Err(RecvTimeoutError::Timeout)));
779 assert!(start.elapsed() >= Duration::from_millis(50));
780 }
781
782 #[test]
783 fn recv_timeout_disconnected() {
784 let (tx, rx) = channel::<u64>(4);
785
786 drop(tx);
787
788 let result = rx.recv_timeout(Duration::from_millis(100));
789 assert!(matches!(result, Err(RecvTimeoutError::Disconnected)));
790 }
791
792 #[test]
793 fn recv_timeout_data_arrives() {
794 let (tx, rx) = channel::<u64>(4);
795
796 let handle = thread::spawn(move || {
797 thread::sleep(Duration::from_millis(25));
798 tx.send(42).unwrap();
799 });
800
801 let result = rx.recv_timeout(Duration::from_millis(100));
802 assert_eq!(result.unwrap(), 42);
803
804 handle.join().unwrap();
805 }
806
807 #[test]
808 fn recv_timeout_disconnect_while_waiting() {
809 let (tx, rx) = channel::<u64>(4);
810
811 let handle = thread::spawn(move || {
812 thread::sleep(Duration::from_millis(25));
813 drop(tx);
814 });
815
816 let result = rx.recv_timeout(Duration::from_millis(100));
817 assert!(matches!(result, Err(RecvTimeoutError::Disconnected)));
818
819 handle.join().unwrap();
820 }
821
822 #[test]
827 fn recv_returns_error_when_sender_dropped() {
828 let (tx, rx) = channel::<u64>(4);
829
830 drop(tx);
831
832 assert!(rx.recv().is_err());
833 assert!(matches!(rx.try_recv(), Err(TryRecvError::Disconnected)));
834 }
835
836 #[test]
837 fn recv_drains_before_error_when_sender_dropped() {
838 let (tx, rx) = channel::<u64>(4);
839
840 tx.send(1).unwrap();
841 tx.send(2).unwrap();
842 drop(tx);
843
844 assert_eq!(rx.recv().unwrap(), 1);
845 assert_eq!(rx.recv().unwrap(), 2);
846 assert!(rx.recv().is_err());
847 }
848
849 #[test]
850 fn send_returns_error_when_receiver_dropped() {
851 let (tx, rx) = channel::<u64>(4);
852
853 drop(rx);
854
855 assert!(tx.send(1).is_err());
856 assert!(matches!(tx.try_send(1), Err(TrySendError::Disconnected(1))));
857 }
858
859 #[test]
860 fn is_disconnected_sender() {
861 let (tx, rx) = channel::<u64>(4);
862
863 assert!(!tx.is_disconnected());
864 drop(rx);
865 assert!(tx.is_disconnected());
866 }
867
868 #[test]
869 fn is_disconnected_receiver() {
870 let (tx, rx) = channel::<u64>(4);
871
872 assert!(!rx.is_disconnected());
873 drop(tx);
874 assert!(rx.is_disconnected());
875 }
876
877 #[test]
882 fn cross_thread_single_message() {
883 let (tx, rx) = channel::<u64>(4);
884
885 let handle = thread::spawn(move || rx.recv().unwrap());
886
887 tx.send(42).unwrap();
888
889 assert_eq!(handle.join().unwrap(), 42);
890 }
891
892 #[test]
893 fn cross_thread_multiple_messages() {
894 let (tx, rx) = channel::<u64>(4);
895
896 let handle = thread::spawn(move || {
897 let mut sum = 0;
898 for _ in 0..100 {
899 sum += rx.recv().unwrap();
900 }
901 sum
902 });
903
904 for i in 0..100 {
905 tx.send(i).unwrap();
906 }
907
908 let sum = handle.join().unwrap();
909 assert_eq!(sum, 99 * 100 / 2);
910 }
911
912 #[test]
917 fn fifo_ordering_single_thread() {
918 let (tx, rx) = channel::<u64>(8);
919
920 for i in 0..8 {
921 tx.try_send(i).unwrap();
922 }
923
924 for i in 0..8 {
925 assert_eq!(rx.recv().unwrap(), i);
926 }
927 }
928
929 #[test]
930 fn fifo_ordering_cross_thread() {
931 let (tx, rx) = channel::<u64>(64);
932
933 let handle = thread::spawn(move || {
934 let mut expected = 0u64;
935 while expected < 10_000 {
936 let val = rx.recv().unwrap();
937 assert_eq!(val, expected, "FIFO order violated");
938 expected += 1;
939 }
940 });
941
942 for i in 0..10_000 {
943 tx.send(i).unwrap();
944 }
945
946 handle.join().unwrap();
947 }
948
949 #[test]
954 fn recv_blocks_until_send() {
955 let (tx, rx) = channel::<u64>(4);
956
957 let start = Instant::now();
958
959 let handle = thread::spawn(move || rx.recv().unwrap());
960
961 thread::sleep(Duration::from_millis(50));
962 tx.send(42).unwrap();
963
964 let val = handle.join().unwrap();
965 assert_eq!(val, 42);
966 assert!(start.elapsed() >= Duration::from_millis(50));
967 }
968
969 #[test]
970 fn send_blocks_until_recv() {
971 let (tx, rx) = channel::<u64>(2);
972
973 tx.try_send(1).unwrap();
975 tx.try_send(2).unwrap();
976
977 let start = Instant::now();
978
979 let handle = thread::spawn(move || {
980 tx.send(3).unwrap(); tx
982 });
983
984 thread::sleep(Duration::from_millis(50));
985 rx.recv().unwrap(); let _ = handle.join().unwrap();
988 assert!(start.elapsed() >= Duration::from_millis(50));
989 }
990
991 #[test]
996 fn recv_wakes_on_sender_drop() {
997 let (tx, rx) = channel::<u64>(4);
998
999 let handle = thread::spawn(move || {
1000 let result = rx.recv();
1001 assert!(result.is_err());
1002 });
1003
1004 thread::sleep(Duration::from_millis(50));
1005 drop(tx);
1006
1007 handle.join().unwrap();
1009 }
1010
1011 #[test]
1012 fn send_wakes_on_receiver_drop() {
1013 let (tx, rx) = channel::<u64>(1);
1014
1015 tx.try_send(1).unwrap(); let handle = thread::spawn(move || {
1018 let result = tx.send(2); assert!(result.is_err());
1020 });
1021
1022 thread::sleep(Duration::from_millis(50));
1023 drop(rx);
1024
1025 handle.join().unwrap();
1027 }
1028
1029 #[test]
1034 fn capacity_one() {
1035 let (tx, rx) = channel::<u64>(1);
1036
1037 for i in 0..100 {
1038 tx.send(i).unwrap();
1039 assert_eq!(rx.recv().unwrap(), i);
1040 }
1041 }
1042
1043 #[test]
1044 fn capacity_one_cross_thread() {
1045 let (tx, rx) = channel::<u64>(1);
1046
1047 let handle = thread::spawn(move || {
1048 for _ in 0..1000 {
1049 rx.recv().unwrap();
1050 }
1051 });
1052
1053 for i in 0..1000 {
1054 tx.send(i).unwrap();
1055 }
1056
1057 handle.join().unwrap();
1058 }
1059
1060 #[test]
1065 fn values_dropped_on_channel_drop() {
1066 static DROP_COUNT: AtomicUsize = AtomicUsize::new(0);
1067
1068 #[derive(Debug)]
1069 struct DropCounter;
1070 impl Drop for DropCounter {
1071 fn drop(&mut self) {
1072 DROP_COUNT.fetch_add(1, Ordering::SeqCst);
1073 }
1074 }
1075
1076 DROP_COUNT.store(0, Ordering::SeqCst);
1077
1078 let (tx, rx) = channel::<DropCounter>(4);
1079
1080 tx.try_send(DropCounter).unwrap();
1081 tx.try_send(DropCounter).unwrap();
1082 tx.try_send(DropCounter).unwrap();
1083
1084 assert_eq!(DROP_COUNT.load(Ordering::SeqCst), 0);
1085
1086 drop(tx);
1087 drop(rx);
1088
1089 assert_eq!(DROP_COUNT.load(Ordering::SeqCst), 3);
1090 }
1091
1092 #[test]
1093 fn failed_send_returns_value() {
1094 let (tx, rx) = channel::<String>(1);
1095
1096 tx.try_send("hello".to_string()).unwrap();
1097
1098 let err = tx.try_send("world".to_string());
1099 match err {
1100 Err(TrySendError::Full(s)) => assert_eq!(s, "world"),
1101 _ => panic!("expected Full error"),
1102 }
1103
1104 drop(rx);
1105
1106 let err = tx.try_send("test".to_string());
1107 match err {
1108 Err(TrySendError::Disconnected(s)) => assert_eq!(s, "test"),
1109 _ => panic!("expected Disconnected error"),
1110 }
1111 }
1112
1113 #[test]
1118 fn zero_sized_type() {
1119 let (tx, rx) = channel::<()>(4);
1120
1121 tx.send(()).unwrap();
1122 tx.send(()).unwrap();
1123
1124 assert_eq!(rx.recv().unwrap(), ());
1125 assert_eq!(rx.recv().unwrap(), ());
1126 }
1127
1128 #[test]
1129 fn large_message_type() {
1130 #[derive(Clone, PartialEq, Debug)]
1131 struct LargeMessage {
1132 data: [u8; 4096],
1133 }
1134
1135 let (tx, rx) = channel::<LargeMessage>(4);
1136
1137 let msg = LargeMessage { data: [42u8; 4096] };
1138 tx.send(msg).unwrap();
1139
1140 let received = rx.recv().unwrap();
1141 assert_eq!(received.data[0], 42);
1142 assert_eq!(received.data[4095], 42);
1143 }
1144
1145 #[test]
1150 fn many_laps_single_thread() {
1151 let (tx, rx) = channel::<u64>(4);
1152
1153 for i in 0..1000 {
1155 tx.send(i).unwrap();
1156 assert_eq!(rx.recv().unwrap(), i);
1157 }
1158 }
1159
1160 #[test]
1161 fn many_laps_cross_thread() {
1162 const COUNT: u64 = 100_000;
1163
1164 let (tx, rx) = channel::<u64>(4); let producer = thread::spawn(move || {
1167 for i in 0..COUNT {
1168 tx.send(i).unwrap();
1169 }
1170 });
1171
1172 let consumer = thread::spawn(move || {
1173 let mut expected = 0u64;
1174 while expected < COUNT {
1175 let val = rx.recv().unwrap();
1176 assert_eq!(val, expected);
1177 expected += 1;
1178 }
1179 });
1180
1181 producer.join().unwrap();
1182 consumer.join().unwrap();
1183 }
1184
1185 #[test]
1190 fn stress_high_volume() {
1191 const COUNT: u64 = 100_000;
1192
1193 let (tx, rx) = channel::<u64>(1024);
1194
1195 let producer = thread::spawn(move || {
1196 for i in 0..COUNT {
1197 tx.send(i).unwrap();
1198 }
1199 });
1200
1201 let consumer = thread::spawn(move || {
1202 let mut sum = 0u64;
1203 for _ in 0..COUNT {
1204 sum = sum.wrapping_add(rx.recv().unwrap());
1205 }
1206 sum
1207 });
1208
1209 producer.join().unwrap();
1210 let sum = consumer.join().unwrap();
1211 assert_eq!(sum, COUNT * (COUNT - 1) / 2);
1212 }
1213
1214 #[test]
1215 fn stress_small_buffer() {
1216 const COUNT: u64 = 10_000;
1217
1218 let (tx, rx) = channel::<u64>(4);
1219
1220 let producer = thread::spawn(move || {
1221 for i in 0..COUNT {
1222 tx.send(i).unwrap();
1223 }
1224 });
1225
1226 let consumer = thread::spawn(move || {
1227 let mut received = 0u64;
1228 while received < COUNT {
1229 rx.recv().unwrap();
1230 received += 1;
1231 }
1232 received
1233 });
1234
1235 producer.join().unwrap();
1236 let received = consumer.join().unwrap();
1237 assert_eq!(received, COUNT);
1238 }
1239
1240 #[test]
1241 fn stress_capacity_one_high_volume() {
1242 const COUNT: u64 = 10_000;
1243
1244 let (tx, rx) = channel::<u64>(1);
1245
1246 let producer = thread::spawn(move || {
1247 for i in 0..COUNT {
1248 tx.send(i).unwrap();
1249 }
1250 });
1251
1252 let consumer = thread::spawn(move || {
1253 let mut expected = 0u64;
1254 while expected < COUNT {
1255 let val = rx.recv().unwrap();
1256 assert_eq!(val, expected);
1257 expected += 1;
1258 }
1259 });
1260
1261 producer.join().unwrap();
1262 consumer.join().unwrap();
1263 }
1264
1265 #[test]
1270 fn ping_pong_basic() {
1271 let (tx1, rx1) = channel::<u64>(1);
1272 let (tx2, rx2) = channel::<u64>(1);
1273
1274 let handle = thread::spawn(move || {
1275 for i in 0..1000 {
1276 let val = rx1.recv().unwrap();
1277 assert_eq!(val, i);
1278 tx2.send(i).unwrap();
1279 }
1280 });
1281
1282 for i in 0..1000 {
1283 tx1.send(i).unwrap();
1284 let val = rx2.recv().unwrap();
1285 assert_eq!(val, i);
1286 }
1287
1288 handle.join().unwrap();
1289 }
1290
1291 #[test]
1292 fn ping_pong_high_iterations() {
1293 let (tx1, rx1) = channel::<u64>(1);
1294 let (tx2, rx2) = channel::<u64>(1);
1295
1296 let handle = thread::spawn(move || {
1297 for i in 0..10_000 {
1298 let val = rx1.recv().unwrap();
1299 assert_eq!(val, i);
1300 tx2.send(i * 2).unwrap();
1301 }
1302 });
1303
1304 for i in 0..10_000 {
1305 tx1.send(i).unwrap();
1306 let val = rx2.recv().unwrap();
1307 assert_eq!(val, i * 2);
1308 }
1309
1310 handle.join().unwrap();
1311 }
1312
1313 #[test]
1318 fn no_deadlock_alternating() {
1319 let (tx, rx) = channel::<u64>(1);
1320
1321 let handle = thread::spawn(move || {
1322 for i in 0..1000u64 {
1323 tx.send(i).unwrap();
1324 }
1325 });
1326
1327 for _ in 0..1000 {
1328 rx.recv().unwrap();
1329 }
1330
1331 handle.join().unwrap();
1332 }
1333
1334 #[test]
1335 fn no_deadlock_burst_then_drain() {
1336 let (tx, rx) = channel::<u64>(8);
1337
1338 for round in 0..100 {
1339 for i in 0..8 {
1341 tx.try_send(round * 8 + i).unwrap();
1342 }
1343 for i in 0..8 {
1345 assert_eq!(rx.recv().unwrap(), round * 8 + i);
1346 }
1347 }
1348 }
1349
1350 #[test]
1351 fn no_deadlock_concurrent_full_empty_transitions() {
1352 let (tx, rx) = channel::<u64>(2);
1353
1354 let producer = thread::spawn(move || {
1355 for i in 0..10_000u64 {
1356 tx.send(i).unwrap();
1357 }
1358 });
1359
1360 let consumer = thread::spawn(move || {
1361 for _ in 0..10_000 {
1362 rx.recv().unwrap();
1363 }
1364 });
1365
1366 producer.join().unwrap();
1367 consumer.join().unwrap();
1368 }
1369
1370 #[test]
1371 fn no_deadlock_disconnect_while_blocked_recv() {
1372 let (tx, rx) = channel::<u64>(1);
1373
1374 let handle = thread::spawn(move || {
1375 let result = rx.recv();
1377 assert!(result.is_err()); });
1379
1380 thread::sleep(Duration::from_millis(50));
1381 drop(tx); handle.join().unwrap();
1384 }
1385
1386 #[test]
1387 fn no_deadlock_disconnect_while_blocked_send() {
1388 let (tx, rx) = channel::<u64>(1);
1389 tx.try_send(1).unwrap(); let handle = thread::spawn(move || {
1392 let result = tx.send(2);
1394 assert!(result.is_err()); });
1396
1397 thread::sleep(Duration::from_millis(50));
1398 drop(rx); handle.join().unwrap();
1401 }
1402
1403 #[test]
1408 fn stress_rapid_park_unpark_sender() {
1409 let (tx, rx) = channel::<u64>(1);
1410
1411 let handle = thread::spawn(move || {
1412 for _ in 0..10_000 {
1413 rx.recv().unwrap();
1414 }
1415 });
1416
1417 for i in 0..10_000 {
1418 tx.send(i).unwrap();
1419 }
1420
1421 handle.join().unwrap();
1422 }
1423
1424 #[test]
1425 fn stress_rapid_park_unpark_receiver() {
1426 let (tx, rx) = channel::<u64>(1);
1427
1428 let handle = thread::spawn(move || {
1429 for i in 0..10_000 {
1430 tx.send(i).unwrap();
1431 }
1432 });
1433
1434 for _ in 0..10_000 {
1435 rx.recv().unwrap();
1436 }
1437
1438 handle.join().unwrap();
1439 }
1440
1441 #[test]
1442 fn stress_park_unpark_both_sides() {
1443 let (tx, rx) = channel::<u64>(1);
1445
1446 let sender = thread::spawn(move || {
1447 for i in 0..50_000 {
1448 tx.send(i).unwrap();
1449 }
1450 });
1451
1452 let receiver = thread::spawn(move || {
1453 let mut count = 0;
1454 for _ in 0..50_000 {
1455 rx.recv().unwrap();
1456 count += 1;
1457 }
1458 count
1459 });
1460
1461 sender.join().unwrap();
1462 assert_eq!(receiver.join().unwrap(), 50_000);
1463 }
1464
1465 #[test]
1470 fn completes_in_reasonable_time() {
1471 use std::sync::mpsc;
1472
1473 let (done_tx, done_rx) = mpsc::channel();
1474
1475 let handle = thread::spawn(move || {
1476 let (tx, rx) = channel::<u64>(1);
1477
1478 let h = thread::spawn(move || {
1479 for i in 0..1000 {
1480 tx.send(i).unwrap();
1481 }
1482 });
1483
1484 for _ in 0..1000 {
1485 rx.recv().unwrap();
1486 }
1487
1488 h.join().unwrap();
1489 done_tx.send(()).unwrap();
1490 });
1491
1492 let result = done_rx.recv_timeout(Duration::from_secs(5));
1494 assert!(result.is_ok(), "Test timed out - possible deadlock!");
1495
1496 handle.join().unwrap();
1497 }
1498
1499 #[test]
1500 fn does_not_hang_on_disconnect_during_recv() {
1501 let done = Arc::new(AtomicBool::new(false));
1502 let done_clone = done.clone();
1503
1504 let (tx, rx) = channel::<u64>(4);
1505
1506 let handle = thread::spawn(move || {
1507 let _ = rx.recv(); done_clone.store(true, Ordering::SeqCst);
1509 });
1510
1511 thread::sleep(Duration::from_millis(50));
1512 assert!(!done.load(Ordering::SeqCst)); drop(tx);
1515
1516 handle.join().unwrap();
1517 assert!(done.load(Ordering::SeqCst)); }
1519
1520 #[test]
1521 fn does_not_hang_on_disconnect_during_send() {
1522 let done = Arc::new(AtomicBool::new(false));
1523 let done_clone = done.clone();
1524
1525 let (tx, rx) = channel::<u64>(1);
1526 tx.try_send(1).unwrap(); let handle = thread::spawn(move || {
1529 let _ = tx.send(2); done_clone.store(true, Ordering::SeqCst);
1531 });
1532
1533 thread::sleep(Duration::from_millis(50));
1534 assert!(!done.load(Ordering::SeqCst)); drop(rx);
1537
1538 handle.join().unwrap();
1539 assert!(done.load(Ordering::SeqCst)); }
1541
1542 #[test]
1547 fn rapid_channel_creation() {
1548 for _ in 0..1000 {
1549 let (tx, rx) = channel::<u64>(4);
1550 tx.try_send(1).unwrap();
1551 assert_eq!(rx.recv().unwrap(), 1);
1552 }
1553 }
1554
1555 #[test]
1556 fn rapid_disconnect() {
1557 for _ in 0..1000 {
1558 let (tx, rx) = channel::<u64>(4);
1559 drop(tx);
1560 drop(rx);
1561 }
1562 }
1563}