1#![feature(negative_impls)]
6
7use bounded_spsc_queue as spsc;
8
9use std::sync::atomic::Ordering;
10
11mod blocking;
12mod select;
13
/// Sentinel stored in `Inner::counter` once either half of the channel has
/// been dropped.
const DISCONNECTED : isize = isize::MIN;
/// Number of steals `try_recv` tolerates before it folds the local steal
/// count back into the shared counter (small value used by test builds).
#[cfg(test)]
const MAX_STEALS : isize = 5;
/// Number of steals `try_recv` tolerates before it folds the local steal
/// count back into the shared counter (value used by non-test builds).
#[cfg(not(test))]
const MAX_STEALS : isize = 1 << 20;
/// Capacity requested for the first bounded queue created by `channel`.
const INITIAL_CAPACITY : usize = 128;
20
/// Receiving half of the channel returned by `channel`.
pub struct Receiver <T> {
  /// Consumer end of the bounded queue currently being read; `try_recv`
  /// replaces it when a new consumer arrives on `receive_new`.
  consumer : std::cell::UnsafeCell <spsc::Consumer <T>>,
  /// Delivers the consumer end of each replacement queue the sender creates.
  receive_new : std::sync::mpsc::Receiver <spsc::Consumer <T>>,
  /// State shared with the `Sender`.
  inner : std::sync::Arc <Inner>,
  /// Messages taken by `try_recv` that have not yet been subtracted from
  /// `Inner::counter`.
  steals : std::cell::UnsafeCell <isize>
}
27
/// Sending half of the channel returned by `channel`.
pub struct Sender <T> {
  /// Producer end of the bounded queue currently being written; `send`
  /// replaces it with a queue of twice the capacity when a push fails.
  producer : std::cell::UnsafeCell <spsc::Producer <T>>,
  /// Hands the consumer end of each replacement queue to the receiver.
  send_new : std::sync::mpsc::Sender <spsc::Consumer <T>>,
  /// State shared with the `Receiver`.
  inner : std::sync::Arc <Inner>
}
33
/// State shared between the two halves of a channel.
struct Inner {
  /// Incremented by one on every `send`, decremented by the receiver before
  /// it blocks; holds `DISCONNECTED` once either half has been dropped.
  counter : std::sync::atomic::AtomicIsize,
  /// Starts `true`; set to `false` when either half is dropped.
  connected : std::sync::atomic::AtomicBool,
  /// A `blocking::SignalToken` cast to `usize` while the receiver is
  /// preparing to block, `0` otherwise.
  to_wake : std::sync::atomic::AtomicUsize
}
39
/// Borrowing iterator created by `Receiver::iter`; each step calls
/// `Receiver::recv` and the iteration ends when that call fails.
#[derive(Debug)]
pub struct Iter <'a, T : 'a> {
  /// Receiver the messages are pulled from.
  rx : &'a Receiver <T>
}
44
/// Borrowing iterator created by `Receiver::try_iter`; each step calls
/// `Receiver::try_recv` and the iteration ends as soon as that call fails.
#[derive(Debug)]
pub struct TryIter <'a, T : 'a> {
  /// Receiver the messages are pulled from.
  rx : &'a Receiver <T>
}
49
/// Owning iterator created by `Receiver::into_iter`; each step calls
/// `Receiver::recv` and the iteration ends when that call fails.
#[derive(Debug)]
pub struct IntoIter <T> {
  /// Receiver owned by the iterator.
  rx : Receiver <T>
}
54
/// Error returned by `Receiver::recv` when the channel is disconnected and
/// no message could be received.
#[derive(Clone,Copy,Debug,Eq,PartialEq)]
pub struct RecvError;
58
/// Error returned by `Sender::send`; it hands the unsent value back to the
/// caller.
#[derive(Clone,Copy,Eq,PartialEq)]
pub struct SendError <T> (pub T);
62
/// Error returned by `Receiver::try_recv`.
#[derive(Clone,Copy,Debug,Eq,PartialEq)]
pub enum TryRecvError {
  /// No message is available right now.
  Empty,
  /// The channel is disconnected and no message could be received.
  Disconnected
}
68
/// Error returned by `Receiver::recv_timeout`.
#[derive(Clone,Copy,Debug,Eq,PartialEq)]
pub enum RecvTimeoutError {
  /// The timeout elapsed before a message became available.
  Timeout,
  /// The channel is disconnected and no message could be received.
  Disconnected
}

/// Human-readable text, so this error can be reported like the other channel
/// errors in this file.
impl std::fmt::Display for RecvTimeoutError {
  fn fmt (&self, f : &mut std::fmt::Formatter) -> std::fmt::Result {
    f.pad (match *self {
      RecvTimeoutError::Timeout => "timed out waiting on channel",
      RecvTimeoutError::Disconnected => "receiving on a closed channel"
    })
  }
}

/// Lets the error be boxed as `dyn Error` and propagated with `?`.
impl std::error::Error for RecvTimeoutError {}
74
/// Outcome of `Receiver::start_selection_`.
pub enum SelectionResult {
  /// `decrement` accepted the token, so the receiver is registered to be
  /// woken.
  SelSuccess,
  /// `decrement` handed the token back; the counter was bumped again and
  /// nothing is registered.
  SelCanceled
}
79
80impl <T> Receiver <T> {
81 #[expect(clippy::missing_panics_doc)]
86 pub fn try_recv (&self) -> Result <T, TryRecvError> {
87 match unsafe { (*self.consumer.get()).try_pop() } {
88 Some (t) => unsafe {
89 if MAX_STEALS < *self.steals.get() {
90 match self.inner.counter.swap (0, Ordering::SeqCst) {
91 DISCONNECTED => {
92 self.inner.counter.store (DISCONNECTED, Ordering::SeqCst);
93 }
94 n => {
95 let m = std::cmp::min (n, *self.steals.get());
96 *self.steals.get() -= m;
97 self.bump (n - m);
98 }
99 }
100 assert!(0 <= *self.steals.get());
102 }
103 *self.steals.get() += 1;
104 Ok (t)
105 },
106 None => {
107 match self.receive_new.try_recv() {
108 Ok (new_consumer) => {
109 unsafe { *self.consumer.get() = new_consumer; }
110 self.try_recv()
111 },
112 Err (std::sync::mpsc::TryRecvError::Empty) => {
113 match self.inner.counter.load (Ordering::SeqCst) {
114 n if n != DISCONNECTED => Err (TryRecvError::Empty),
115 _ => {
116 match unsafe { (*self.consumer.get()).try_pop() } {
117 Some (t) => Ok (t),
118 None => Err (TryRecvError::Disconnected)
119 }
120 }
121 }
122 },
123 Err (std::sync::mpsc::TryRecvError::Disconnected) => {
124 Err (TryRecvError::Disconnected)
125 }
126 }
127 }
128 }
129 }
130
131 pub fn recv (&self) -> Result <T, RecvError> {
133 match self.try_recv() {
134 Err (TryRecvError::Empty) => {}
135 Err (TryRecvError::Disconnected) => return Err (RecvError),
136 Ok (t) => return Ok (t)
137 }
138 let (wait_token, signal_token) = blocking::tokens();
139 if self.decrement (signal_token).is_ok() {
140 wait_token.wait();
141 }
142 match self.try_recv() {
143 Ok (t) => unsafe {
144 *self.steals.get() -= 1;
145 Ok (t)
146 },
147 Err (TryRecvError::Empty) => unreachable!(
148 "woken thread should have found pending message"),
149 Err (TryRecvError::Disconnected) => Err (RecvError)
150 }
151 }
152
153 pub fn recv_timeout (&self, timeout : std::time::Duration)
154 -> Result <T, RecvTimeoutError>
155 {
156 match self.try_recv() {
157 Ok (t) => Ok (t),
158 Err (TryRecvError::Disconnected) => Err (RecvTimeoutError::Disconnected),
159 Err (TryRecvError::Empty)
160 => self.recv_max_until (std::time::Instant::now() + timeout)
161 }
162 }
163
164 #[expect(mismatched_lifetime_syntaxes)]
165 pub const fn iter (&self) -> Iter <T> {
166 Iter {
167 rx: self
168 }
169 }
170
171 #[expect(mismatched_lifetime_syntaxes)]
172 pub const fn try_iter (&self) -> TryIter <T> {
173 TryIter {
174 rx: self
175 }
176 }
177
178 pub fn capacity (&self) -> usize {
179 unsafe {
180 (*self.consumer.get()).capacity()
181 }
182 }
183
184 fn recv_max_until (&self, deadline : std::time::Instant)
185 -> Result <T, RecvTimeoutError>
186 {
187 loop {
188 match self.recv_deadline (deadline) {
189 result @ Err (RecvTimeoutError::Timeout) => {
190 if deadline <= std::time::Instant::now() {
191 return result
192 }
193 },
194 result => return result
195 }
196 }
197 }
198
199 fn recv_deadline (&self, deadline : std::time::Instant)
201 -> Result <T, RecvTimeoutError>
202 {
203 match self.try_recv() {
204 Err (TryRecvError::Empty) => {}
205 Err (TryRecvError::Disconnected)
206 => return Err (RecvTimeoutError::Disconnected),
207 Ok (t) => return Ok (t)
208 }
209 let (wait_token, signal_token) = blocking::tokens();
210 if self.decrement (signal_token).is_ok() {
211 let timed_out = !wait_token.wait_max_until (deadline);
212 if timed_out {
213 let _has_data = self.abort_selection_();
215 }
216 }
217 match self.try_recv() {
218 Ok (t) => unsafe {
219 *self.steals.get() -= 1;
220 Ok (t)
221 }
222 Err (TryRecvError::Empty) => Err (RecvTimeoutError::Timeout),
223 Err (TryRecvError::Disconnected) => Err (RecvTimeoutError::Disconnected)
224 }
225 }
226
  /// Publishes `token` in `to_wake` and subtracts one (plus any accumulated
  /// steals) from the shared counter.
  ///
  /// Returns `Ok (())` when the token stays registered, which happens when
  /// the previous counter value minus the steals is not positive. Otherwise
  /// `to_wake` is cleared again and the token is handed back in `Err`; this
  /// includes the case where the counter held `DISCONNECTED`.
  fn decrement (&self, token : blocking::SignalToken)
    -> Result <(), blocking::SignalToken>
  {
    // No other token may be registered at this point.
    assert_eq!(self.inner.to_wake.load (Ordering::SeqCst), 0);
    let ptr = unsafe { token.cast_to_usize() };
    self.inner.to_wake.store (ptr, Ordering::SeqCst);
    // Take the local steal count and reset it to zero.
    let steals = unsafe { std::ptr::replace (self.steals.get(), 0) };
    match self.inner.counter.fetch_sub (1 + steals, Ordering::SeqCst) {
      DISCONNECTED => {
        // Restore the sentinel that the subtraction just disturbed.
        self.inner.counter.store (DISCONNECTED, Ordering::SeqCst);
      }
      n => {
        assert!(0 <= n);
        if n - steals <= 0 {
          return Ok (())
        }
      }
    }
    // Not blocking after all: unregister and rebuild the token from the raw
    // value so the caller gets it back.
    self.inner.to_wake.store (0, Ordering::SeqCst);
    Err (unsafe { blocking::SignalToken::cast_from_usize (ptr) })
  }
248
249 fn can_recv_ (&self) -> bool {
254 0 < unsafe { (*self.consumer.get()).size() }
255 }
256
257 fn start_selection_ (&self, token : blocking::SignalToken) -> SelectionResult {
258 match self.decrement (token) {
259 Ok (()) => SelectionResult::SelSuccess,
260 Err (_token) => {
261 let prev = self.bump (1);
263 assert!(prev == DISCONNECTED || 0 <= prev);
264 SelectionResult::SelCanceled
265 }
266 }
267 }
268
  /// Rolls back a registration made by `decrement` (used by `recv_deadline`
  /// after a timed-out wait).
  ///
  /// Returns `true` when the counter held `DISCONNECTED` or was non-negative
  /// before the bump; the caller binds the result as `_has_data`.
  fn abort_selection_ (&self) -> bool {
    // One steal is recorded locally below, so the counter gets one extra.
    let steals = 1;
    let prev = self.bump (steals + 1);
    if prev == DISCONNECTED {
      assert_eq! (self.inner.to_wake.load (Ordering::SeqCst), 0);
      true
    } else {
      let cur = prev + steals + 1;
      assert!(0 <= cur);
      if prev < 0 {
        // The token is still registered: remove and discard it.
        drop (self.inner.take_to_wake());
      } else {
        // Spin until `to_wake` has been cleared.
        while self.inner.to_wake.load (Ordering::SeqCst) != 0 {
          std::thread::yield_now();
        }
      }
      unsafe {
        assert_eq!(*self.steals.get(), 0);
        *self.steals.get() = steals;
      }
      0 <= prev
    }
  }
293
294 fn bump (&self, amt : isize) -> isize {
295 match self.inner.counter.fetch_add (amt, Ordering::SeqCst) {
296 DISCONNECTED => {
297 self.inner.counter.store (DISCONNECTED, Ordering::SeqCst);
298 DISCONNECTED
299 }
300 n => n
301 }
302 }
303
}

/// Opaque `Debug` output: always prints `Receiver { .. }` without requiring
/// `T: Debug`.
impl <T> std::fmt::Debug for Receiver <T> {
  fn fmt (&self, f : &mut std::fmt::Formatter) -> std::fmt::Result {
    f.write_str ("Receiver { .. }")
  }
}
311
312impl <T> IntoIterator for Receiver <T> {
313 type Item = T;
314 type IntoIter = IntoIter <T>;
315 fn into_iter (self) -> IntoIter <T> {
316 IntoIter {
317 rx: self
318 }
319 }
320}
321
322impl <'a, T> IntoIterator for &'a Receiver <T> {
323 type Item = T;
324 type IntoIter = Iter <'a, T>;
325 fn into_iter (self) -> Iter <'a, T> {
326 self.iter()
327 }
328}
329
330impl <T> Drop for Receiver <T> {
331 fn drop (&mut self) {
332 self.inner.connected.store (false, Ordering::SeqCst);
333
334 let mut steals = unsafe { *self.steals.get() };
342 while {
343 let count = self.inner.counter.compare_exchange (
344 steals, DISCONNECTED, Ordering::SeqCst, Ordering::SeqCst
345 ).unwrap_or_else (|i| i);
346 count != DISCONNECTED && count != steals
347 } {
348 while let Some (_t) = unsafe { (*self.consumer.get()).try_pop() } {
349 steals += 1;
350 }
351 }
352 }
353}
354
355impl <T> Sender <T> {
356 #[expect(clippy::missing_panics_doc)]
358 pub fn send (&self, t : T) -> Result <(), SendError <T>> {
359 if self.inner.connected.load (Ordering::SeqCst) {
360 match unsafe { (*self.producer.get()).try_push (t) } {
361 None => {}, Some (t) => { let new_capacity = 2 * unsafe { (*self.producer.get()).capacity() };
364 let (new_producer, new_consumer) = spsc::make (new_capacity);
365 self.send_new.send (new_consumer).unwrap();
370 unsafe { *self.producer.get() = new_producer; }
371 match unsafe { (*self.producer.get()).try_push (t) } {
372 None => {}
373 Some (_t) => unreachable!(
374 "send on a newly created queue should always succeed")
375 }
376 }
377 }
378 match self.inner.counter.fetch_add (1, Ordering::SeqCst) {
380 -1 => {
381 self.inner.take_to_wake().signal();
382 },
383 -2 => {},
384 DISCONNECTED => {
385 self.inner.counter.store (DISCONNECTED, Ordering::SeqCst);
386 unsafe {
392 let consumer : spsc::Consumer <T>
393 = std::mem::transmute (self.producer.get());
394 let first = consumer.try_pop();
395 let second = consumer.try_pop();
396 assert!(second.is_none());
397 if let Some(t) = first {
398 return Err (SendError (t))
399 }
400 }
401 },
402 n => {
403 assert! (0 <= n);
404 }
405 }
406 Ok (())
407 } else {
408 Err (SendError (t))
409 }
410 }
}

/// Opaque `Debug` output: always prints `Sender { .. }` without requiring
/// `T: Debug`.
impl <T> std::fmt::Debug for Sender <T> {
  fn fmt (&self, f : &mut std::fmt::Formatter) -> std::fmt::Result {
    f.write_str ("Sender { .. }")
  }
}
418
419impl <T> Drop for Sender <T> {
420 fn drop (&mut self) {
421 self.inner.connected.store (false, Ordering::SeqCst);
422 match self.inner.counter.swap (DISCONNECTED, Ordering::SeqCst) {
423 DISCONNECTED => {}
424 -1 => {
425 self.inner.take_to_wake().signal();
426 }
427 n => {
428 assert!(0 <= n);
429 }
430 }
431 }
432}
433
impl Inner {
  /// Removes the token registered in `to_wake` (leaving `0` behind) and
  /// returns it. Panics via the assertion if no token was registered.
  fn take_to_wake (&self) -> blocking::SignalToken {
    let ptr = self.to_wake.swap (0, Ordering::SeqCst);
    assert!(ptr != 0);
    // Rebuild the token from the raw value that `decrement` stored.
    unsafe {
      blocking::SignalToken::cast_from_usize (ptr)
    }
  }
}
443
444impl <T> Iterator for Iter <'_, T> {
445 type Item = T;
446 fn next (&mut self) -> Option <T> {
447 self.rx.recv().ok()
448 }
449}
450
451impl <T> Iterator for TryIter <'_, T> {
452 type Item = T;
453 fn next (&mut self) -> Option <T> {
454 self.rx.try_recv().ok()
455 }
456}
457
458impl <T> Iterator for IntoIter <T> {
459 type Item = T;
460 fn next (&mut self) -> Option <T> {
461 self.rx.recv().ok()
462 }
463}
464
465impl std::fmt::Display for RecvError {
466 fn fmt (&self, f : &mut std::fmt::Formatter) -> std::fmt::Result {
467 "receiving on a closed channel".fmt (f)
468 }
469}
470
/// Error-trait integration for `RecvError`; there is no underlying cause.
impl std::error::Error for RecvError {
  // Same text as the `Display` impl.
  fn description (&self) -> &'static str {
    "receiving on a closed channel"
  }

  // Always `None`: this error wraps nothing.
  fn cause (&self) -> Option <&dyn std::error::Error> {
    None
  }
}
480
481impl <T> std::fmt::Debug for SendError <T> {
482 fn fmt (&self, f : &mut std::fmt::Formatter) -> std::fmt::Result {
483 "SendError(..)".fmt (f)
484 }
485}
486
487impl <T> std::fmt::Display for SendError <T> {
488 fn fmt (&self, f : &mut std::fmt::Formatter) -> std::fmt::Result {
489 "sending on a closed channel".fmt (f)
490 }
491}
492
/// Error-trait integration for `SendError`, available when `T: Send`; there
/// is no underlying cause.
impl <T : Send> std::error::Error for SendError <T> {
  // Same text as the `Display` impl.
  fn description (&self) -> &'static str {
    "sending on a closed channel"
  }

  // Always `None`: this error wraps nothing.
  fn cause (&self) -> Option <&dyn std::error::Error> {
    None
  }
}
502
503impl std::fmt::Display for TryRecvError {
504 fn fmt (&self, f : &mut std::fmt::Formatter) -> std::fmt::Result {
505 match *self {
506 TryRecvError::Empty => "receiving on an empty channel".fmt (f),
507 TryRecvError::Disconnected => "receiving on a closed channel".fmt (f)
508 }
509 }
510}
511
/// Error-trait integration for `TryRecvError`; there is no underlying cause.
impl std::error::Error for TryRecvError {
  // Same per-variant text as the `Display` impl.
  fn description (&self) -> &str {
    match *self {
      TryRecvError::Empty => "receiving on an empty channel",
      TryRecvError::Disconnected => "receiving on a closed channel"
    }
  }

  // Always `None`: this error wraps nothing.
  fn cause (&self) -> Option <&dyn std::error::Error> {
    None
  }
}
524
525pub fn channel <T : 'static> () -> (Sender <T>, Receiver <T>) {
526 let (producer, consumer) = spsc::make (INITIAL_CAPACITY);
527 let (send_new, receive_new) = std::sync::mpsc::channel();
528 let inner = std::sync::Arc::new (
529 Inner {
530 counter: std::sync::atomic::AtomicIsize::new (0),
531 connected: std::sync::atomic::AtomicBool::new (true),
532 to_wake: std::sync::atomic::AtomicUsize::new (0)
533 }
534 );
535 let sender = Sender {
536 producer: std::cell::UnsafeCell::new (producer),
537 send_new,
538 inner: inner.clone()
539 };
540 let receiver = Receiver {
541 consumer: std::cell::UnsafeCell::new (consumer),
542 receive_new,
543 steals: std::cell::UnsafeCell::new (0),
544 inner
545 };
546 (sender, receiver)
547}
548
// Unit tests for the channel: single-threaded smoke tests, disconnect
// handling in both directions, multi-threaded stress tests, timeouts and the
// iterator adaptors.
#[cfg(test)]
mod tests {
  use super::*;

  // Iteration multiplier for the stress tests, read from the
  // `RUST_TEST_STRESS` environment variable (1 when unset).
  pub fn stress_factor() -> usize {
    match std::env::var ("RUST_TEST_STRESS") {
      Ok (val) => val.parse().unwrap(),
      Err (..) => 1,
    }
  }

  // One message sent and received on the same thread.
  #[test]
  fn smoke() {
    let (tx, rx) = channel::<i32>();
    tx.send (1).unwrap();
    assert_eq!(rx.recv().unwrap(), 1);
  }

  // A message that is never received is dropped together with the channel.
  #[test]
  fn drop_full() {
    let (tx, _rx) = channel::<Box <isize>>();
    tx.send(Box::new (1)).unwrap();
  }

  // One message sent from a spawned thread.
  #[test]
  fn smoke_threads() {
    let (tx, rx) = channel::<i32>();
    let _t = std::thread::spawn (move|| {
      println!("smoke threads sending...");
      tx.send (1).unwrap();
    });
    println!("smoke threads receiving...");
    assert_eq!(rx.recv().unwrap(), 1);
  }

  // Sending after the receiver was dropped fails.
  #[test]
  fn smoke_port_gone() {
    let (tx, rx) = channel::<i32>();
    drop (rx);
    assert!(tx.send (1).is_err());
  }

  // Same scenario as `smoke_port_gone`.
  #[test]
  fn smoke_shared_port_gone() {
    let (tx, rx) = channel::<i32>();
    drop (rx);
    assert!(tx.send (1).is_err())
  }

  // Keep sending until the receiver thread has gone away.
  #[test]
  fn port_gone_concurrent() {
    let (tx, rx) = channel::<i32>();
    let _t = std::thread::spawn (move|| {
      rx.recv().unwrap();
    });
    while tx.send (1).is_ok() {}
  }

  // Receiving after the sender was dropped fails.
  #[test]
  fn smoke_chan_gone() {
    let (tx, rx) = channel::<i32>();
    drop (tx);
    rx.recv().unwrap_err();
  }

  // Keep receiving until the sender thread has gone away.
  #[test]
  fn chan_gone_concurrent() {
    let (tx, rx) = channel::<i32>();
    let _t = std::thread::spawn (move|| {
      tx.send (1).unwrap();
      tx.send (1).unwrap();
    });
    while rx.recv().is_ok() {}
  }

  // 10000 messages from a sender thread to the main thread.
  #[test]
  fn stress() {
    let (tx, rx) = channel::<i32>();
    let t = std::thread::spawn (move|| {
      for _ in 0..10000 { tx.send (1).unwrap(); }
    });
    for _ in 0..10000 {
      assert_eq!(rx.recv().unwrap(), 1);
    }
    t.join().ok().unwrap();
  }

  // Two channels: the first synchronises start-up, the second carries 40
  // messages between two spawned threads.
  #[test]
  fn send_from_outside_runtime() {
    let (tx1, rx1) = channel::<bool>();
    let (tx2, rx2) = channel::<i32>();
    let t1 = std::thread::spawn (move|| {
      tx1.send (true).unwrap();
      for _ in 0..40 {
        assert_eq!(rx2.recv().unwrap(), 1);
      }
    });
    rx1.recv().unwrap();
    let t2 = std::thread::spawn (move|| {
      for _ in 0..40 {
        tx2.send (1).unwrap();
      }
    });
    t1.join().ok().unwrap();
    t2.join().ok().unwrap();
  }

  // 40 messages from the main thread to a spawned receiver.
  #[test]
  fn recv_from_outside_runtime() {
    let (tx, rx) = channel::<i32>();
    let t = std::thread::spawn (move|| {
      for _ in 0..40 {
        assert_eq!(rx.recv().unwrap(), 1);
      }
    });
    for _ in 0..40 {
      tx.send (1).unwrap();
    }
    t.join().ok().unwrap();
  }

  // Ping-pong between two spawned threads over two channels.
  #[test]
  fn no_runtime() {
    let (tx1, rx1) = channel::<i32>();
    let (tx2, rx2) = channel::<i32>();
    let t1 = std::thread::spawn (move|| {
      assert_eq!(rx1.recv().unwrap(), 1);
      tx2.send (2).unwrap();
    });
    let t2 = std::thread::spawn (move|| {
      tx1.send (1).unwrap();
      assert_eq!(rx2.recv().unwrap(), 2);
    });
    t1.join().ok().unwrap();
    t2.join().ok().unwrap();
  }

  // Dropping the receiver before the sender must not misbehave.
  #[test]
  fn oneshot_single_thread_close_port_first() {
    let (_tx, rx) = channel::<i32>();
    drop (rx);
  }

  // Dropping the sender before the receiver must not misbehave.
  #[test]
  fn oneshot_single_thread_close_chan_first() {
    let (tx, _rx) = channel::<i32>();
    drop (tx);
  }

  // Sending a boxed value after the receiver is gone fails.
  #[test]
  fn oneshot_single_thread_send_port_close() {
    let (tx, rx) = channel::<Box <i32>>();
    drop (rx);
    assert!(tx.send (Box::new (0)).is_err());
  }

  // Unwrapping `recv` on a closed channel panics the thread.
  #[test]
  fn oneshot_single_thread_recv_chan_close() {
    let res = std::thread::spawn (move|| {
      let (tx, rx) = channel::<i32>();
      drop (tx);
      rx.recv().unwrap();
    }).join();
    assert!(res.is_err());
  }

  // A boxed value sent and received on the same thread.
  #[test]
  fn oneshot_single_thread_send_then_recv() {
    let (tx, rx) = channel::<Box <i32>>();
    tx.send (Box::new (10)).unwrap();
    assert!(*rx.recv().unwrap() == 10);
  }

  // Send on an open channel succeeds and the value arrives.
  #[test]
  fn oneshot_single_thread_try_send_open() {
    let (tx, rx) = channel::<i32>();
    tx.send (10).unwrap();
    assert!(rx.recv().unwrap() == 10);
  }

  // Send on a closed channel fails.
  #[test]
  fn oneshot_single_thread_try_send_closed() {
    let (tx, rx) = channel::<i32>();
    drop (rx);
    assert!(tx.send (10).is_err());
  }

  // `recv` on an open channel with a pending message returns it.
  #[test]
  fn oneshot_single_thread_try_recv_open() {
    let (tx, rx) = channel::<i32>();
    tx.send (10).unwrap();
    assert!(rx.recv() == Ok (10));
  }

  // `recv` on a closed, empty channel fails.
  #[test]
  fn oneshot_single_thread_try_recv_closed() {
    let (tx, rx) = channel::<i32>();
    drop (tx);
    rx.recv().unwrap_err();
  }

  // `try_recv` reports `Empty` first and then the sent value.
  #[test]
  fn oneshot_single_thread_peek_data() {
    let (tx, rx) = channel::<i32>();
    assert_eq!(rx.try_recv(), Err (TryRecvError::Empty));
    tx.send (10).unwrap();
    assert_eq!(rx.try_recv(), Ok (10));
  }

  // `try_recv` keeps reporting `Disconnected` once the sender is gone.
  #[test]
  fn oneshot_single_thread_peek_close() {
    let (tx, rx) = channel::<i32>();
    drop (tx);
    assert_eq!(rx.try_recv(), Err (TryRecvError::Disconnected));
    assert_eq!(rx.try_recv(), Err (TryRecvError::Disconnected));
  }

  // `try_recv` on an open, empty channel reports `Empty`.
  #[test]
  fn oneshot_single_thread_peek_open() {
    let (_tx, rx) = channel::<i32>();
    assert_eq!(rx.try_recv(), Err (TryRecvError::Empty));
  }

  // Receiver thread is started first, then the value is sent.
  #[test]
  fn oneshot_multi_task_recv_then_send () {
    let (tx, rx) = channel::<Box <i32>>();
    let _t = std::thread::spawn (move|| {
      assert!(*rx.recv().unwrap() == 10);
    });

    tx.send (Box::new (10)).unwrap();
  }

  // Receiver thread panics because the sender is dropped without sending.
  #[test]
  fn oneshot_multi_task_recv_then_close() {
    let (tx, rx) = channel::<Box <i32>>();
    let _t = std::thread::spawn (move|| {
      drop (tx);
    });
    let res = std::thread::spawn (move|| {
      assert!(*rx.recv().unwrap() == 10);
    }).join();
    assert!(res.is_err());
  }

  // Both halves dropped concurrently, repeated `stress_factor()` times.
  #[test]
  fn oneshot_multi_thread_close_stress() {
    for _ in 0..stress_factor() {
      let (tx, rx) = channel::<i32>();
      let _t = std::thread::spawn (move|| {
        drop (rx);
      });
      drop (tx);
    }
  }

  // A send races with the receiver being dropped; the sending thread's
  // outcome is ignored.
  #[test]
  fn oneshot_multi_thread_send_close_stress() {
    for _ in 0..stress_factor() {
      let (tx, rx) = channel::<i32>();
      let _t = std::thread::spawn (move|| {
        drop (rx);
      });
      let _ = std::thread::spawn (move|| {
        tx.send (1).unwrap();
      }).join();
    }
  }

  // A blocking receive races with the sender being dropped; the receive must
  // fail.
  #[test]
  fn oneshot_multi_thread_recv_close_stress() {
    for _ in 0..stress_factor() {
      let (tx, rx) = channel::<i32>();
      std::thread::spawn (move|| {
        let res = std::thread::spawn (move|| {
          rx.recv().unwrap();
        }).join();
        assert!(res.is_err());
      });
      let _t = std::thread::spawn (move|| {
        std::thread::spawn (move|| {
          drop (tx);
        });
      });
    }
  }

  // One boxed value sent from a spawned thread per iteration.
  #[test]
  fn oneshot_multi_thread_send_recv_stress() {
    for _ in 0..stress_factor() {
      let (tx, rx) = channel::<Box <isize>>();
      let _t = std::thread::spawn (move|| {
        tx.send (Box::new (10)).unwrap();
      });
      assert!(*rx.recv().unwrap() == 10);
    }
  }

  // Each half is passed down a chain of ten freshly spawned threads, one
  // message per hop.
  #[test]
  fn stream_send_recv_stress() {
    for _ in 0..stress_factor() {
      let (tx, rx) = channel();

      send (tx, 0);
      recv (rx, 0);

      fn send (tx: Sender<Box <i32>>, i: i32) {
        if i == 10 { return }

        std::thread::spawn (move|| {
          tx.send (Box::new (i)).unwrap();
          send (tx, i + 1);
        });
      }

      fn recv (rx: Receiver<Box <i32>>, i: i32) {
        if i == 10 { return }

        std::thread::spawn (move|| {
          assert!(*rx.recv().unwrap() == i);
          recv (rx, i + 1);
        });
      }
    }
  }

  // `recv_timeout` returns pending data, times out on an empty channel, and
  // works again afterwards.
  #[test]
  fn oneshot_single_thread_recv_timeout() {
    let (tx, rx) = channel();
    tx.send (true).unwrap();
    assert_eq!(rx.recv_timeout (std::time::Duration::from_millis (1)), Ok (true));
    assert_eq!(rx.recv_timeout (std::time::Duration::from_millis (1)),
      Err (RecvTimeoutError::Timeout));
    tx.send (true).unwrap();
    assert_eq!(rx.recv_timeout (std::time::Duration::from_millis (1)), Ok (true));
  }

  // The sender sleeps for twice the timeout before every other message, so
  // the receiver sees both timeouts and data; every message must arrive.
  #[test]
  fn stress_recv_timeout_two_threads() {
    let (tx, rx) = channel();
    let stress = stress_factor() + 100;
    let timeout = std::time::Duration::from_millis (100);

    std::thread::spawn (move || {
      for i in 0..stress {
        if i % 2 == 0 {
          std::thread::sleep (timeout * 2);
        }
        tx.send (1usize).unwrap();
      }
    });

    let mut recv_count = 0;
    loop {
      match rx.recv_timeout (timeout) {
        Ok (n) => {
          assert_eq!(n, 1usize);
          recv_count += 1;
        }
        Err (RecvTimeoutError::Timeout) => { }
        Err (RecvTimeoutError::Disconnected) => break
      }
    }

    assert_eq!(recv_count, stress);
  }

  // 10000 messages are queued before any is received, which exceeds
  // `INITIAL_CAPACITY`.
  #[test]
  fn recv_a_lot() {
    let (tx, rx) = channel();
    for _ in 0..10000 { tx.send (true).unwrap(); }
    for _ in 0..10000 { rx.recv().unwrap(); }
  }

  // `iter()` yields every message until the sender is dropped.
  #[test]
  fn nested_recv_iter() {
    let (tx, rx) = channel::<i32>();
    let (total_tx, total_rx) = channel::<i32>();

    let _t = std::thread::spawn (move|| {
      let mut acc = 0;
      for x in rx.iter() {
        acc += x;
      }
      total_tx.send (acc).unwrap();
    });

    tx.send (3).unwrap();
    tx.send (1).unwrap();
    tx.send (2).unwrap();
    drop (tx);
    assert_eq!(total_rx.recv().unwrap(), 6);
  }

  // Breaking out of `iter()` early leaves the channel usable by the other
  // side; the last send may fail and is ignored.
  #[test]
  fn recv_iter_break() {
    let (tx, rx) = channel::<i32>();
    let (count_tx, count_rx) = channel();

    let _t = std::thread::spawn (move|| {
      let mut count = 0;
      for x in rx.iter() {
        if count >= 3 {
          break;
        } else {
          count += x;
        }
      }
      count_tx.send (count).unwrap();
    });

    tx.send (2).unwrap();
    tx.send (2).unwrap();
    tx.send (2).unwrap();
    let _ = tx.send (2);
    drop (tx);
    assert_eq!(count_rx.recv().unwrap(), 4);
  }

  // `try_iter()` drains what is available; the worker requests more until the
  // responses sum to 6.
  #[test]
  fn recv_try_iter() {
    let (request_tx, request_rx) = channel();
    let (response_tx, response_rx) = channel();

    let t = std::thread::spawn (move|| {
      let mut count = 0;
      loop {
        for x in response_rx.try_iter() {
          count += x;
          if count == 6 {
            return count;
          }
        }
        println!("test recv try iter send request...");
        request_tx.send (true).unwrap();
      }
    });

    for _ in request_rx.iter() {
      println!("test recv try iter send response...");
      if response_tx.send (2).is_err() {
        break;
      }
    }

    println!("test recv try iter join...");

    assert_eq!(t.join().unwrap(), 6);
  }

  // The owning iterator keeps yielding queued messages after the sender has
  // gone out of scope.
  #[test]
  fn recv_into_iter_owned() {
    let mut iter = {
      let (tx, rx) = channel::<i32>();
      tx.send (1).unwrap();
      tx.send (2).unwrap();

      rx.into_iter()
    };
    assert_eq!(iter.next().unwrap(), 1);
    assert_eq!(iter.next().unwrap(), 2);
    assert!(iter.next().is_none());
  }

  // Iterating through `&Receiver` yields the queued messages, then ends.
  #[test]
  fn recv_into_iter_borrowed() {
    let (tx, rx) = channel::<i32>();
    tx.send (1).unwrap();
    tx.send (2).unwrap();
    drop (tx);
    let mut iter = (&rx).into_iter();
    assert_eq!(iter.next().unwrap(), 1);
    assert_eq!(iter.next().unwrap(), 2);
    assert!(iter.next().is_none());
  }

  // Walks `try_recv` through Empty -> Ok -> Empty -> Disconnected, using two
  // helper channels to sequence the other thread.
  #[test]
  fn try_recv_states() {
    let (tx1, rx1) = channel::<i32>();
    let (tx2, rx2) = channel::<bool>();
    let (tx3, rx3) = channel::<bool>();
    let _t = std::thread::spawn (move|| {
      rx2.recv().unwrap();
      tx1.send (1).unwrap();
      tx3.send (true).unwrap();
      rx2.recv().unwrap();
      drop (tx1);
      tx3.send (true).unwrap();
    });

    assert_eq!(rx1.try_recv(), Err (TryRecvError::Empty));
    tx2.send (true).unwrap();
    rx3.recv().unwrap();
    assert_eq!(rx1.try_recv(), Ok (1));
    assert_eq!(rx1.try_recv(), Err (TryRecvError::Empty));
    tx2.send (true).unwrap();
    rx3.recv().unwrap();
    assert_eq!(rx1.try_recv(), Err (TryRecvError::Disconnected));
  }

  // The receiver is dropped immediately; the second send must hand its value
  // back.
  #[test]
  fn issue_32114() {
    let (tx, _) = channel();
    let _ = tx.send (123);
    assert_eq!(tx.send (123), Err (SendError (123)));
  }

  // Zero-sized message type.
  #[test]
  fn zero_size() {
    let (tx, rx) = channel::<()>();
    tx.send (()).unwrap();
    let () = rx.recv().unwrap();
  }
}