1#![feature(negative_impls)]
14
15use bounded_spsc_queue as spsc;
16
17use std::sync::atomic::Ordering;
18
19mod blocking;
20mod select;
21
22const DISCONNECTED : isize = isize::MIN;
23#[cfg(test)]
24const MAX_STEALS : isize = 5;
25#[cfg(not(test))]
26const MAX_STEALS : isize = 1 << 20; const INITIAL_CAPACITY : usize = 128;
28
29pub struct Receiver <T> {
30 consumer : std::cell::UnsafeCell <spsc::Consumer <T>>,
31 receive_new : std::sync::mpsc::Receiver <spsc::Consumer <T>>,
32 inner : std::sync::Arc <Inner>,
33 steals : std::cell::UnsafeCell <isize>
34}
35
36pub struct Sender <T> {
37 producer : std::cell::UnsafeCell <spsc::Producer <T>>,
38 send_new : std::sync::mpsc::Sender <spsc::Consumer <T>>,
39 inner : std::sync::Arc <Inner>
40}
41
42struct Inner {
43 counter : std::sync::atomic::AtomicIsize,
44 connected : std::sync::atomic::AtomicBool,
45 to_wake : std::sync::atomic::AtomicPtr <blocking::Inner>
46}
47
48#[derive(Debug)]
49pub struct Iter <'a, T > {
50 rx : &'a Receiver <T>
51}
52
53#[derive(Debug)]
54pub struct TryIter <'a, T > {
55 rx : &'a Receiver <T>
56}
57
58#[derive(Debug)]
59pub struct IntoIter <T> {
60 rx : Receiver <T>
61}
62
63#[derive(Clone,Copy,Debug,Eq,PartialEq)]
65pub struct RecvError;
66
67#[derive(Clone,Copy,Eq,PartialEq)]
69pub struct SendError <T> (pub T);
70
71#[derive(Clone,Copy,Debug,Eq,PartialEq)]
72pub enum TryRecvError {
73 Empty,
74 Disconnected
75}
76
77#[derive(Clone,Copy,Debug,Eq,PartialEq)]
78pub enum RecvTimeoutError {
79 Timeout,
80 Disconnected
81}
82
83pub enum SelectionResult {
84 SelSuccess,
85 SelCanceled
86}
87
88impl <T> Receiver <T> {
89 #[expect(clippy::missing_panics_doc)]
94 pub fn try_recv (&self) -> Result <T, TryRecvError> {
95 match unsafe { (*self.consumer.get()).try_pop() } {
96 Some (t) => unsafe {
97 if MAX_STEALS < *self.steals.get() {
98 match self.inner.counter.swap (0, Ordering::SeqCst) {
99 DISCONNECTED => {
100 self.inner.counter.store (DISCONNECTED, Ordering::SeqCst);
101 }
102 n => {
103 let m = std::cmp::min (n, *self.steals.get());
104 *self.steals.get() -= m;
105 self.bump (n - m);
106 }
107 }
108 assert!(0 <= *self.steals.get());
110 }
111 *self.steals.get() += 1;
112 Ok (t)
113 },
114 None => {
115 match self.receive_new.try_recv() {
116 Ok (new_consumer) => {
117 unsafe { *self.consumer.get() = new_consumer; }
118 self.try_recv()
119 },
120 Err (std::sync::mpsc::TryRecvError::Empty) => {
121 match self.inner.counter.load (Ordering::SeqCst) {
122 n if n != DISCONNECTED => Err (TryRecvError::Empty),
123 _ => {
124 match unsafe { (*self.consumer.get()).try_pop() } {
125 Some (t) => Ok (t),
126 None => Err (TryRecvError::Disconnected)
127 }
128 }
129 }
130 },
131 Err (std::sync::mpsc::TryRecvError::Disconnected) => {
132 Err (TryRecvError::Disconnected)
133 }
134 }
135 }
136 }
137 }
138
139 pub fn recv (&self) -> Result <T, RecvError> {
141 match self.try_recv() {
142 Err (TryRecvError::Empty) => {}
143 Err (TryRecvError::Disconnected) => return Err (RecvError),
144 Ok (t) => return Ok (t)
145 }
146 let (wait_token, signal_token) = blocking::tokens();
147 if self.decrement (signal_token).is_ok() {
148 wait_token.wait();
149 }
150 match self.try_recv() {
151 Ok (t) => unsafe {
152 *self.steals.get() -= 1;
153 Ok (t)
154 },
155 Err (TryRecvError::Empty) => unreachable!(
156 "woken thread should have found pending message"),
157 Err (TryRecvError::Disconnected) => Err (RecvError)
158 }
159 }
160
161 pub fn recv_timeout (&self, timeout : std::time::Duration)
162 -> Result <T, RecvTimeoutError>
163 {
164 match self.try_recv() {
165 Ok (t) => Ok (t),
166 Err (TryRecvError::Disconnected) => Err (RecvTimeoutError::Disconnected),
167 Err (TryRecvError::Empty)
168 => self.recv_max_until (std::time::Instant::now() + timeout)
169 }
170 }
171
172 #[expect(mismatched_lifetime_syntaxes)]
173 pub const fn iter (&self) -> Iter <T> {
174 Iter {
175 rx: self
176 }
177 }
178
179 #[expect(mismatched_lifetime_syntaxes)]
180 pub const fn try_iter (&self) -> TryIter <T> {
181 TryIter {
182 rx: self
183 }
184 }
185
186 pub fn capacity (&self) -> usize {
187 unsafe {
188 (*self.consumer.get()).capacity()
189 }
190 }
191
192 fn recv_max_until (&self, deadline : std::time::Instant)
193 -> Result <T, RecvTimeoutError>
194 {
195 loop {
196 match self.recv_deadline (deadline) {
197 result @ Err (RecvTimeoutError::Timeout) => {
198 if deadline <= std::time::Instant::now() {
199 return result
200 }
201 },
202 result => return result
203 }
204 }
205 }
206
207 fn recv_deadline (&self, deadline : std::time::Instant)
209 -> Result <T, RecvTimeoutError>
210 {
211 match self.try_recv() {
212 Err (TryRecvError::Empty) => {}
213 Err (TryRecvError::Disconnected)
214 => return Err (RecvTimeoutError::Disconnected),
215 Ok (t) => return Ok (t)
216 }
217 let (wait_token, signal_token) = blocking::tokens();
218 if self.decrement (signal_token).is_ok() {
219 let timed_out = !wait_token.wait_max_until (deadline);
220 if timed_out {
221 let _has_data = self.abort_selection_();
223 }
224 }
225 match self.try_recv() {
226 Ok (t) => unsafe {
227 *self.steals.get() -= 1;
228 Ok (t)
229 }
230 Err (TryRecvError::Empty) => Err (RecvTimeoutError::Timeout),
231 Err (TryRecvError::Disconnected) => Err (RecvTimeoutError::Disconnected)
232 }
233 }
234
235 fn decrement (&self, token : std::sync::Arc <blocking::Inner>)
236 -> Result <(), std::sync::Arc <blocking::Inner>>
237 {
238 assert_eq!(self.inner.to_wake.load (Ordering::SeqCst), std::ptr::null_mut());
239 let ptr = std::sync::Arc::into_raw (token).cast_mut();
252 self.inner.to_wake.store (ptr, Ordering::SeqCst);
253 let steals = unsafe { std::ptr::replace (self.steals.get(), 0) };
254 match self.inner.counter.fetch_sub (1 + steals, Ordering::SeqCst) {
255 DISCONNECTED => {
256 self.inner.counter.store (DISCONNECTED, Ordering::SeqCst);
257 }
258 n => {
259 assert!(0 <= n);
260 if n - steals <= 0 {
261 return Ok (())
263 }
264 }
265 }
266 self.inner.to_wake.store (std::ptr::null_mut(), Ordering::SeqCst);
267 Err (unsafe { std::sync::Arc::from_raw (ptr) })
272 }
273
274 fn can_recv_ (&self) -> bool {
279 0 < unsafe { (*self.consumer.get()).size() }
280 }
281
282 fn start_selection_ (&self, token : std::sync::Arc <blocking::Inner>)
283 -> SelectionResult
284 {
285 match self.decrement (token) {
286 Ok (()) => SelectionResult::SelSuccess,
287 Err (_token) => {
288 let prev = self.bump (1);
290 assert!(prev == DISCONNECTED || 0 <= prev);
291 SelectionResult::SelCanceled
292 }
293 }
294 }
295
296 fn abort_selection_ (&self) -> bool {
298 let steals = 1;
299 let prev = self.bump (steals + 1);
300 if prev == DISCONNECTED {
301 assert_eq!(self.inner.to_wake.load (Ordering::SeqCst),
302 std::ptr::null_mut());
303 true
304 } else {
305 let cur = prev + steals + 1;
306 assert!(0 <= cur);
307 if prev < 0 {
308 drop (self.inner.take_to_wake());
309 } else {
310 while !self.inner.to_wake.load (Ordering::SeqCst).is_null() {
311 std::thread::yield_now();
312 }
313 }
314 unsafe {
315 assert_eq!(*self.steals.get(), 0);
316 *self.steals.get() = steals;
317 }
318 0 <= prev
319 }
320 }
321
322 fn bump (&self, amt : isize) -> isize {
323 match self.inner.counter.fetch_add (amt, Ordering::SeqCst) {
324 DISCONNECTED => {
325 self.inner.counter.store (DISCONNECTED, Ordering::SeqCst);
326 DISCONNECTED
327 }
328 n => n
329 }
330 }
331
332} impl <T> std::fmt::Debug for Receiver <T> {
335 fn fmt (&self, f : &mut std::fmt::Formatter) -> std::fmt::Result {
336 write!(f, "Receiver {{ .. }}")
337 }
338}
339
340impl <T> IntoIterator for Receiver <T> {
341 type Item = T;
342 type IntoIter = IntoIter <T>;
343 fn into_iter (self) -> IntoIter <T> {
344 IntoIter {
345 rx: self
346 }
347 }
348}
349
350impl <'a, T> IntoIterator for &'a Receiver <T> {
351 type Item = T;
352 type IntoIter = Iter <'a, T>;
353 fn into_iter (self) -> Iter <'a, T> {
354 self.iter()
355 }
356}
357
358impl <T> Drop for Receiver <T> {
359 fn drop (&mut self) {
360 self.inner.connected.store (false, Ordering::SeqCst);
365
366 let mut steals = unsafe { *self.steals.get() };
382 const MAX_IDLE_YIELDS : u32 = 32;
388 let mut idle_yields = 0u32;
389 while {
390 let count = self.inner.counter.compare_exchange (
391 steals, DISCONNECTED, Ordering::SeqCst, Ordering::SeqCst
392 ).unwrap_or_else (|i| i);
393 count != DISCONNECTED && count != steals
394 } {
395 let mut drained_here = 0;
397 while let Some (_t) = unsafe { (*self.consumer.get()).try_pop() } {
398 steals += 1;
399 drained_here += 1;
400 }
401 match self.receive_new.try_recv() {
403 Ok (new_consumer) => unsafe {
404 *self.consumer.get() = new_consumer;
405 idle_yields = 0;
406 }
407 Err (std::sync::mpsc::TryRecvError::Empty) => {
408 if drained_here != 0 {
409 idle_yields = 0;
410 } else {
411 idle_yields += 1;
412 if idle_yields >= MAX_IDLE_YIELDS {
413 self.inner.counter.store (DISCONNECTED, Ordering::SeqCst);
417 break;
418 }
419 std::thread::yield_now();
420 }
421 }
422 Err (std::sync::mpsc::TryRecvError::Disconnected) => {
423 self.inner.counter.store (DISCONNECTED, Ordering::SeqCst);
426 break;
427 }
428 }
429 }
430 }
431}
432
433impl <T> Sender <T> {
434 #[expect(clippy::missing_panics_doc)]
451 pub fn send (&self, t : T) -> Result <(), SendError <T>> {
452 if self.inner.connected.load (Ordering::SeqCst) {
453 match unsafe { (*self.producer.get()).try_push (t) } {
454 None => {}, Some (t) => { let new_capacity = 2 * unsafe { (*self.producer.get()).capacity() };
457 let (new_producer, new_consumer) = spsc::make (new_capacity);
458 if self.send_new.send (new_consumer).is_err() {
459 return Err (SendError (t));
461 }
462 unsafe { *self.producer.get() = new_producer; }
463 match unsafe { (*self.producer.get()).try_push (t) } {
464 None => {}
465 Some (_t) => unreachable!(
466 "send on a newly created queue should always succeed")
467 }
468 }
469 }
470 match self.inner.counter.fetch_add (1, Ordering::SeqCst) {
472 -1 => {
473 self.inner.take_to_wake().signal();
474 },
475 -2 => {},
476 DISCONNECTED => {
477 self.inner.counter.store (DISCONNECTED, Ordering::SeqCst);
478 },
496 n => {
497 assert! (0 <= n);
498 }
499 }
500 Ok (())
501 } else {
502 Err (SendError (t))
503 }
504 }
505} impl <T> std::fmt::Debug for Sender <T> {
508 fn fmt (&self, f : &mut std::fmt::Formatter) -> std::fmt::Result {
509 write!(f, "Sender {{ .. }}")
510 }
511}
512
513impl <T> Drop for Sender <T> {
514 fn drop (&mut self) {
515 self.inner.connected.store (false, Ordering::SeqCst);
516 match self.inner.counter.swap (DISCONNECTED, Ordering::SeqCst) {
517 DISCONNECTED => {}
518 -1 => {
519 self.inner.take_to_wake().signal();
520 }
521 n => {
522 assert!(0 <= n);
523 }
524 }
525 }
526}
527
528impl Inner {
529 fn take_to_wake (&self) -> std::sync::Arc <blocking::Inner> {
530 let ptr = self.to_wake.swap (std::ptr::null_mut(), Ordering::SeqCst);
531 assert!(!ptr.is_null());
532 unsafe {
533 std::sync::Arc::from_raw (ptr)
534 }
535 }
536}
537
538impl <T> Iterator for Iter <'_, T> {
539 type Item = T;
540 fn next (&mut self) -> Option <T> {
541 self.rx.recv().ok()
542 }
543}
544
545impl <T> Iterator for TryIter <'_, T> {
546 type Item = T;
547 fn next (&mut self) -> Option <T> {
548 self.rx.try_recv().ok()
549 }
550}
551
552impl <T> Iterator for IntoIter <T> {
553 type Item = T;
554 fn next (&mut self) -> Option <T> {
555 self.rx.recv().ok()
556 }
557}
558
559impl std::fmt::Display for RecvError {
560 fn fmt (&self, f : &mut std::fmt::Formatter) -> std::fmt::Result {
561 "receiving on a closed channel".fmt (f)
562 }
563}
564
565impl std::error::Error for RecvError {
566 fn description (&self) -> &'static str {
567 "receiving on a closed channel"
568 }
569
570 fn cause (&self) -> Option <&dyn std::error::Error> {
571 None
572 }
573}
574
575impl <T> std::fmt::Debug for SendError <T> {
576 fn fmt (&self, f : &mut std::fmt::Formatter) -> std::fmt::Result {
577 "SendError(..)".fmt (f)
578 }
579}
580
581impl <T> std::fmt::Display for SendError <T> {
582 fn fmt (&self, f : &mut std::fmt::Formatter) -> std::fmt::Result {
583 "sending on a closed channel".fmt (f)
584 }
585}
586
587impl <T : Send> std::error::Error for SendError <T> {
588 fn description (&self) -> &'static str {
589 "sending on a closed channel"
590 }
591
592 fn cause (&self) -> Option <&dyn std::error::Error> {
593 None
594 }
595}
596
597impl std::fmt::Display for TryRecvError {
598 fn fmt (&self, f : &mut std::fmt::Formatter) -> std::fmt::Result {
599 match *self {
600 TryRecvError::Empty => "receiving on an empty channel".fmt (f),
601 TryRecvError::Disconnected => "receiving on a closed channel".fmt (f)
602 }
603 }
604}
605
606impl std::error::Error for TryRecvError {
607 fn description (&self) -> &str {
608 match *self {
609 TryRecvError::Empty => "receiving on an empty channel",
610 TryRecvError::Disconnected => "receiving on a closed channel"
611 }
612 }
613
614 fn cause (&self) -> Option <&dyn std::error::Error> {
615 None
616 }
617}
618
619pub fn channel <T : 'static> () -> (Sender <T>, Receiver <T>) {
620 let (producer, consumer) = spsc::make (INITIAL_CAPACITY);
621 let (send_new, receive_new) = std::sync::mpsc::channel();
622 let inner = std::sync::Arc::new (
623 Inner {
624 counter: std::sync::atomic::AtomicIsize::new (0),
625 connected: std::sync::atomic::AtomicBool::new (true),
626 to_wake: std::sync::atomic::AtomicPtr::new (std::ptr::null_mut())
627 }
628 );
629 let sender = Sender {
630 producer: std::cell::UnsafeCell::new (producer),
631 send_new,
632 inner: inner.clone()
633 };
634 let receiver = Receiver {
635 consumer: std::cell::UnsafeCell::new (consumer),
636 receive_new,
637 steals: std::cell::UnsafeCell::new (0),
638 inner
639 };
640 (sender, receiver)
641}
642
643#[cfg(test)]
644mod tests {
645 use super::*;
646
647 pub(crate) fn stress_factor() -> usize {
648 match std::env::var ("RUST_TEST_STRESS") {
649 Ok (val) => val.parse().unwrap(),
650 Err (..) => 1,
651 }
652 }
653
654 #[test]
655 fn smoke() {
656 let (tx, rx) = channel::<i32>();
657 tx.send (1).unwrap();
658 assert_eq!(rx.recv().unwrap(), 1);
659 }
660
661 #[test]
662 fn drop_full() {
663 let (tx, _rx) = channel::<Box <isize>>();
664 tx.send(Box::new (1)).unwrap();
665 }
666
667 #[test]
669 fn smoke_threads() {
670 let (tx, rx) = channel::<i32>();
671 let _t = std::thread::spawn (move|| {
672 println!("smoke threads sending...");
673 tx.send (1).unwrap();
674 });
675 println!("smoke threads receiving...");
676 assert_eq!(rx.recv().unwrap(), 1);
677 }
678
679 #[test]
680 fn smoke_port_gone() {
681 let (tx, rx) = channel::<i32>();
682 drop (rx);
683 assert!(tx.send (1).is_err());
684 }
685
686 #[test]
687 fn smoke_shared_port_gone() {
688 let (tx, rx) = channel::<i32>();
689 drop (rx);
690 assert!(tx.send (1).is_err())
691 }
692
693 #[test]
694 fn port_gone_concurrent() {
695 let (tx, rx) = channel::<i32>();
696 let _t = std::thread::spawn (move|| {
697 rx.recv().unwrap();
698 });
699 while tx.send (1).is_ok() {}
700 }
701
702 #[test]
703 fn smoke_chan_gone() {
704 let (tx, rx) = channel::<i32>();
705 drop (tx);
706 rx.recv().unwrap_err();
707 }
708
709 #[test]
710 fn chan_gone_concurrent() {
711 let (tx, rx) = channel::<i32>();
712 let _t = std::thread::spawn (move|| {
713 tx.send (1).unwrap();
714 tx.send (1).unwrap();
715 });
716 while rx.recv().is_ok() {}
717 }
718
719 #[test]
720 fn stress() {
721 let (tx, rx) = channel::<i32>();
722 let t = std::thread::spawn (move|| {
723 for _ in 0..10000 { tx.send (1).unwrap(); }
724 });
725 for _ in 0..10000 {
726 assert_eq!(rx.recv().unwrap(), 1);
727 }
728 t.join().ok().unwrap();
729 }
730
731 #[test]
732 fn send_from_outside_runtime() {
733 let (tx1, rx1) = channel::<bool>();
734 let (tx2, rx2) = channel::<i32>();
735 let t1 = std::thread::spawn (move|| {
736 tx1.send (true).unwrap();
737 for _ in 0..40 {
738 assert_eq!(rx2.recv().unwrap(), 1);
739 }
740 });
741 rx1.recv().unwrap();
742 let t2 = std::thread::spawn (move|| {
743 for _ in 0..40 {
744 tx2.send (1).unwrap();
745 }
746 });
747 t1.join().ok().unwrap();
748 t2.join().ok().unwrap();
749 }
750
751 #[test]
752 fn recv_from_outside_runtime() {
753 let (tx, rx) = channel::<i32>();
754 let t = std::thread::spawn (move|| {
755 for _ in 0..40 {
756 assert_eq!(rx.recv().unwrap(), 1);
757 }
758 });
759 for _ in 0..40 {
760 tx.send (1).unwrap();
761 }
762 t.join().ok().unwrap();
763 }
764
765 #[test]
766 fn no_runtime() {
767 let (tx1, rx1) = channel::<i32>();
768 let (tx2, rx2) = channel::<i32>();
769 let t1 = std::thread::spawn (move|| {
770 assert_eq!(rx1.recv().unwrap(), 1);
771 tx2.send (2).unwrap();
772 });
773 let t2 = std::thread::spawn (move|| {
774 tx1.send (1).unwrap();
775 assert_eq!(rx2.recv().unwrap(), 2);
776 });
777 t1.join().ok().unwrap();
778 t2.join().ok().unwrap();
779 }
780
781 #[test]
782 fn oneshot_single_thread_close_port_first() {
783 let (_tx, rx) = channel::<i32>();
785 drop (rx);
786 }
787
788 #[test]
789 fn oneshot_single_thread_close_chan_first() {
790 let (tx, _rx) = channel::<i32>();
792 drop (tx);
793 }
794
795 #[test]
796 fn oneshot_single_thread_send_port_close() {
797 let (tx, rx) = channel::<Box <i32>>();
799 drop (rx);
800 assert!(tx.send (Box::new (0)).is_err());
801 }
802
803 #[test]
804 fn oneshot_single_thread_recv_chan_close() {
805 let res = std::thread::spawn (move|| {
807 let (tx, rx) = channel::<i32>();
808 drop (tx);
809 rx.recv().unwrap();
810 }).join();
811 assert!(res.is_err());
813 }
814
815 #[test]
816 fn oneshot_single_thread_send_then_recv() {
817 let (tx, rx) = channel::<Box <i32>>();
818 tx.send (Box::new (10)).unwrap();
819 assert!(*rx.recv().unwrap() == 10);
820 }
821
822 #[test]
823 fn oneshot_single_thread_try_send_open() {
824 let (tx, rx) = channel::<i32>();
825 tx.send (10).unwrap();
826 assert!(rx.recv().unwrap() == 10);
827 }
828
829 #[test]
830 fn oneshot_single_thread_try_send_closed() {
831 let (tx, rx) = channel::<i32>();
832 drop (rx);
833 assert!(tx.send (10).is_err());
834 }
835
836 #[test]
837 fn oneshot_single_thread_try_recv_open() {
838 let (tx, rx) = channel::<i32>();
839 tx.send (10).unwrap();
840 assert!(rx.recv() == Ok (10));
841 }
842
843 #[test]
844 fn oneshot_single_thread_try_recv_closed() {
845 let (tx, rx) = channel::<i32>();
846 drop (tx);
847 rx.recv().unwrap_err();
848 }
849
850 #[test]
851 fn oneshot_single_thread_peek_data() {
852 let (tx, rx) = channel::<i32>();
853 assert_eq!(rx.try_recv(), Err (TryRecvError::Empty));
854 tx.send (10).unwrap();
855 assert_eq!(rx.try_recv(), Ok (10));
856 }
857
858 #[test]
859 fn oneshot_single_thread_peek_close() {
860 let (tx, rx) = channel::<i32>();
861 drop (tx);
862 assert_eq!(rx.try_recv(), Err (TryRecvError::Disconnected));
863 assert_eq!(rx.try_recv(), Err (TryRecvError::Disconnected));
864 }
865
866 #[test]
867 fn oneshot_single_thread_peek_open() {
868 let (_tx, rx) = channel::<i32>();
869 assert_eq!(rx.try_recv(), Err (TryRecvError::Empty));
870 }
871
872 #[test]
873 fn oneshot_multi_task_recv_then_send () {
874 let (tx, rx) = channel::<Box <i32>>();
875 let _t = std::thread::spawn (move|| {
876 assert!(*rx.recv().unwrap() == 10);
877 });
878
879 tx.send (Box::new (10)).unwrap();
880 }
881
882 #[test]
883 fn oneshot_multi_task_recv_then_close() {
884 let (tx, rx) = channel::<Box <i32>>();
885 let _t = std::thread::spawn (move|| {
886 drop (tx);
887 });
888 let res = std::thread::spawn (move|| {
889 assert!(*rx.recv().unwrap() == 10);
890 }).join();
891 assert!(res.is_err());
892 }
893
894 #[test]
895 fn oneshot_multi_thread_close_stress() {
896 for _ in 0..stress_factor() {
897 let (tx, rx) = channel::<i32>();
898 let _t = std::thread::spawn (move|| {
899 drop (rx);
900 });
901 drop (tx);
902 }
903 }
904
905 #[test]
906 fn oneshot_multi_thread_send_close_stress() {
907 for _ in 0..stress_factor() {
908 let (tx, rx) = channel::<i32>();
909 let _t = std::thread::spawn (move|| {
910 drop (rx);
911 });
912 let _ = std::thread::spawn (move|| {
913 tx.send (1).unwrap();
914 }).join();
915 }
916 }
917
918 #[test]
919 fn oneshot_multi_thread_recv_close_stress() {
920 for _ in 0..stress_factor() {
921 let (tx, rx) = channel::<i32>();
922 std::thread::spawn (move|| {
923 let res = std::thread::spawn (move|| {
924 rx.recv().unwrap();
925 }).join();
926 assert!(res.is_err());
927 });
928 let _t = std::thread::spawn (move|| {
929 std::thread::spawn (move|| {
930 drop (tx);
931 });
932 });
933 }
934 }
935
936 #[test]
937 fn oneshot_multi_thread_send_recv_stress() {
938 for _ in 0..stress_factor() {
939 let (tx, rx) = channel::<Box <isize>>();
940 let _t = std::thread::spawn (move|| {
941 tx.send (Box::new (10)).unwrap();
942 });
943 assert!(*rx.recv().unwrap() == 10);
944 }
945 }
946
947 #[test]
948 fn stream_send_recv_stress() {
949 for _ in 0..stress_factor() {
950 let (tx, rx) = channel();
951
952 send (tx, 0);
953 recv (rx, 0);
954
955 fn send (tx: Sender<Box <i32>>, i: i32) {
956 if i == 10 { return }
957
958 std::thread::spawn (move|| {
959 tx.send (Box::new (i)).unwrap();
960 send (tx, i + 1);
961 });
962 }
963
964 fn recv (rx: Receiver<Box <i32>>, i: i32) {
965 if i == 10 { return }
966
967 std::thread::spawn (move|| {
968 assert!(*rx.recv().unwrap() == i);
969 recv (rx, i + 1);
970 });
971 }
972 }
973 }
974
975 #[test]
976 fn oneshot_single_thread_recv_timeout() {
977 let (tx, rx) = channel();
978 tx.send (true).unwrap();
979 assert_eq!(rx.recv_timeout (std::time::Duration::from_millis (1)), Ok (true));
980 assert_eq!(rx.recv_timeout (std::time::Duration::from_millis (1)),
981 Err (RecvTimeoutError::Timeout));
982 tx.send (true).unwrap();
983 assert_eq!(rx.recv_timeout (std::time::Duration::from_millis (1)), Ok (true));
984 }
985
986 #[test]
987 fn stress_recv_timeout_two_threads() {
988 let (tx, rx) = channel();
989 let stress = stress_factor() + 100;
990 let timeout = std::time::Duration::from_millis (100);
991
992 std::thread::spawn (move || {
993 for i in 0..stress {
994 if i % 2 == 0 {
995 std::thread::sleep (timeout * 2);
996 }
997 tx.send (1usize).unwrap();
998 }
999 });
1000
1001 let mut recv_count = 0;
1002 loop {
1003 match rx.recv_timeout (timeout) {
1004 Ok (n) => {
1005 assert_eq!(n, 1usize);
1006 recv_count += 1;
1007 }
1008 Err (RecvTimeoutError::Timeout) => { }
1009 Err (RecvTimeoutError::Disconnected) => break
1010 }
1011 }
1012
1013 assert_eq!(recv_count, stress);
1014 }
1015
1016 #[test]
1017 fn recv_a_lot() {
1018 let (tx, rx) = channel();
1020 for _ in 0..10000 { tx.send (true).unwrap(); }
1021 for _ in 0..10000 { rx.recv().unwrap(); }
1022 }
1023
1024 #[test]
1025 fn nested_recv_iter() {
1026 let (tx, rx) = channel::<i32>();
1027 let (total_tx, total_rx) = channel::<i32>();
1028
1029 let _t = std::thread::spawn (move|| {
1030 let mut acc = 0;
1031 for x in rx.iter() {
1032 acc += x;
1033 }
1034 total_tx.send (acc).unwrap();
1035 });
1036
1037 tx.send (3).unwrap();
1038 tx.send (1).unwrap();
1039 tx.send (2).unwrap();
1040 drop (tx);
1041 assert_eq!(total_rx.recv().unwrap(), 6);
1042 }
1043
1044 #[test]
1045 fn recv_iter_break() {
1046 let (tx, rx) = channel::<i32>();
1047 let (count_tx, count_rx) = channel();
1048
1049 let _t = std::thread::spawn (move|| {
1050 let mut count = 0;
1051 for x in rx.iter() {
1052 if count >= 3 {
1053 break;
1054 } else {
1055 count += x;
1056 }
1057 }
1058 count_tx.send (count).unwrap();
1059 });
1060
1061 tx.send (2).unwrap();
1062 tx.send (2).unwrap();
1063 tx.send (2).unwrap();
1064 let _ = tx.send (2);
1065 drop (tx);
1066 assert_eq!(count_rx.recv().unwrap(), 4);
1067 }
1068
1069 #[test]
1070 fn recv_try_iter() {
1071 let (request_tx, request_rx) = channel();
1072 let (response_tx, response_rx) = channel();
1073
1074 let t = std::thread::spawn (move|| {
1076 let mut count = 0;
1077 loop {
1078 for x in response_rx.try_iter() {
1079 count += x;
1080 if count == 6 {
1081 return count;
1082 }
1083 }
1084 println!("test recv try iter send request...");
1085 request_tx.send (true).unwrap();
1086 }
1087 });
1088
1089 for _ in request_rx.iter() {
1090 println!("test recv try iter send response...");
1091 if response_tx.send (2).is_err() {
1092 break;
1093 }
1094 }
1095
1096 println!("test recv try iter join...");
1097
1098 assert_eq!(t.join().unwrap(), 6);
1099 }
1100
1101 #[test]
1102 fn recv_into_iter_owned() {
1103 let mut iter = {
1104 let (tx, rx) = channel::<i32>();
1105 tx.send (1).unwrap();
1106 tx.send (2).unwrap();
1107
1108 rx.into_iter()
1109 };
1110 assert_eq!(iter.next().unwrap(), 1);
1111 assert_eq!(iter.next().unwrap(), 2);
1112 assert!(iter.next().is_none());
1113 }
1114
1115 #[test]
1116 fn recv_into_iter_borrowed() {
1117 let (tx, rx) = channel::<i32>();
1118 tx.send (1).unwrap();
1119 tx.send (2).unwrap();
1120 drop (tx);
1121 let mut iter = (&rx).into_iter();
1122 assert_eq!(iter.next().unwrap(), 1);
1123 assert_eq!(iter.next().unwrap(), 2);
1124 assert!(iter.next().is_none());
1125 }
1126
1127 #[test]
1129 fn try_recv_states() {
1130 let (tx1, rx1) = channel::<i32>();
1131 let (tx2, rx2) = channel::<bool>();
1132 let (tx3, rx3) = channel::<bool>();
1133 let _t = std::thread::spawn (move|| {
1134 rx2.recv().unwrap();
1135 tx1.send (1).unwrap();
1136 tx3.send (true).unwrap();
1137 rx2.recv().unwrap();
1138 drop (tx1);
1139 tx3.send (true).unwrap();
1140 });
1141
1142 assert_eq!(rx1.try_recv(), Err (TryRecvError::Empty));
1143 tx2.send (true).unwrap();
1144 rx3.recv().unwrap();
1145 assert_eq!(rx1.try_recv(), Ok (1));
1146 assert_eq!(rx1.try_recv(), Err (TryRecvError::Empty));
1147 tx2.send (true).unwrap();
1148 rx3.recv().unwrap();
1149 assert_eq!(rx1.try_recv(), Err (TryRecvError::Disconnected));
1150 }
1151
1152 #[test]
1153 fn issue_32114() {
1154 let (tx, _) = channel();
1155 let _ = tx.send (123);
1156 assert_eq!(tx.send (123), Err (SendError (123)));
1157 }
1158
1159 #[test]
1160 fn zero_size() {
1161 let (tx, rx) = channel::<()>();
1162 tx.send (()).unwrap();
1163 let () = rx.recv().unwrap();
1164 }
1165
1166 #[test]
1167 fn race_disconnect_does_not_corrupt_sender_or_abort() {
1168 for _ in 0..200 {
1169 let (tx, rx) = channel::<Box<u64>>();
1170 let h = std::thread::spawn(move || {
1171 for _ in 0..10_000 {
1172 let _ = tx.send(Box::new(0xDEAD_BEEF));
1173 }
1174 });
1175 drop(rx);
1176 h.join().unwrap();
1177 }
1178 }
1179}