unbounded_spsc/
lib.rs

1//! This library adapts the block-waiting `recv` mechanism from the Rust
2//! standard library to an unbounded SPSC channel backed by
3//! `spsc`.
4
5#![feature(negative_impls)]
6
7use bounded_spsc_queue as spsc;
8
9use std::sync::atomic::Ordering;
10
11mod blocking;
12mod select;
13
/// Sentinel stored in `Inner::counter` once either half of the channel has
/// been dropped.
const DISCONNECTED     : isize = isize::MIN;
/// Once the receiver's local steal count exceeds this value, `try_recv` folds
/// it back into the shared counter. Kept tiny under test so that path is
/// exercised; roughly one million otherwise.
const MAX_STEALS       : isize = if cfg!(test) { 5 } else { 1 << 20 };
/// Capacity of the first queue created by `channel`.
const INITIAL_CAPACITY : usize = 128;
20
21pub struct Receiver <T> {
22  consumer    : std::cell::UnsafeCell <spsc::Consumer <T>>,
23  receive_new : std::sync::mpsc::Receiver <spsc::Consumer <T>>,
24  inner       : std::sync::Arc <Inner>,
25  steals      : std::cell::UnsafeCell <isize>
26}
27
28pub struct Sender <T> {
29  producer : std::cell::UnsafeCell <spsc::Producer <T>>,
30  send_new : std::sync::mpsc::Sender <spsc::Consumer <T>>,
31  inner    : std::sync::Arc <Inner>
32}
33
/// State shared between the two halves of a channel.
struct Inner {
  /// Incremented by every `send`; the receiver subtracts its steals (plus one)
  /// before blocking. Holds `DISCONNECTED` once either half is dropped.
  counter   : std::sync::atomic::AtomicIsize,
  /// Set to `false` as the first step of dropping either half.
  connected : std::sync::atomic::AtomicBool,
  /// A `blocking::SignalToken` cast to `usize` while the receiver is parked,
  /// zero otherwise.
  to_wake   : std::sync::atomic::AtomicUsize,
}
39
40#[derive(Debug)]
41pub struct Iter <'a, T : 'a> {
42  rx : &'a Receiver <T>
43}
44
45#[derive(Debug)]
46pub struct TryIter <'a, T : 'a> {
47  rx : &'a Receiver <T>
48}
49
50#[derive(Debug)]
51pub struct IntoIter <T> {
52  rx : Receiver <T>
53}
54
/// Returned by `Receiver::recv` when the sender has disconnected: no further
/// messages will ever be received.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RecvError;
58
/// Returned by `Sender::send` when the receiver has disconnected; the message
/// that could not be delivered is handed back in field `0`.
#[derive(Clone, Copy, PartialEq, Eq)]
pub struct SendError <T> (pub T);
62
/// Reasons a call to `Receiver::try_recv` can fail.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TryRecvError {
  /// No message is pending, but the sender is still connected.
  Empty,
  /// The channel is empty and the sender has disconnected.
  Disconnected,
}
68
/// Reasons a call to `Receiver::recv_timeout` can fail.
#[derive(Clone,Copy,Debug,Eq,PartialEq)]
pub enum RecvTimeoutError {
  /// The deadline passed before a message arrived.
  Timeout,
  /// The channel is empty and the sender has disconnected.
  Disconnected
}

impl std::fmt::Display for RecvTimeoutError {
  fn fmt (&self, f : &mut std::fmt::Formatter) -> std::fmt::Result {
    let message = match self {
      RecvTimeoutError::Timeout      => "timed out waiting on channel",
      RecvTimeoutError::Disconnected => "receiving on a closed channel"
    };
    // `pad` honours width/alignment flags, like `Display` for `str`
    f.pad (message)
  }
}

impl std::error::Error for RecvTimeoutError {}
74
/// Outcome of `Receiver::start_selection_`.
#[derive(Clone,Copy,Debug,Eq,PartialEq)]
pub enum SelectionResult {
  /// The wake-up token was installed (the decrement succeeded).
  SelSuccess,
  /// The token was not installed and the decrement was undone.
  SelCanceled
}
79
80impl <T> Receiver <T> {
81  /// Non-blocking receive, returns `Err(TryRecvError::Empty)` if buffer was empty; will
82  /// continue to receive pending messages from a disconnected channel until it is
83  /// empty, at which point further calls to this function will return
84  /// `Err(TryRecvError::Disconnected)`.
85  #[expect(clippy::missing_panics_doc)]
86  pub fn try_recv (&self) -> Result <T, TryRecvError> {
87    match unsafe { (*self.consumer.get()).try_pop() } {
88      Some (t) => unsafe {
89        if MAX_STEALS < *self.steals.get() {
90          match self.inner.counter.swap (0, Ordering::SeqCst) {
91            DISCONNECTED => {
92              self.inner.counter.store (DISCONNECTED, Ordering::SeqCst);
93            }
94            n => {
95              let m = std::cmp::min (n, *self.steals.get());
96              *self.steals.get() -= m;
97              self.bump (n - m);
98            }
99          }
100          // TODO: can this be changed to a debug assertion ?
101          assert!(0 <= *self.steals.get());
102        }
103        *self.steals.get() += 1;
104        Ok (t)
105      },
106      None => {
107        match self.receive_new.try_recv() {
108          Ok (new_consumer) => {
109            unsafe { *self.consumer.get() = new_consumer; }
110            self.try_recv()
111          },
112          Err (std::sync::mpsc::TryRecvError::Empty) => {
113            match self.inner.counter.load (Ordering::SeqCst) {
114              n if n != DISCONNECTED => Err (TryRecvError::Empty),
115              _ => {
116                match unsafe { (*self.consumer.get()).try_pop() } {
117                  Some (t) => Ok (t),
118                  None     => Err (TryRecvError::Disconnected)
119                }
120              }
121            }
122          },
123          Err (std::sync::mpsc::TryRecvError::Disconnected) => {
124            Err (TryRecvError::Disconnected)
125          }
126        }
127      }
128    }
129  }
130
131  /// Block waiting if no messages are pending in the buffer.
132  pub fn recv (&self) -> Result <T, RecvError> {
133    match self.try_recv() {
134      Err (TryRecvError::Empty) => {}
135      Err (TryRecvError::Disconnected) => return Err (RecvError),
136      Ok  (t) => return Ok (t)
137    }
138    let (wait_token, signal_token) = blocking::tokens();
139    if self.decrement (signal_token).is_ok() {
140      wait_token.wait();
141    }
142    match self.try_recv() {
143      Ok (t) => unsafe {
144        *self.steals.get() -= 1;
145        Ok (t)
146      },
147      Err (TryRecvError::Empty) => unreachable!(
148        "woken thread should have found pending message"),
149      Err (TryRecvError::Disconnected) => Err (RecvError)
150    }
151  }
152
153  pub fn recv_timeout (&self, timeout : std::time::Duration)
154    -> Result <T, RecvTimeoutError>
155  {
156    match self.try_recv() {
157      Ok  (t)                          => Ok (t),
158      Err (TryRecvError::Disconnected) => Err (RecvTimeoutError::Disconnected),
159      Err (TryRecvError::Empty)
160        => self.recv_max_until (std::time::Instant::now() + timeout)
161    }
162  }
163
164  #[expect(mismatched_lifetime_syntaxes)]
165  pub const fn iter (&self) -> Iter <T> {
166    Iter {
167      rx: self
168    }
169  }
170
171  #[expect(mismatched_lifetime_syntaxes)]
172  pub const fn try_iter (&self) -> TryIter <T> {
173    TryIter {
174      rx: self
175    }
176  }
177
178  pub fn capacity (&self) -> usize {
179    unsafe {
180      (*self.consumer.get()).capacity()
181    }
182  }
183
184  fn recv_max_until (&self, deadline : std::time::Instant)
185    -> Result <T, RecvTimeoutError>
186  {
187    loop {
188      match self.recv_deadline (deadline) {
189        result @ Err (RecvTimeoutError::Timeout) => {
190          if deadline <= std::time::Instant::now() {
191            return result
192          }
193        },
194        result => return result
195      }
196    }
197  }
198
199  /// This is the same as `recv` except with code for timeout.
200  fn recv_deadline (&self, deadline : std::time::Instant)
201    -> Result <T, RecvTimeoutError>
202  {
203    match self.try_recv() {
204      Err (TryRecvError::Empty) => {}
205      Err (TryRecvError::Disconnected)
206        => return Err (RecvTimeoutError::Disconnected),
207      Ok  (t) => return Ok (t)
208    }
209    let (wait_token, signal_token) = blocking::tokens();
210    if self.decrement (signal_token).is_ok() {
211      let timed_out = !wait_token.wait_max_until (deadline);
212      if timed_out {
213        // this boolean result is not used: `try_recv` is always called below
214        let _has_data = self.abort_selection_();
215      }
216    }
217    match self.try_recv() {
218      Ok (t) => unsafe {
219        *self.steals.get() -= 1;
220        Ok (t)
221      }
222      Err (TryRecvError::Empty)        => Err (RecvTimeoutError::Timeout),
223      Err (TryRecvError::Disconnected) => Err (RecvTimeoutError::Disconnected)
224    }
225  }
226
227  fn decrement (&self, token : blocking::SignalToken)
228    -> Result <(), blocking::SignalToken>
229  {
230    assert_eq!(self.inner.to_wake.load (Ordering::SeqCst), 0);
231    let ptr = unsafe { token.cast_to_usize() };
232    self.inner.to_wake.store (ptr, Ordering::SeqCst);
233    let steals = unsafe { std::ptr::replace (self.steals.get(), 0) };
234    match self.inner.counter.fetch_sub (1 + steals, Ordering::SeqCst) {
235      DISCONNECTED => {
236        self.inner.counter.store (DISCONNECTED, Ordering::SeqCst);
237      }
238      n => {
239        assert!(0 <= n);
240        if n - steals <= 0 {
241          return Ok (())
242        }
243      }
244    }
245    self.inner.to_wake.store (0, Ordering::SeqCst);
246    Err (unsafe { blocking::SignalToken::cast_from_usize (ptr) })
247  }
248
249  /////////////////////////////////////////////////////////////////////////////
250  //  select functions
251  /////////////////////////////////////////////////////////////////////////////
252
253  fn can_recv_ (&self) -> bool {
254    0 < unsafe { (*self.consumer.get()).size() }
255  }
256
257  fn start_selection_ (&self, token : blocking::SignalToken) -> SelectionResult {
258    match self.decrement (token) {
259      Ok  (()) => SelectionResult::SelSuccess,
260      Err (_token) => {
261        // undo decrement above
262        let prev = self.bump (1);
263        assert!(prev == DISCONNECTED || 0 <= prev);
264        SelectionResult::SelCanceled
265      }
266    }
267  }
268
269  /// Returns true if receiver has data pending.
270  fn abort_selection_ (&self) -> bool {
271    let steals = 1;
272    let prev = self.bump (steals + 1);
273    if prev == DISCONNECTED {
274      assert_eq! (self.inner.to_wake.load (Ordering::SeqCst), 0);
275      true
276    } else {
277      let cur = prev + steals + 1;
278      assert!(0 <= cur);
279      if prev < 0 {
280        drop (self.inner.take_to_wake());
281      } else {
282        while self.inner.to_wake.load (Ordering::SeqCst) != 0 {
283          std::thread::yield_now();
284        }
285      }
286      unsafe {
287        assert_eq!(*self.steals.get(), 0);
288        *self.steals.get() = steals;
289      }
290      0 <= prev
291    }
292  }
293
294  fn bump (&self, amt : isize) -> isize {
295    match self.inner.counter.fetch_add (amt, Ordering::SeqCst) {
296      DISCONNECTED => {
297        self.inner.counter.store (DISCONNECTED, Ordering::SeqCst);
298        DISCONNECTED
299      }
300      n => n
301    }
302  }
303
304} // end impl Receiver
305
306impl <T> std::fmt::Debug for Receiver <T> {
307  fn fmt (&self, f : &mut std::fmt::Formatter) -> std::fmt::Result {
308    write!(f, "Receiver {{ .. }}")
309  }
310}
311
312impl <T> IntoIterator for Receiver <T> {
313  type Item = T;
314  type IntoIter = IntoIter <T>;
315  fn into_iter (self) -> IntoIter <T> {
316    IntoIter {
317      rx: self
318    }
319  }
320}
321
322impl <'a, T> IntoIterator for &'a Receiver <T> {
323  type Item = T;
324  type IntoIter = Iter <'a, T>;
325  fn into_iter (self) -> Iter <'a, T> {
326    self.iter()
327  }
328}
329
330impl <T> Drop for Receiver <T> {
331  fn drop (&mut self) {
332    self.inner.connected.store (false, Ordering::SeqCst);
333
334    // NOTE: The following code to clear the queue comes from the original
335    // standard library MPSC stream-flavor drop function. Whether it is
336    // necessary because of the linked-list queue used in that case, or rather
337    // it is needed to ensure the synchronization with the sender is not known.
338    // Besides the one-time overhead it shouldn't hurt so we will do it
339    // regardless.
340    // TODO: find out if this is required or we can just drop the queue
341    let mut steals = unsafe { *self.steals.get() };
342    while {
343      let count = self.inner.counter.compare_exchange (
344        steals, DISCONNECTED, Ordering::SeqCst, Ordering::SeqCst
345      ).unwrap_or_else (|i| i);
346      count != DISCONNECTED && count != steals
347    } {
348      while let Some (_t) = unsafe { (*self.consumer.get()).try_pop() } {
349        steals += 1;
350      }
351    }
352  }
353}
354
355impl <T> Sender <T> {
356  /// Non-blocking send.
357  #[expect(clippy::missing_panics_doc)]
358  pub fn send (&self, t : T) -> Result <(), SendError <T>> {
359    if self.inner.connected.load (Ordering::SeqCst) {
360      match unsafe { (*self.producer.get()).try_push (t) } {
361        None     => {}, // success
362        Some (t) => {   // queue full
363          let new_capacity = 2 * unsafe { (*self.producer.get()).capacity() };
364          let (new_producer, new_consumer) = spsc::make (new_capacity);
365          // TODO: We are using a side channel here to send the new consumer
366          // which was not part of the original standard library channel
367          // implementation. Are we sure that this is safe to unwrap or should
368          // we handle the result explicitly ?
369          self.send_new.send (new_consumer).unwrap();
370          unsafe { *self.producer.get() = new_producer; }
371          match unsafe { (*self.producer.get()).try_push (t) } {
372            None      => {}
373            Some (_t) => unreachable!(
374              "send on a newly created queue should always succeed")
375          }
376        }
377      }
378      // TODO: can we replace asserts with debug assertions ?
379      match self.inner.counter.fetch_add (1, Ordering::SeqCst) {
380        -1 => {
381          self.inner.take_to_wake().signal();
382        },
383        -2 => {},
384        DISCONNECTED => {
385          self.inner.counter.store (DISCONNECTED, Ordering::SeqCst);
386          // We want to guarantee if a message was not received that we get it
387          // back; since spsc::{Producer,Consumer} have the same
388          // internal representation (as a singleton struct containing Arc
389          // <Buffer <T>>), we can safely transmute the producer in order to
390          // pop the message back if it was orphaned.
391          unsafe {
392            let consumer : spsc::Consumer <T>
393              = std::mem::transmute (self.producer.get());
394            let first    = consumer.try_pop();
395            let second   = consumer.try_pop();
396            assert!(second.is_none());
397            if let Some(t) = first {
398              return Err (SendError (t))
399            }
400          }
401        },
402        n => {
403          assert! (0 <= n);
404        }
405      }
406      Ok (())
407    } else {
408      Err (SendError (t))
409    }
410  }
411} // end impl Sender
412
413impl <T> std::fmt::Debug for Sender <T> {
414  fn fmt (&self, f : &mut std::fmt::Formatter) -> std::fmt::Result {
415    write!(f, "Sender {{ .. }}")
416  }
417}
418
419impl <T> Drop for Sender <T> {
420  fn drop (&mut self) {
421    self.inner.connected.store (false, Ordering::SeqCst);
422    match self.inner.counter.swap (DISCONNECTED, Ordering::SeqCst) {
423      DISCONNECTED => {}
424      -1 => {
425        self.inner.take_to_wake().signal();
426      }
427      n  => {
428        assert!(0 <= n);
429      }
430    }
431  }
432}
433
434impl Inner {
435  fn take_to_wake (&self) -> blocking::SignalToken {
436    let ptr = self.to_wake.swap (0, Ordering::SeqCst);
437    assert!(ptr != 0);
438    unsafe {
439      blocking::SignalToken::cast_from_usize (ptr)
440    }
441  }
442}
443
444impl <T> Iterator for Iter <'_, T> {
445  type Item = T;
446  fn next (&mut self) -> Option <T> {
447    self.rx.recv().ok()
448  }
449}
450
451impl <T> Iterator for TryIter <'_, T> {
452  type Item = T;
453  fn next (&mut self) -> Option <T> {
454    self.rx.try_recv().ok()
455  }
456}
457
458impl <T> Iterator for IntoIter <T> {
459  type Item = T;
460  fn next (&mut self) -> Option <T> {
461    self.rx.recv().ok()
462  }
463}
464
465impl std::fmt::Display for RecvError {
466  fn fmt (&self, f : &mut std::fmt::Formatter) -> std::fmt::Result {
467    "receiving on a closed channel".fmt (f)
468  }
469}
470
471impl std::error::Error for RecvError {
472  fn description (&self) -> &'static str {
473    "receiving on a closed channel"
474  }
475
476  fn cause (&self) -> Option <&dyn std::error::Error> {
477    None
478  }
479}
480
481impl <T> std::fmt::Debug for SendError <T> {
482  fn fmt (&self, f : &mut std::fmt::Formatter) -> std::fmt::Result {
483    "SendError(..)".fmt (f)
484  }
485}
486
487impl <T> std::fmt::Display for SendError <T> {
488  fn fmt (&self, f : &mut std::fmt::Formatter) -> std::fmt::Result {
489    "sending on a closed channel".fmt (f)
490  }
491}
492
493impl <T : Send> std::error::Error for SendError <T> {
494  fn description (&self) -> &'static str {
495    "sending on a closed channel"
496  }
497
498  fn cause (&self) -> Option <&dyn std::error::Error> {
499    None
500  }
501}
502
503impl std::fmt::Display for TryRecvError {
504  fn fmt (&self, f : &mut std::fmt::Formatter) -> std::fmt::Result {
505    match *self {
506      TryRecvError::Empty        => "receiving on an empty channel".fmt (f),
507      TryRecvError::Disconnected => "receiving on a closed channel".fmt (f)
508    }
509  }
510}
511
512impl std::error::Error for TryRecvError {
513  fn description (&self) -> &str {
514    match *self {
515      TryRecvError::Empty        => "receiving on an empty channel",
516      TryRecvError::Disconnected => "receiving on a closed channel"
517    }
518  }
519
520  fn cause (&self) -> Option <&dyn std::error::Error> {
521    None
522  }
523}
524
525pub fn channel <T : 'static> () -> (Sender <T>, Receiver <T>) {
526  let (producer, consumer) = spsc::make (INITIAL_CAPACITY);
527  let (send_new, receive_new) = std::sync::mpsc::channel();
528  let inner = std::sync::Arc::new (
529    Inner {
530      counter:   std::sync::atomic::AtomicIsize::new (0),
531      connected: std::sync::atomic::AtomicBool::new (true),
532      to_wake:   std::sync::atomic::AtomicUsize::new (0)
533    }
534  );
535  let sender    = Sender {
536    producer: std::cell::UnsafeCell::new (producer),
537    send_new,
538    inner: inner.clone()
539  };
540  let receiver  = Receiver {
541    consumer: std::cell::UnsafeCell::new (consumer),
542    receive_new,
543    steals: std::cell::UnsafeCell::new (0),
544    inner
545  };
546  (sender, receiver)
547}
548
#[cfg(test)]
mod tests {
  use super::*;

  /// Multiplier for the stress tests, read from `RUST_TEST_STRESS`; defaults
  /// to 1 when the variable is not set.
  pub fn stress_factor() -> usize {
    match std::env::var ("RUST_TEST_STRESS") {
      Ok  (val) => val.parse()
        .expect ("RUST_TEST_STRESS must be an unsigned integer"),
      Err (..)  => 1,
    }
  }

  #[test]
  fn smoke() {
    let (tx, rx) = channel::<i32>();
    tx.send (1).unwrap();
    assert_eq!(rx.recv().unwrap(), 1);
  }

  #[test]
  fn drop_full() {
    let (tx, _rx) = channel::<Box <isize>>();
    tx.send(Box::new (1)).unwrap();
  }

  // FIXME: test failed on an unwrap
  #[test]
  fn smoke_threads() {
    let (tx, rx) = channel::<i32>();
    let _t = std::thread::spawn (move|| {
      tx.send (1).unwrap();
    });
    assert_eq!(rx.recv().unwrap(), 1);
  }

  #[test]
  fn smoke_port_gone() {
    let (tx, rx) = channel::<i32>();
    drop (rx);
    assert!(tx.send (1).is_err());
  }

  #[test]
  fn smoke_shared_port_gone() {
    let (tx, rx) = channel::<i32>();
    drop (rx);
    assert!(tx.send (1).is_err())
  }

  #[test]
  fn port_gone_concurrent() {
    let (tx, rx) = channel::<i32>();
    let _t = std::thread::spawn (move|| {
      rx.recv().unwrap();
    });
    while tx.send (1).is_ok() {}
  }

  #[test]
  fn smoke_chan_gone() {
    let (tx, rx) = channel::<i32>();
    drop (tx);
    rx.recv().unwrap_err();
  }

  #[test]
  fn chan_gone_concurrent() {
    let (tx, rx) = channel::<i32>();
    let _t = std::thread::spawn (move|| {
      tx.send (1).unwrap();
      tx.send (1).unwrap();
    });
    while rx.recv().is_ok() {}
  }

  #[test]
  fn stress() {
    let (tx, rx) = channel::<i32>();
    let t = std::thread::spawn (move|| {
      for _ in 0..10000 { tx.send (1).unwrap(); }
    });
    for _ in 0..10000 {
      assert_eq!(rx.recv().unwrap(), 1);
    }
    t.join().ok().unwrap();
  }

  #[test]
  fn send_from_outside_runtime() {
    let (tx1, rx1) = channel::<bool>();
    let (tx2, rx2) = channel::<i32>();
    let t1 = std::thread::spawn (move|| {
      tx1.send (true).unwrap();
      for _ in 0..40 {
        assert_eq!(rx2.recv().unwrap(), 1);
      }
    });
    rx1.recv().unwrap();
    let t2 = std::thread::spawn (move|| {
      for _ in 0..40 {
        tx2.send (1).unwrap();
      }
    });
    t1.join().ok().unwrap();
    t2.join().ok().unwrap();
  }

  #[test]
  fn recv_from_outside_runtime() {
    let (tx, rx) = channel::<i32>();
    let t = std::thread::spawn (move|| {
      for _ in 0..40 {
        assert_eq!(rx.recv().unwrap(), 1);
      }
    });
    for _ in 0..40 {
      tx.send (1).unwrap();
    }
    t.join().ok().unwrap();
  }

  #[test]
  fn no_runtime() {
    let (tx1, rx1) = channel::<i32>();
    let (tx2, rx2) = channel::<i32>();
    let t1 = std::thread::spawn (move|| {
      assert_eq!(rx1.recv().unwrap(), 1);
      tx2.send (2).unwrap();
    });
    let t2 = std::thread::spawn (move|| {
      tx1.send (1).unwrap();
      assert_eq!(rx2.recv().unwrap(), 2);
    });
    t1.join().ok().unwrap();
    t2.join().ok().unwrap();
  }

  #[test]
  fn oneshot_single_thread_close_port_first() {
    // Simple test of closing without sending
    let (_tx, rx) = channel::<i32>();
    drop (rx);
  }

  #[test]
  fn oneshot_single_thread_close_chan_first() {
    // Simple test of closing without sending
    let (tx, _rx) = channel::<i32>();
    drop (tx);
  }

  #[test]
  fn oneshot_single_thread_send_port_close() {
    // Testing that the sender cleans up the payload if receiver is closed
    let (tx, rx) = channel::<Box <i32>>();
    drop (rx);
    assert!(tx.send (Box::new (0)).is_err());
  }

  #[test]
  fn oneshot_single_thread_recv_chan_close() {
    // Receiving on a closed chan will panic
    let res = std::thread::spawn (move|| {
      let (tx, rx) = channel::<i32>();
      drop (tx);
      rx.recv().unwrap();
    }).join();
    // What is our res?
    assert!(res.is_err());
  }

  #[test]
  fn oneshot_single_thread_send_then_recv() {
    let (tx, rx) = channel::<Box <i32>>();
    tx.send (Box::new (10)).unwrap();
    assert!(*rx.recv().unwrap() == 10);
  }

  #[test]
  fn oneshot_single_thread_try_send_open() {
    let (tx, rx) = channel::<i32>();
    tx.send (10).unwrap();
    assert!(rx.recv().unwrap() == 10);
  }

  #[test]
  fn oneshot_single_thread_try_send_closed() {
    let (tx, rx) = channel::<i32>();
    drop (rx);
    assert!(tx.send (10).is_err());
  }

  #[test]
  fn oneshot_single_thread_try_recv_open() {
    let (tx, rx) = channel::<i32>();
    tx.send (10).unwrap();
    assert!(rx.recv() == Ok (10));
  }

  #[test]
  fn oneshot_single_thread_try_recv_closed() {
    let (tx, rx) = channel::<i32>();
    drop (tx);
    rx.recv().unwrap_err();
  }

  #[test]
  fn oneshot_single_thread_peek_data() {
    let (tx, rx) = channel::<i32>();
    assert_eq!(rx.try_recv(), Err (TryRecvError::Empty));
    tx.send (10).unwrap();
    assert_eq!(rx.try_recv(), Ok (10));
  }

  #[test]
  fn oneshot_single_thread_peek_close() {
    let (tx, rx) = channel::<i32>();
    drop (tx);
    assert_eq!(rx.try_recv(), Err (TryRecvError::Disconnected));
    assert_eq!(rx.try_recv(), Err (TryRecvError::Disconnected));
  }

  #[test]
  fn oneshot_single_thread_peek_open() {
    let (_tx, rx) = channel::<i32>();
    assert_eq!(rx.try_recv(), Err (TryRecvError::Empty));
  }

  #[test]
  fn oneshot_multi_task_recv_then_send () {
    let (tx, rx) = channel::<Box <i32>>();
    let _t = std::thread::spawn (move|| {
      assert!(*rx.recv().unwrap() == 10);
    });

    tx.send (Box::new (10)).unwrap();
  }

  #[test]
  fn oneshot_multi_task_recv_then_close() {
    let (tx, rx) = channel::<Box <i32>>();
    let _t = std::thread::spawn (move|| {
      drop (tx);
    });
    let res = std::thread::spawn (move|| {
      assert!(*rx.recv().unwrap() == 10);
    }).join();
    assert!(res.is_err());
  }

  #[test]
  fn oneshot_multi_thread_close_stress() {
    for _ in 0..stress_factor() {
      let (tx, rx) = channel::<i32>();
      let _t = std::thread::spawn (move|| {
        drop (rx);
      });
      drop (tx);
    }
  }

  #[test]
  fn oneshot_multi_thread_send_close_stress() {
    for _ in 0..stress_factor() {
      let (tx, rx) = channel::<i32>();
      let _t = std::thread::spawn (move|| {
        drop (rx);
      });
      // the send may legitimately fail, so the join result is ignored
      let _ = std::thread::spawn (move|| {
        tx.send (1).unwrap();
      }).join();
    }
  }

  #[test]
  fn oneshot_multi_thread_recv_close_stress() {
    for _ in 0..stress_factor() {
      let (tx, rx) = channel::<i32>();
      std::thread::spawn (move|| {
        let res = std::thread::spawn (move|| {
          rx.recv().unwrap();
        }).join();
        assert!(res.is_err());
      });
      let _t = std::thread::spawn (move|| {
        std::thread::spawn (move|| {
          drop (tx);
        });
      });
    }
  }

  #[test]
  fn oneshot_multi_thread_send_recv_stress() {
    for _ in 0..stress_factor() {
      let (tx, rx) = channel::<Box <isize>>();
      let _t = std::thread::spawn (move|| {
        tx.send (Box::new (10)).unwrap();
      });
      assert!(*rx.recv().unwrap() == 10);
    }
  }

  #[test]
  fn stream_send_recv_stress() {
    for _ in 0..stress_factor() {
      let (tx, rx) = channel();

      send (tx, 0);
      recv (rx, 0);

      fn send (tx: Sender<Box <i32>>, i: i32) {
        if i == 10 { return }

        std::thread::spawn (move|| {
          tx.send (Box::new (i)).unwrap();
          send (tx, i + 1);
        });
      }

      fn recv (rx: Receiver<Box <i32>>, i: i32) {
        if i == 10 { return }

        std::thread::spawn (move|| {
          assert!(*rx.recv().unwrap() == i);
          recv (rx, i + 1);
        });
      }
    }
  }

  #[test]
  fn oneshot_single_thread_recv_timeout() {
    let (tx, rx) = channel();
    tx.send (true).unwrap();
    assert_eq!(rx.recv_timeout (std::time::Duration::from_millis (1)), Ok (true));
    assert_eq!(rx.recv_timeout (std::time::Duration::from_millis (1)),
      Err (RecvTimeoutError::Timeout));
    tx.send (true).unwrap();
    assert_eq!(rx.recv_timeout (std::time::Duration::from_millis (1)), Ok (true));
  }

  #[test]
  fn stress_recv_timeout_two_threads() {
    let (tx, rx) = channel();
    let stress = stress_factor() + 100;
    let timeout = std::time::Duration::from_millis (100);

    std::thread::spawn (move || {
      for i in 0..stress {
        if i % 2 == 0 {
          std::thread::sleep (timeout * 2);
        }
        tx.send (1usize).unwrap();
      }
    });

    let mut recv_count = 0;
    loop {
      match rx.recv_timeout (timeout) {
        Ok (n) => {
          assert_eq!(n, 1usize);
          recv_count += 1;
        }
        Err (RecvTimeoutError::Timeout) => { }
        Err (RecvTimeoutError::Disconnected) => break
      }
    }

    assert_eq!(recv_count, stress);
  }

  #[test]
  fn recv_a_lot() {
    // Regression test that we don't run out of stack in scheduler context
    let (tx, rx) = channel();
    for _ in 0..10000 { tx.send (true).unwrap(); }
    for _ in 0..10000 { rx.recv().unwrap(); }
  }

  #[test]
  fn nested_recv_iter() {
    let (tx, rx) = channel::<i32>();
    let (total_tx, total_rx) = channel::<i32>();

    let _t = std::thread::spawn (move|| {
      let mut acc = 0;
      for x in rx.iter() {
        acc += x;
      }
      total_tx.send (acc).unwrap();
    });

    tx.send (3).unwrap();
    tx.send (1).unwrap();
    tx.send (2).unwrap();
    drop (tx);
    assert_eq!(total_rx.recv().unwrap(), 6);
  }

  #[test]
  fn recv_iter_break() {
    let (tx, rx) = channel::<i32>();
    let (count_tx, count_rx) = channel();

    let _t = std::thread::spawn (move|| {
      let mut count = 0;
      for x in rx.iter() {
        if count >= 3 {
          break;
        } else {
          count += x;
        }
      }
      count_tx.send (count).unwrap();
    });

    tx.send (2).unwrap();
    tx.send (2).unwrap();
    tx.send (2).unwrap();
    let _ = tx.send (2);
    drop (tx);
    assert_eq!(count_rx.recv().unwrap(), 4);
  }

  // FIXME: failures
  // - failed with the `assert!(second.is_none())` assertion in `Sender::send`
  // - failed to finish in less than 60 seconds
  #[test]
  fn recv_try_iter() {
    let (request_tx, request_rx) = channel();
    let (response_tx, response_rx) = channel();

    // Request `x`s until we have `6`.
    let t = std::thread::spawn (move|| {
      let mut count = 0;
      loop {
        for x in response_rx.try_iter() {
          count += x;
          if count == 6 {
            return count;
          }
        }
        request_tx.send (true).unwrap();
      }
    });

    for _ in request_rx.iter() {
      if response_tx.send (2).is_err() {
        break;
      }
    }

    assert_eq!(t.join().unwrap(), 6);
  }

  #[test]
  fn recv_into_iter_owned() {
    let mut iter = {
      let (tx, rx) = channel::<i32>();
      tx.send (1).unwrap();
      tx.send (2).unwrap();

      rx.into_iter()
    };
    assert_eq!(iter.next().unwrap(), 1);
    assert_eq!(iter.next().unwrap(), 2);
    assert!(iter.next().is_none());
  }

  #[test]
  fn recv_into_iter_borrowed() {
    let (tx, rx) = channel::<i32>();
    tx.send (1).unwrap();
    tx.send (2).unwrap();
    drop (tx);
    let mut iter = (&rx).into_iter();
    assert_eq!(iter.next().unwrap(), 1);
    assert_eq!(iter.next().unwrap(), 2);
    assert!(iter.next().is_none());
  }

  // FIXME: test failed unwrap on RecvError
  #[test]
  fn try_recv_states() {
    let (tx1, rx1) = channel::<i32>();
    let (tx2, rx2) = channel::<bool>();
    let (tx3, rx3) = channel::<bool>();
    let _t = std::thread::spawn (move|| {
      rx2.recv().unwrap();
      tx1.send (1).unwrap();
      tx3.send (true).unwrap();
      rx2.recv().unwrap();
      drop (tx1);
      tx3.send (true).unwrap();
    });

    assert_eq!(rx1.try_recv(), Err (TryRecvError::Empty));
    tx2.send (true).unwrap();
    rx3.recv().unwrap();
    assert_eq!(rx1.try_recv(), Ok (1));
    assert_eq!(rx1.try_recv(), Err (TryRecvError::Empty));
    tx2.send (true).unwrap();
    rx3.recv().unwrap();
    assert_eq!(rx1.try_recv(), Err (TryRecvError::Disconnected));
  }

  #[test]
  fn issue_32114() {
    let (tx, _) = channel();
    let _ = tx.send (123);
    assert_eq!(tx.send (123), Err (SendError (123)));
  }

  #[test]
  fn zero_size() {
    let (tx, rx) = channel::<()>();
    tx.send (()).unwrap();
    let () = rx.recv().unwrap();
  }
}