Skip to main content

unbounded_spsc/
lib.rs

1//! This library adapts the block-waiting `recv` mechanism from the Rust standard
2//! library to an unbounded SPSC channel backed by `spsc`.
3//!
4//! # Delivery guarantee
5//!
6//! A successful [`Sender::send`] (`Ok(())`) does not guarantee that the corresponding
7//! [`Receiver::recv`] will observe the message. If the receiver is dropped concurrently
8//! with a send, the in-flight value may be silently discarded (it is destructed
9//! properly when the underlying queue is dropped; it is not leaked). This is a
10//! deliberate weakening relative to the std-library stream-flavor channel from which
11//! this design was originally adapted.
12
13#![feature(negative_impls)]
14
15use bounded_spsc_queue as spsc;
16
17use std::sync::atomic::Ordering;
18
19mod blocking;
20mod select;
21
22const DISCONNECTED     : isize = isize::MIN;
23#[cfg(test)]
24const MAX_STEALS       : isize = 5;
25#[cfg(not(test))]
26const MAX_STEALS       : isize = 1 << 20;   // ~1 million
27const INITIAL_CAPACITY : usize = 128;
28
29pub struct Receiver <T> {
30  consumer    : std::cell::UnsafeCell <spsc::Consumer <T>>,
31  receive_new : std::sync::mpsc::Receiver <spsc::Consumer <T>>,
32  inner       : std::sync::Arc <Inner>,
33  steals      : std::cell::UnsafeCell <isize>
34}
35
36pub struct Sender <T> {
37  producer : std::cell::UnsafeCell <spsc::Producer <T>>,
38  send_new : std::sync::mpsc::Sender <spsc::Consumer <T>>,
39  inner    : std::sync::Arc <Inner>
40}
41
42struct Inner {
43  counter   : std::sync::atomic::AtomicIsize,
44  connected : std::sync::atomic::AtomicBool,
45  to_wake   : std::sync::atomic::AtomicPtr <blocking::Inner>
46}
47
48#[derive(Debug)]
49pub struct Iter <'a, T > {
50  rx : &'a Receiver <T>
51}
52
53#[derive(Debug)]
54pub struct TryIter <'a, T > {
55  rx : &'a Receiver <T>
56}
57
58#[derive(Debug)]
59pub struct IntoIter <T> {
60  rx : Receiver <T>
61}
62
63/// Sender disconnected, no further messages will ever be received.
64#[derive(Clone,Copy,Debug,Eq,PartialEq)]
65pub struct RecvError;
66
67/// Receiver disconnected, message will never be deliverable.
68#[derive(Clone,Copy,Eq,PartialEq)]
69pub struct SendError <T> (pub T);
70
71#[derive(Clone,Copy,Debug,Eq,PartialEq)]
72pub enum TryRecvError {
73  Empty,
74  Disconnected
75}
76
77#[derive(Clone,Copy,Debug,Eq,PartialEq)]
78pub enum RecvTimeoutError {
79  Timeout,
80  Disconnected
81}
82
83pub enum SelectionResult {
84  SelSuccess,
85  SelCanceled
86}
87
88impl <T> Receiver <T> {
89  /// Non-blocking receive, returns `Err(TryRecvError::Empty)` if buffer was empty; will
90  /// continue to receive pending messages from a disconnected channel until it is
91  /// empty, at which point further calls to this function will return
92  /// `Err(TryRecvError::Disconnected)`.
93  #[expect(clippy::missing_panics_doc)]
94  pub fn try_recv (&self) -> Result <T, TryRecvError> {
95    match unsafe { (*self.consumer.get()).try_pop() } {
96      Some (t) => unsafe {
97        if MAX_STEALS < *self.steals.get() {
98          match self.inner.counter.swap (0, Ordering::SeqCst) {
99            DISCONNECTED => {
100              self.inner.counter.store (DISCONNECTED, Ordering::SeqCst);
101            }
102            n => {
103              let m = std::cmp::min (n, *self.steals.get());
104              *self.steals.get() -= m;
105              self.bump (n - m);
106            }
107          }
108          // TODO: can this be changed to a debug assertion ?
109          assert!(0 <= *self.steals.get());
110        }
111        *self.steals.get() += 1;
112        Ok (t)
113      },
114      None => {
115        match self.receive_new.try_recv() {
116          Ok (new_consumer) => {
117            unsafe { *self.consumer.get() = new_consumer; }
118            self.try_recv()
119          },
120          Err (std::sync::mpsc::TryRecvError::Empty) => {
121            match self.inner.counter.load (Ordering::SeqCst) {
122              n if n != DISCONNECTED => Err (TryRecvError::Empty),
123              _ => {
124                match unsafe { (*self.consumer.get()).try_pop() } {
125                  Some (t) => Ok (t),
126                  None     => Err (TryRecvError::Disconnected)
127                }
128              }
129            }
130          },
131          Err (std::sync::mpsc::TryRecvError::Disconnected) => {
132            Err (TryRecvError::Disconnected)
133          }
134        }
135      }
136    }
137  }
138
139  /// Block waiting if no messages are pending in the buffer.
140  pub fn recv (&self) -> Result <T, RecvError> {
141    match self.try_recv() {
142      Err (TryRecvError::Empty) => {}
143      Err (TryRecvError::Disconnected) => return Err (RecvError),
144      Ok  (t) => return Ok (t)
145    }
146    let (wait_token, signal_token) = blocking::tokens();
147    if self.decrement (signal_token).is_ok() {
148      wait_token.wait();
149    }
150    match self.try_recv() {
151      Ok (t) => unsafe {
152        *self.steals.get() -= 1;
153        Ok (t)
154      },
155      Err (TryRecvError::Empty) => unreachable!(
156        "woken thread should have found pending message"),
157      Err (TryRecvError::Disconnected) => Err (RecvError)
158    }
159  }
160
161  pub fn recv_timeout (&self, timeout : std::time::Duration)
162    -> Result <T, RecvTimeoutError>
163  {
164    match self.try_recv() {
165      Ok  (t)                          => Ok (t),
166      Err (TryRecvError::Disconnected) => Err (RecvTimeoutError::Disconnected),
167      Err (TryRecvError::Empty)
168        => self.recv_max_until (std::time::Instant::now() + timeout)
169    }
170  }
171
172  #[expect(mismatched_lifetime_syntaxes)]
173  pub const fn iter (&self) -> Iter <T> {
174    Iter {
175      rx: self
176    }
177  }
178
179  #[expect(mismatched_lifetime_syntaxes)]
180  pub const fn try_iter (&self) -> TryIter <T> {
181    TryIter {
182      rx: self
183    }
184  }
185
186  pub fn capacity (&self) -> usize {
187    unsafe {
188      (*self.consumer.get()).capacity()
189    }
190  }
191
192  fn recv_max_until (&self, deadline : std::time::Instant)
193    -> Result <T, RecvTimeoutError>
194  {
195    loop {
196      match self.recv_deadline (deadline) {
197        result @ Err (RecvTimeoutError::Timeout) => {
198          if deadline <= std::time::Instant::now() {
199            return result
200          }
201        },
202        result => return result
203      }
204    }
205  }
206
207  /// This is the same as `recv` except with code for timeout.
208  fn recv_deadline (&self, deadline : std::time::Instant)
209    -> Result <T, RecvTimeoutError>
210  {
211    match self.try_recv() {
212      Err (TryRecvError::Empty) => {}
213      Err (TryRecvError::Disconnected)
214        => return Err (RecvTimeoutError::Disconnected),
215      Ok  (t) => return Ok (t)
216    }
217    let (wait_token, signal_token) = blocking::tokens();
218    if self.decrement (signal_token).is_ok() {
219      let timed_out = !wait_token.wait_max_until (deadline);
220      if timed_out {
221        // this boolean result is not used: `try_recv` is always called below
222        let _has_data = self.abort_selection_();
223      }
224    }
225    match self.try_recv() {
226      Ok (t) => unsafe {
227        *self.steals.get() -= 1;
228        Ok (t)
229      }
230      Err (TryRecvError::Empty)        => Err (RecvTimeoutError::Timeout),
231      Err (TryRecvError::Disconnected) => Err (RecvTimeoutError::Disconnected)
232    }
233  }
234
235  fn decrement (&self, token : std::sync::Arc <blocking::Inner>)
236    -> Result <(), std::sync::Arc <blocking::Inner>>
237  {
238    assert_eq!(self.inner.to_wake.load (Ordering::SeqCst), std::ptr::null_mut());
239    // Consume `token` and stash its raw pointer in `to_wake`, leaking one strong
240    // reference into the AtomicPtr. The leaked ref is reclaimed either by
241    // `Inner::take_to_wake` (when a sender wakes us) or by `Arc::from_raw` below (when
242    // we cancel before blocking).
243    //
244    // NOTE: We must use `Arc::into_raw` (not `Arc::as_ptr`) here. Using `as_ptr` would
245    // leave the strong-count unchanged; `token` would then be dropped at function exit,
246    // releasing the ref, while the dangling-looking pointer remained in `to_wake`. A
247    // subsequent `take_to_wake` -> `Arc::from_raw` would over-decrement the strong
248    // count and free the allocation while another `Arc` still pointed at it -- a
249    // use-after-free / double-free that manifests as `tcache_thread_shutdown: unaligned
250    // tcache chunk detected` under glibc.
251    let ptr = std::sync::Arc::into_raw (token).cast_mut();
252    self.inner.to_wake.store (ptr, Ordering::SeqCst);
253    let steals = unsafe { std::ptr::replace (self.steals.get(), 0) };
254    match self.inner.counter.fetch_sub (1 + steals, Ordering::SeqCst) {
255      DISCONNECTED => {
256        self.inner.counter.store (DISCONNECTED, Ordering::SeqCst);
257      }
258      n => {
259        assert!(0 <= n);
260        if n - steals <= 0 {
261          // Sender will reclaim the leaked ref via `take_to_wake`.
262          return Ok (())
263        }
264      }
265    }
266    self.inner.to_wake.store (std::ptr::null_mut(), Ordering::SeqCst);
267    // Cancelled before blocking: no sender will read `to_wake`, so we reclaim the
268    // leaked strong reference ourselves and return it. SAFETY: `ptr` was produced by
269    // `Arc::into_raw` above and no other code path observed it (we just swapped null
270    // back into `to_wake`).
271    Err (unsafe { std::sync::Arc::from_raw (ptr) })
272  }
273
274  /////////////////////////////////////////////////////////////////////////////
275  //  select functions
276  /////////////////////////////////////////////////////////////////////////////
277
278  fn can_recv_ (&self) -> bool {
279    0 < unsafe { (*self.consumer.get()).size() }
280  }
281
282  fn start_selection_ (&self, token : std::sync::Arc <blocking::Inner>)
283    -> SelectionResult
284  {
285    match self.decrement (token) {
286      Ok  (()) => SelectionResult::SelSuccess,
287      Err (_token) => {
288        // undo decrement above
289        let prev = self.bump (1);
290        assert!(prev == DISCONNECTED || 0 <= prev);
291        SelectionResult::SelCanceled
292      }
293    }
294  }
295
296  /// Returns true if receiver has data pending.
297  fn abort_selection_ (&self) -> bool {
298    let steals = 1;
299    let prev = self.bump (steals + 1);
300    if prev == DISCONNECTED {
301      assert_eq!(self.inner.to_wake.load (Ordering::SeqCst),
302        std::ptr::null_mut());
303      true
304    } else {
305      let cur = prev + steals + 1;
306      assert!(0 <= cur);
307      if prev < 0 {
308        drop (self.inner.take_to_wake());
309      } else {
310        while !self.inner.to_wake.load (Ordering::SeqCst).is_null() {
311          std::thread::yield_now();
312        }
313      }
314      unsafe {
315        assert_eq!(*self.steals.get(), 0);
316        *self.steals.get() = steals;
317      }
318      0 <= prev
319    }
320  }
321
322  fn bump (&self, amt : isize) -> isize {
323    match self.inner.counter.fetch_add (amt, Ordering::SeqCst) {
324      DISCONNECTED => {
325        self.inner.counter.store (DISCONNECTED, Ordering::SeqCst);
326        DISCONNECTED
327      }
328      n => n
329    }
330  }
331
332} // end impl Receiver
333
334impl <T> std::fmt::Debug for Receiver <T> {
335  fn fmt (&self, f : &mut std::fmt::Formatter) -> std::fmt::Result {
336    write!(f, "Receiver {{ .. }}")
337  }
338}
339
340impl <T> IntoIterator for Receiver <T> {
341  type Item = T;
342  type IntoIter = IntoIter <T>;
343  fn into_iter (self) -> IntoIter <T> {
344    IntoIter {
345      rx: self
346    }
347  }
348}
349
350impl <'a, T> IntoIterator for &'a Receiver <T> {
351  type Item = T;
352  type IntoIter = Iter <'a, T>;
353  fn into_iter (self) -> Iter <'a, T> {
354    self.iter()
355  }
356}
357
358impl <T> Drop for Receiver <T> {
359  fn drop (&mut self) {
360    // Gate future sends: once `connected` is observed `false`, `Sender::send` bails out
361    // before pushing, so after this point at most ONE in-flight `send` (whose
362    // `connected.load` already returned `true`) may still commit, after which no more
363    // pushes ever happen. The drain loop below relies on this bound.
364    self.inner.connected.store (false, Ordering::SeqCst);
365
366    // Drain remaining messages so that:
367    //   1. their destructors run *now*, while the receiver's `T` type is still in scope
368    //      (the bounded queues' `Buffer::drop` will of course also drop them, but only
369    //      when the last `Arc<Buffer<T>>` is released, which can be later); and
370    //   2. the channel `counter` can be reconciled with our local `steals` and CAS'd to
371    //      `DISCONNECTED`, which is what informs senders that the orphan path applies
372    //      (see `Sender::send`).
373    //
374    // Because this crate grows the channel by allocating a fresh `bounded-spsc-queue`
375    // and publishing the new `Consumer` via the `send_new` / `receive_new`
376    // `std::sync::mpsc` side channel, draining requires walking that side channel too:
377    // items pushed after a resize live in queues whose `Consumer` is still buffered in
378    // `receive_new`, not in `*self.consumer.get()`. Without this side-channel walk, the
379    // CAS below never observes `counter == steals` and the loop spins forever (this
380    // manifested as a hang in `tests::recv_try_iter` when run alongside other tests).
381    let mut steals = unsafe { *self.steals.get() };
382    // Bound on consecutive `yield_now`s with no observable progress. If the sender is
383    // parked on an unrelated channel rather than mid-send, the counter will never
384    // converge on `steals` here, but the orphan items (if any) are still safe --
385    // `Buffer::drop` will destruct them when the bounded queues' `Arc<Buffer<T>>`
386    // refcounts hit zero.
387    const MAX_IDLE_YIELDS : u32 = 32;
388    let mut idle_yields = 0u32;
389    while {
390      let count = self.inner.counter.compare_exchange (
391        steals, DISCONNECTED, Ordering::SeqCst, Ordering::SeqCst
392      ).unwrap_or_else (|i| i);
393      count != DISCONNECTED && count != steals
394    } {
395      // Drain the current consumer.
396      let mut drained_here = 0;
397      while let Some (_t) = unsafe { (*self.consumer.get()).try_pop() } {
398        steals += 1;
399        drained_here += 1;
400      }
401      // Try to pick up the next post-resize consumer from the side channel.
402      match self.receive_new.try_recv() {
403        Ok (new_consumer) => unsafe {
404          *self.consumer.get() = new_consumer;
405          idle_yields = 0;
406        }
407        Err (std::sync::mpsc::TryRecvError::Empty) => {
408          if drained_here != 0 {
409            idle_yields = 0;
410          } else {
411            idle_yields += 1;
412            if idle_yields >= MAX_IDLE_YIELDS {
413              // Give up: store DISCONNECTED unconditionally. The CAS in the loop
414              // condition would otherwise spin forever (sender parked on another
415              // channel, no further progress observable here).
416              self.inner.counter.store (DISCONNECTED, Ordering::SeqCst);
417              break;
418            }
419            std::thread::yield_now();
420          }
421        }
422        Err (std::sync::mpsc::TryRecvError::Disconnected) => {
423          // Sender is gone entirely; no further new consumers will arrive. Force
424          // DISCONNECTED state and let `Buffer::drop` clean up the rest.
425          self.inner.counter.store (DISCONNECTED, Ordering::SeqCst);
426          break;
427        }
428      }
429    }
430  }
431}
432
433impl <T> Sender <T> {
434  /// Non-blocking send.
435  ///
436  /// # Delivery guarantee
437  ///
438  /// Returning `Ok(())` indicates that the message was successfully enqueued, but it
439  /// does not guarantee that the receiver will observe it: if the [`Receiver`] is
440  /// dropped concurrently with this call, the message may be silently discarded. The
441  /// orphaned value is destructed properly when the underlying queue is dropped; it is
442  /// not leaked.
443  //
444  // This is a deliberate weakening relative to the std-library stream-flavor channel
445  // from which this design was originally adapted. The std version returned the orphan
446  // to the caller via `Err(SendError(t))`, but the underlying `bounded-spsc-queue` does
447  // not expose a safe way to recover an already-pushed value from the producer side,
448  // and the previous implementation that did so via `std::mem::transmute` was unsound
449  // (see issues #1 and #4).
450  #[expect(clippy::missing_panics_doc)]
451  pub fn send (&self, t : T) -> Result <(), SendError <T>> {
452    if self.inner.connected.load (Ordering::SeqCst) {
453      match unsafe { (*self.producer.get()).try_push (t) } {
454        None     => {}, // success
455        Some (t) => {   // queue full
456          let new_capacity = 2 * unsafe { (*self.producer.get()).capacity() };
457          let (new_producer, new_consumer) = spsc::make (new_capacity);
458          if self.send_new.send (new_consumer).is_err() {
459            // Receiver may have dropped since `connected.load()`, treat as disconnected
460            return Err (SendError (t));
461          }
462          unsafe { *self.producer.get() = new_producer; }
463          match unsafe { (*self.producer.get()).try_push (t) } {
464            None      => {}
465            Some (_t) => unreachable!(
466              "send on a newly created queue should always succeed")
467          }
468        }
469      }
470      // TODO: can we replace asserts with debug assertions ?
471      match self.inner.counter.fetch_add (1, Ordering::SeqCst) {
472        -1 => {
473          self.inner.take_to_wake().signal();
474        },
475        -2 => {},
476        DISCONNECTED => {
477          self.inner.counter.store (DISCONNECTED, Ordering::SeqCst);
478          // The receiver disconnected after we successfully pushed the message onto the
479          // bounded queue. The value is now orphaned, but it is *not* leaked: it will
480          // be properly dropped either by the receiver's drain loop in `<Receiver<T> as
481          // Drop>::drop`, or by `bounded_spsc_queue::Buffer::<T>::drop`, which pops and
482          // drops every remaining element when the last `Arc<Buffer<T>>` reference is
483          // released.
484          //
485          // Historical note: the previous implementation here transmuted the producer
486          // pointer into a `Consumer<T>` value in order to pop the orphan back and
487          // return it via `Err(SendError(t))`. That transmute was unsound (issues #1
488          // and #4): the transmuted "Consumer"'s internal `Arc<Buffer<T>>` field
489          // pointed into the `Sender` stack frame, not a real heap allocation, so
490          // `try_pop` performed an out-of-bounds read and the fake `Arc`'s `Drop` would
491          // call `dealloc` on a non-allocated address. Recovering the value safely is
492          // not possible without an upstream change to `bounded-spsc-queue` to expose a
493          // producer-side pop or a `Consumer<T>` recovery primitive. See the `send`
494          // doc-comment for the resulting (weakened) delivery guarantee.
495        },
496        n => {
497          assert! (0 <= n);
498        }
499      }
500      Ok (())
501    } else {
502      Err (SendError (t))
503    }
504  }
505} // end impl Sender
506
507impl <T> std::fmt::Debug for Sender <T> {
508  fn fmt (&self, f : &mut std::fmt::Formatter) -> std::fmt::Result {
509    write!(f, "Sender {{ .. }}")
510  }
511}
512
513impl <T> Drop for Sender <T> {
514  fn drop (&mut self) {
515    self.inner.connected.store (false, Ordering::SeqCst);
516    match self.inner.counter.swap (DISCONNECTED, Ordering::SeqCst) {
517      DISCONNECTED => {}
518      -1 => {
519        self.inner.take_to_wake().signal();
520      }
521      n  => {
522        assert!(0 <= n);
523      }
524    }
525  }
526}
527
528impl Inner {
529  fn take_to_wake (&self) -> std::sync::Arc <blocking::Inner> {
530    let ptr = self.to_wake.swap (std::ptr::null_mut(), Ordering::SeqCst);
531    assert!(!ptr.is_null());
532    unsafe {
533      std::sync::Arc::from_raw (ptr)
534    }
535  }
536}
537
538impl <T> Iterator for Iter <'_, T> {
539  type Item = T;
540  fn next (&mut self) -> Option <T> {
541    self.rx.recv().ok()
542  }
543}
544
545impl <T> Iterator for TryIter <'_, T> {
546  type Item = T;
547  fn next (&mut self) -> Option <T> {
548    self.rx.try_recv().ok()
549  }
550}
551
552impl <T> Iterator for IntoIter <T> {
553  type Item = T;
554  fn next (&mut self) -> Option <T> {
555    self.rx.recv().ok()
556  }
557}
558
559impl std::fmt::Display for RecvError {
560  fn fmt (&self, f : &mut std::fmt::Formatter) -> std::fmt::Result {
561    "receiving on a closed channel".fmt (f)
562  }
563}
564
565impl std::error::Error for RecvError {
566  fn description (&self) -> &'static str {
567    "receiving on a closed channel"
568  }
569
570  fn cause (&self) -> Option <&dyn std::error::Error> {
571    None
572  }
573}
574
575impl <T> std::fmt::Debug for SendError <T> {
576  fn fmt (&self, f : &mut std::fmt::Formatter) -> std::fmt::Result {
577    "SendError(..)".fmt (f)
578  }
579}
580
581impl <T> std::fmt::Display for SendError <T> {
582  fn fmt (&self, f : &mut std::fmt::Formatter) -> std::fmt::Result {
583    "sending on a closed channel".fmt (f)
584  }
585}
586
587impl <T : Send> std::error::Error for SendError <T> {
588  fn description (&self) -> &'static str {
589    "sending on a closed channel"
590  }
591
592  fn cause (&self) -> Option <&dyn std::error::Error> {
593    None
594  }
595}
596
597impl std::fmt::Display for TryRecvError {
598  fn fmt (&self, f : &mut std::fmt::Formatter) -> std::fmt::Result {
599    match *self {
600      TryRecvError::Empty        => "receiving on an empty channel".fmt (f),
601      TryRecvError::Disconnected => "receiving on a closed channel".fmt (f)
602    }
603  }
604}
605
606impl std::error::Error for TryRecvError {
607  fn description (&self) -> &str {
608    match *self {
609      TryRecvError::Empty        => "receiving on an empty channel",
610      TryRecvError::Disconnected => "receiving on a closed channel"
611    }
612  }
613
614  fn cause (&self) -> Option <&dyn std::error::Error> {
615    None
616  }
617}
618
619pub fn channel <T : 'static> () -> (Sender <T>, Receiver <T>) {
620  let (producer, consumer) = spsc::make (INITIAL_CAPACITY);
621  let (send_new, receive_new) = std::sync::mpsc::channel();
622  let inner = std::sync::Arc::new (
623    Inner {
624      counter:   std::sync::atomic::AtomicIsize::new (0),
625      connected: std::sync::atomic::AtomicBool::new (true),
626      to_wake:   std::sync::atomic::AtomicPtr::new (std::ptr::null_mut())
627    }
628  );
629  let sender    = Sender {
630    producer: std::cell::UnsafeCell::new (producer),
631    send_new,
632    inner: inner.clone()
633  };
634  let receiver  = Receiver {
635    consumer: std::cell::UnsafeCell::new (consumer),
636    receive_new,
637    steals: std::cell::UnsafeCell::new (0),
638    inner
639  };
640  (sender, receiver)
641}
642
643#[cfg(test)]
644mod tests {
645  use super::*;
646
647  pub(crate) fn stress_factor() -> usize {
648    match std::env::var ("RUST_TEST_STRESS") {
649      Ok  (val) => val.parse().unwrap(),
650      Err (..)  => 1,
651    }
652  }
653
654  #[test]
655  fn smoke() {
656    let (tx, rx) = channel::<i32>();
657    tx.send (1).unwrap();
658    assert_eq!(rx.recv().unwrap(), 1);
659  }
660
661  #[test]
662  fn drop_full() {
663    let (tx, _rx) = channel::<Box <isize>>();
664    tx.send(Box::new (1)).unwrap();
665  }
666
667  // TODO: test failed on an unwrap
668  #[test]
669  fn smoke_threads() {
670    let (tx, rx) = channel::<i32>();
671    let _t = std::thread::spawn (move|| {
672      println!("smoke threads sending...");
673      tx.send (1).unwrap();
674    });
675    println!("smoke threads receiving...");
676    assert_eq!(rx.recv().unwrap(), 1);
677  }
678
679  #[test]
680  fn smoke_port_gone() {
681    let (tx, rx) = channel::<i32>();
682    drop (rx);
683    assert!(tx.send (1).is_err());
684  }
685
686  #[test]
687  fn smoke_shared_port_gone() {
688    let (tx, rx) = channel::<i32>();
689    drop (rx);
690    assert!(tx.send (1).is_err())
691  }
692
693  #[test]
694  fn port_gone_concurrent() {
695    let (tx, rx) = channel::<i32>();
696    let _t = std::thread::spawn (move|| {
697      rx.recv().unwrap();
698    });
699    while tx.send (1).is_ok() {}
700  }
701
702  #[test]
703  fn smoke_chan_gone() {
704    let (tx, rx) = channel::<i32>();
705    drop (tx);
706    rx.recv().unwrap_err();
707  }
708
709  #[test]
710  fn chan_gone_concurrent() {
711    let (tx, rx) = channel::<i32>();
712    let _t = std::thread::spawn (move|| {
713      tx.send (1).unwrap();
714      tx.send (1).unwrap();
715    });
716    while rx.recv().is_ok() {}
717  }
718
719  #[test]
720  fn stress() {
721    let (tx, rx) = channel::<i32>();
722    let t = std::thread::spawn (move|| {
723      for _ in 0..10000 { tx.send (1).unwrap(); }
724    });
725    for _ in 0..10000 {
726      assert_eq!(rx.recv().unwrap(), 1);
727    }
728    t.join().ok().unwrap();
729  }
730
731  #[test]
732  fn send_from_outside_runtime() {
733    let (tx1, rx1) = channel::<bool>();
734    let (tx2, rx2) = channel::<i32>();
735    let t1 = std::thread::spawn (move|| {
736      tx1.send (true).unwrap();
737      for _ in 0..40 {
738        assert_eq!(rx2.recv().unwrap(), 1);
739      }
740    });
741    rx1.recv().unwrap();
742    let t2 = std::thread::spawn (move|| {
743      for _ in 0..40 {
744        tx2.send (1).unwrap();
745      }
746    });
747    t1.join().ok().unwrap();
748    t2.join().ok().unwrap();
749  }
750
751  #[test]
752  fn recv_from_outside_runtime() {
753    let (tx, rx) = channel::<i32>();
754    let t = std::thread::spawn (move|| {
755      for _ in 0..40 {
756        assert_eq!(rx.recv().unwrap(), 1);
757      }
758    });
759    for _ in 0..40 {
760      tx.send (1).unwrap();
761    }
762    t.join().ok().unwrap();
763  }
764
765  #[test]
766  fn no_runtime() {
767    let (tx1, rx1) = channel::<i32>();
768    let (tx2, rx2) = channel::<i32>();
769    let t1 = std::thread::spawn (move|| {
770      assert_eq!(rx1.recv().unwrap(), 1);
771      tx2.send (2).unwrap();
772    });
773    let t2 = std::thread::spawn (move|| {
774      tx1.send (1).unwrap();
775      assert_eq!(rx2.recv().unwrap(), 2);
776    });
777    t1.join().ok().unwrap();
778    t2.join().ok().unwrap();
779  }
780
781  #[test]
782  fn oneshot_single_thread_close_port_first() {
783    // Simple test of closing without sending
784    let (_tx, rx) = channel::<i32>();
785    drop (rx);
786  }
787
788  #[test]
789  fn oneshot_single_thread_close_chan_first() {
790    // Simple test of closing without sending
791    let (tx, _rx) = channel::<i32>();
792    drop (tx);
793  }
794
795  #[test]
796  fn oneshot_single_thread_send_port_close() {
797    // Testing that the sender cleans up the payload if receiver is closed
798    let (tx, rx) = channel::<Box <i32>>();
799    drop (rx);
800    assert!(tx.send (Box::new (0)).is_err());
801  }
802
803  #[test]
804  fn oneshot_single_thread_recv_chan_close() {
805    // Receiving on a closed chan will panic
806    let res = std::thread::spawn (move|| {
807      let (tx, rx) = channel::<i32>();
808      drop (tx);
809      rx.recv().unwrap();
810    }).join();
811    // What is our res?
812    assert!(res.is_err());
813  }
814
815  #[test]
816  fn oneshot_single_thread_send_then_recv() {
817    let (tx, rx) = channel::<Box <i32>>();
818    tx.send (Box::new (10)).unwrap();
819    assert!(*rx.recv().unwrap() == 10);
820  }
821
822  #[test]
823  fn oneshot_single_thread_try_send_open() {
824    let (tx, rx) = channel::<i32>();
825    tx.send (10).unwrap();
826    assert!(rx.recv().unwrap() == 10);
827  }
828
829  #[test]
830  fn oneshot_single_thread_try_send_closed() {
831    let (tx, rx) = channel::<i32>();
832    drop (rx);
833    assert!(tx.send (10).is_err());
834  }
835
836  #[test]
837  fn oneshot_single_thread_try_recv_open() {
838    let (tx, rx) = channel::<i32>();
839    tx.send (10).unwrap();
840    assert!(rx.recv() == Ok (10));
841  }
842
843  #[test]
844  fn oneshot_single_thread_try_recv_closed() {
845    let (tx, rx) = channel::<i32>();
846    drop (tx);
847    rx.recv().unwrap_err();
848  }
849
850  #[test]
851  fn oneshot_single_thread_peek_data() {
852    let (tx, rx) = channel::<i32>();
853    assert_eq!(rx.try_recv(), Err (TryRecvError::Empty));
854    tx.send (10).unwrap();
855    assert_eq!(rx.try_recv(), Ok (10));
856  }
857
858  #[test]
859  fn oneshot_single_thread_peek_close() {
860    let (tx, rx) = channel::<i32>();
861    drop (tx);
862    assert_eq!(rx.try_recv(), Err (TryRecvError::Disconnected));
863    assert_eq!(rx.try_recv(), Err (TryRecvError::Disconnected));
864  }
865
866  #[test]
867  fn oneshot_single_thread_peek_open() {
868    let (_tx, rx) = channel::<i32>();
869    assert_eq!(rx.try_recv(), Err (TryRecvError::Empty));
870  }
871
872  #[test]
873  fn oneshot_multi_task_recv_then_send () {
874    let (tx, rx) = channel::<Box <i32>>();
875    let _t = std::thread::spawn (move|| {
876      assert!(*rx.recv().unwrap() == 10);
877    });
878
879    tx.send (Box::new (10)).unwrap();
880  }
881
882  #[test]
883  fn oneshot_multi_task_recv_then_close() {
884    let (tx, rx) = channel::<Box <i32>>();
885    let _t = std::thread::spawn (move|| {
886      drop (tx);
887    });
888    let res = std::thread::spawn (move|| {
889      assert!(*rx.recv().unwrap() == 10);
890    }).join();
891    assert!(res.is_err());
892  }
893
894  #[test]
895  fn oneshot_multi_thread_close_stress() {
896    for _ in 0..stress_factor() {
897      let (tx, rx) = channel::<i32>();
898      let _t = std::thread::spawn (move|| {
899        drop (rx);
900      });
901      drop (tx);
902    }
903  }
904
905  #[test]
906  fn oneshot_multi_thread_send_close_stress() {
907    for _ in 0..stress_factor() {
908      let (tx, rx) = channel::<i32>();
909      let _t = std::thread::spawn (move|| {
910        drop (rx);
911      });
912      let _ = std::thread::spawn (move|| {
913        tx.send (1).unwrap();
914      }).join();
915    }
916  }
917
918  #[test]
919  fn oneshot_multi_thread_recv_close_stress() {
920    for _ in 0..stress_factor() {
921      let (tx, rx) = channel::<i32>();
922      std::thread::spawn (move|| {
923        let res = std::thread::spawn (move|| {
924          rx.recv().unwrap();
925        }).join();
926        assert!(res.is_err());
927      });
928      let _t = std::thread::spawn (move|| {
929        std::thread::spawn (move|| {
930          drop (tx);
931        });
932      });
933    }
934  }
935
936  #[test]
937  fn oneshot_multi_thread_send_recv_stress() {
938    for _ in 0..stress_factor() {
939      let (tx, rx) = channel::<Box <isize>>();
940      let _t = std::thread::spawn (move|| {
941        tx.send (Box::new (10)).unwrap();
942      });
943      assert!(*rx.recv().unwrap() == 10);
944    }
945  }
946
947  #[test]
948  fn stream_send_recv_stress() {
949    for _ in 0..stress_factor() {
950      let (tx, rx) = channel();
951
952      send (tx, 0);
953      recv (rx, 0);
954
955      fn send (tx: Sender<Box <i32>>, i: i32) {
956        if i == 10 { return }
957
958        std::thread::spawn (move|| {
959          tx.send (Box::new (i)).unwrap();
960          send (tx, i + 1);
961        });
962      }
963
964      fn recv (rx: Receiver<Box <i32>>, i: i32) {
965        if i == 10 { return }
966
967        std::thread::spawn (move|| {
968          assert!(*rx.recv().unwrap() == i);
969          recv (rx, i + 1);
970        });
971      }
972    }
973  }
974
975  #[test]
976  fn oneshot_single_thread_recv_timeout() {
977    let (tx, rx) = channel();
978    tx.send (true).unwrap();
979    assert_eq!(rx.recv_timeout (std::time::Duration::from_millis (1)), Ok (true));
980    assert_eq!(rx.recv_timeout (std::time::Duration::from_millis (1)),
981      Err (RecvTimeoutError::Timeout));
982    tx.send (true).unwrap();
983    assert_eq!(rx.recv_timeout (std::time::Duration::from_millis (1)), Ok (true));
984  }
985
986  #[test]
987  fn stress_recv_timeout_two_threads() {
988    let (tx, rx) = channel();
989    let stress = stress_factor() + 100;
990    let timeout = std::time::Duration::from_millis (100);
991
992    std::thread::spawn (move || {
993      for i in 0..stress {
994        if i % 2 == 0 {
995          std::thread::sleep (timeout * 2);
996        }
997        tx.send (1usize).unwrap();
998      }
999    });
1000
1001    let mut recv_count = 0;
1002    loop {
1003      match rx.recv_timeout (timeout) {
1004        Ok (n) => {
1005          assert_eq!(n, 1usize);
1006          recv_count += 1;
1007        }
1008        Err (RecvTimeoutError::Timeout) => { }
1009        Err (RecvTimeoutError::Disconnected) => break
1010      }
1011    }
1012
1013    assert_eq!(recv_count, stress);
1014  }
1015
1016  #[test]
1017  fn recv_a_lot() {
1018    // Regression test that we don't run out of stack in scheduler context
1019    let (tx, rx) = channel();
1020    for _ in 0..10000 { tx.send (true).unwrap(); }
1021    for _ in 0..10000 { rx.recv().unwrap(); }
1022  }
1023
1024  #[test]
1025  fn nested_recv_iter() {
1026    let (tx, rx) = channel::<i32>();
1027    let (total_tx, total_rx) = channel::<i32>();
1028
1029    let _t = std::thread::spawn (move|| {
1030      let mut acc = 0;
1031      for x in rx.iter() {
1032        acc += x;
1033      }
1034      total_tx.send (acc).unwrap();
1035    });
1036
1037    tx.send (3).unwrap();
1038    tx.send (1).unwrap();
1039    tx.send (2).unwrap();
1040    drop (tx);
1041    assert_eq!(total_rx.recv().unwrap(), 6);
1042  }
1043
1044  #[test]
1045  fn recv_iter_break() {
1046    let (tx, rx) = channel::<i32>();
1047    let (count_tx, count_rx) = channel();
1048
1049    let _t = std::thread::spawn (move|| {
1050      let mut count = 0;
1051      for x in rx.iter() {
1052        if count >= 3 {
1053          break;
1054        } else {
1055          count += x;
1056        }
1057      }
1058      count_tx.send (count).unwrap();
1059    });
1060
1061    tx.send (2).unwrap();
1062    tx.send (2).unwrap();
1063    tx.send (2).unwrap();
1064    let _ = tx.send (2);
1065    drop (tx);
1066    assert_eq!(count_rx.recv().unwrap(), 4);
1067  }
1068
1069  #[test]
1070  fn recv_try_iter() {
1071    let (request_tx, request_rx) = channel();
1072    let (response_tx, response_rx) = channel();
1073
1074    // Request `x`s until we have `6`.
1075    let t = std::thread::spawn (move|| {
1076      let mut count = 0;
1077      loop {
1078        for x in response_rx.try_iter() {
1079          count += x;
1080          if count == 6 {
1081            return count;
1082          }
1083        }
1084        println!("test recv try iter send request...");
1085        request_tx.send (true).unwrap();
1086      }
1087    });
1088
1089    for _ in request_rx.iter() {
1090      println!("test recv try iter send response...");
1091      if response_tx.send (2).is_err() {
1092        break;
1093      }
1094    }
1095
1096    println!("test recv try iter join...");
1097
1098    assert_eq!(t.join().unwrap(), 6);
1099  }
1100
1101  #[test]
1102  fn recv_into_iter_owned() {
1103    let mut iter = {
1104      let (tx, rx) = channel::<i32>();
1105      tx.send (1).unwrap();
1106      tx.send (2).unwrap();
1107
1108      rx.into_iter()
1109    };
1110    assert_eq!(iter.next().unwrap(), 1);
1111    assert_eq!(iter.next().unwrap(), 2);
1112    assert!(iter.next().is_none());
1113  }
1114
1115  #[test]
1116  fn recv_into_iter_borrowed() {
1117    let (tx, rx) = channel::<i32>();
1118    tx.send (1).unwrap();
1119    tx.send (2).unwrap();
1120    drop (tx);
1121    let mut iter = (&rx).into_iter();
1122    assert_eq!(iter.next().unwrap(), 1);
1123    assert_eq!(iter.next().unwrap(), 2);
1124    assert!(iter.next().is_none());
1125  }
1126
1127  // TODO: test failed unwrap on RecvError
1128  #[test]
1129  fn try_recv_states() {
1130    let (tx1, rx1) = channel::<i32>();
1131    let (tx2, rx2) = channel::<bool>();
1132    let (tx3, rx3) = channel::<bool>();
1133    let _t = std::thread::spawn (move|| {
1134      rx2.recv().unwrap();
1135      tx1.send (1).unwrap();
1136      tx3.send (true).unwrap();
1137      rx2.recv().unwrap();
1138      drop (tx1);
1139      tx3.send (true).unwrap();
1140    });
1141
1142    assert_eq!(rx1.try_recv(), Err (TryRecvError::Empty));
1143    tx2.send (true).unwrap();
1144    rx3.recv().unwrap();
1145    assert_eq!(rx1.try_recv(), Ok (1));
1146    assert_eq!(rx1.try_recv(), Err (TryRecvError::Empty));
1147    tx2.send (true).unwrap();
1148    rx3.recv().unwrap();
1149    assert_eq!(rx1.try_recv(), Err (TryRecvError::Disconnected));
1150  }
1151
1152  #[test]
1153  fn issue_32114() {
1154    let (tx, _) = channel();
1155    let _ = tx.send (123);
1156    assert_eq!(tx.send (123), Err (SendError (123)));
1157  }
1158
1159  #[test]
1160  fn zero_size() {
1161    let (tx, rx) = channel::<()>();
1162    tx.send (()).unwrap();
1163    let () = rx.recv().unwrap();
1164  }
1165
1166  #[test]
1167  fn race_disconnect_does_not_corrupt_sender_or_abort() {
1168    for _ in 0..200 {
1169      let (tx, rx) = channel::<Box<u64>>();
1170      let h = std::thread::spawn(move || {
1171        for _ in 0..10_000 {
1172          let _ = tx.send(Box::new(0xDEAD_BEEF));
1173        }
1174      });
1175      drop(rx);
1176      h.join().unwrap();
1177    }
1178  }
1179}