two_lock_queue/
lib.rs

//! Multi-producer, multi-consumer FIFO queue communication primitive.
//!
//! This crate provides a multi-producer, multi-consumer, message-based
//! communication channel, concretely defined by two types:
//!
//! * `Sender`
//! * `Receiver`
//!
//! A `Sender` is used to send data to a `Receiver`. Both senders and receivers
//! are cloneable, so that sending and receiving can be done concurrently
//! across threads.
//!
//! # Disconnection
//!
//! The send and receive operations will all return a `Result` indicating
//! whether the operation succeeded or not. An unsuccessful operation is
//! normally indicative of the other half of the channel having "hung up" by
//! being dropped in its corresponding thread.
//!
//! Once half of a channel has been deallocated, most operations can no longer
//! continue to make progress, so `Err` will be returned.
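//!
//! For example, dropping the only `Receiver` closes the channel, so a
//! subsequent send fails and hands the value back in the error:
//!
//! ```rust
//! let (tx, rx) = two_lock_queue::channel::<u32>(1);
//! drop(rx);
//! assert!(tx.send(1).is_err());
//! ```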
//!
//! # Examples
//!
//! Simple usage:
//!
//! ```rust
//! use std::thread;
//!
//! let (tx, rx) = two_lock_queue::channel(1024);
//!
//! for i in 0..10 {
//!     let tx = tx.clone();
//!     thread::spawn(move || {
//!         tx.send(i).unwrap();
//!     });
//! }
//!
//! let mut threads = vec![];
//!
//! for _ in 0..10 {
//!     let rx = rx.clone();
//!     threads.push(thread::spawn(move || {
//!         let j = rx.recv().unwrap();
//!         assert!(0 <= j && j < 10);
//!     }));
//! }
//!
//! for th in threads {
//!     th.join().unwrap();
//! }
//! ```
//!
//! # Algorithm
//!
//! The algorithm is a variant of the Michael-Scott two lock queue found as part
//! of Java's LinkedBlockingQueue. The queue uses a mutex to guard the head
//! pointer and a mutex to guard the tail pointer. Most of the time, send and
//! receive operations will only need to lock a single mutex. An `AtomicUsize`
//! is used to track the number of elements in the queue as well as handle
//! coordination between the producer and consumer halves.

#![deny(warnings, missing_docs)]

pub use std::sync::mpsc::{SendError, TrySendError};
pub use std::sync::mpsc::{TryRecvError, RecvError, RecvTimeoutError};

use std::{ops, ptr, usize};
use std::sync::{Arc, Mutex, MutexGuard, Condvar};
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering};
use std::time::{Duration, Instant};

/// The sending-half of the channel.
///
/// Each instance of `Sender` can only be used in a single thread, but it can be
/// cloned to create new `Sender` instances for the same underlying channel, and
/// these new instances can be sent to other threads.
pub struct Sender<T> {
    inner: Arc<Inner<T>>,
}

/// The receiving-half of the channel.
pub struct Receiver<T> {
    inner: Arc<Inner<T>>,
}

// A variant of the "two lock queue" algorithm. The `tail` mutex gates entry to
// push elements, and has an associated condition for waiting pushes; similarly
// for the `head` mutex. The `len` field that they both rely on is maintained
// as an atomic to avoid needing to get both locks in most cases. Also, to
// minimize the need for pushes to acquire the `head` mutex and vice versa,
// cascading notifies are used. When a push notices that it has enabled at
// least one pop, it signals `not_empty`. That pop in turn signals others if
// more items have been entered since the signal. And symmetrically for pops
// signalling pushes.
//
// Visibility between producers and consumers is provided as follows:
//
// Whenever an element is enqueued, the `tail` lock is acquired and `len` is
// updated. A subsequent consumer guarantees visibility to the enqueued node by
// acquiring the `head` lock and then reading `n = len.load(Acquire)`; this
// gives visibility to the first `n` items.
struct Inner<T> {
    // Maximum number of elements the queue can contain at one time
    capacity: usize,

    // Current number of elements
    len: AtomicUsize,

    // `true` when the channel is currently open
    is_open: AtomicBool,

    // Lock held by take, poll, etc
    head: Mutex<NodePtr<T>>,

    // Wait queue for waiting takes
    not_empty: Condvar,

    // Number of senders in existence
    num_tx: AtomicUsize,

    // Lock held by put, offer, etc
    tail: Mutex<NodePtr<T>>,

    // Wait queue for waiting puts
    not_full: Condvar,

    // Number of receivers in existence
    num_rx: AtomicUsize,
}

/// Possible errors that `send_timeout` could encounter.
pub enum SendTimeoutError<T> {
    /// The channel's receiving half has become disconnected, and there will
    /// never be any more data received on this channel.
    Disconnected(T),
    /// The channel is currently full, and the receiver(s) have not yet
    /// disconnected.
    Timeout(T),
}

/// Creates a new channel of the requested capacity.
///
/// Returns the `Sender` and `Receiver` halves.
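///
/// A minimal round trip (values are received in FIFO order):
///
/// ```
/// let (tx, rx) = two_lock_queue::channel(2);
///
/// tx.send(1).unwrap();
/// tx.send(2).unwrap();
///
/// assert_eq!(rx.recv().unwrap(), 1);
/// assert_eq!(rx.recv().unwrap(), 2);
/// ```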
pub fn channel<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
    let inner = Inner::with_capacity(capacity);
    let inner = Arc::new(inner);

    let tx = Sender { inner: inner.clone() };
    let rx = Receiver { inner: inner };

    (tx, rx)
}

/// Creates a new channel without a capacity bound.
///
/// An alias for `channel(usize::MAX)`.
pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
    channel(usize::MAX)
}

// ===== impl Sender =====

impl<T> Sender<T> {
    /// Attempts to send a value on this channel, returning it back if it could not be sent.
    ///
    /// A successful send occurs when it is determined that the other end of the
    /// channel has not hung up already. An unsuccessful send would be one where
    /// the corresponding receiver has already been deallocated. Note that a
    /// return value of `Err` means that the data will never be received, but a
    /// return value of `Ok` does not mean that the data will be received. It is
    /// possible for the corresponding receiver to hang up immediately after
    /// this function returns `Ok`.
    ///
    /// This function will **block** the current thread until the channel has
    /// capacity to accept the value or there are no more receivers to accept
    /// the value.
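    ///
    /// A minimal sketch of a send/receive round trip:
    ///
    /// ```
    /// let (tx, rx) = two_lock_queue::channel(1);
    ///
    /// tx.send("hello").unwrap();
    /// assert_eq!(rx.recv().unwrap(), "hello");
    /// ```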
    pub fn send(&self, t: T) -> Result<(), SendError<T>> {
        self.inner.push(t)
    }

    /// Attempts to send a value on this channel, blocking for at most
    /// `timeout`.
    ///
    /// The function will always block the current thread if the channel has no
    /// available capacity for handling the message. The thread will be blocked
    /// for at most `timeout`, after which, if there is still no capacity,
    /// `SendTimeoutError::Timeout` will be returned.
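    ///
    /// A minimal sketch: the queue below has capacity 1, so the second send
    /// times out:
    ///
    /// ```
    /// use std::time::Duration;
    ///
    /// let (tx, _rx) = two_lock_queue::channel(1);
    ///
    /// tx.send_timeout(1, Duration::from_millis(50)).unwrap();
    /// assert!(tx.send_timeout(2, Duration::from_millis(50)).is_err());
    /// ```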
    pub fn send_timeout(&self, t: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> {
        self.inner.push_timeout(t, timeout)
    }

    /// Attempts to send a value on this channel without blocking.
    ///
    /// This method differs from `send` by returning immediately if the
    /// channel's buffer is full or the channel has been disconnected. Compared
    /// with `send`, this function has two failure cases instead of one (one for
    /// disconnection, one for a full buffer).
    ///
    /// See `Sender::send` for notes about guarantees of whether the receiver
    /// has received the data or not if this function is successful.
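    ///
    /// A minimal sketch of both failure cases:
    ///
    /// ```
    /// use two_lock_queue::TrySendError;
    ///
    /// let (tx, rx) = two_lock_queue::channel(1);
    ///
    /// assert!(tx.try_send(1).is_ok());
    /// // The buffer is full; the value comes back in the error.
    /// assert_eq!(tx.try_send(2).unwrap_err(), TrySendError::Full(2));
    ///
    /// // Dropping the only receiver closes the channel.
    /// drop(rx);
    /// assert_eq!(tx.try_send(3).unwrap_err(), TrySendError::Disconnected(3));
    /// ```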
    pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
        self.inner.try_push(t)
    }

    /// Returns the number of values currently buffered by the channel.
    pub fn len(&self) -> usize {
        self.inner.len()
    }

    /// Fully close the channel.
    ///
    /// This will force close the channel even if there are outstanding `Sender`
    /// and `Receiver` handles. Further operations on any outstanding handle
    /// will result in a disconnected error.
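    ///
    /// A minimal sketch:
    ///
    /// ```
    /// let (tx, rx) = two_lock_queue::channel(1);
    ///
    /// tx.close();
    ///
    /// assert!(!tx.is_open());
    /// assert!(tx.send(1).is_err());
    /// assert!(rx.recv().is_err());
    /// ```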
    pub fn close(&self) {
        self.inner.close();
    }

    /// Returns `true` if the channel is currently in an open state.
    pub fn is_open(&self) -> bool {
        self.inner.is_open.load(Ordering::SeqCst)
    }

    /// Returns the capacity of the queue.
    ///
    /// ```
    /// use two_lock_queue::{channel, unbounded};
    /// # use std::usize;
    ///
    /// let (tx, _) = channel(1024);
    /// assert_eq!(tx.capacity(), 1024);
    /// # tx.try_send(0).unwrap_err(); // needed for type inference
    ///
    /// let (tx, _) = unbounded();
    /// assert_eq!(tx.capacity(), usize::MAX);
    /// # tx.try_send(0).unwrap_err(); // needed for type inference
    /// ```
    pub fn capacity(&self) -> usize {
        self.inner.capacity
    }
}

impl<T> Clone for Sender<T> {
    fn clone(&self) -> Sender<T> {
        // Clone the inner handle before incrementing `num_tx`. Every `Sender`
        // holds an `Arc`, so `num_tx` can never exceed the `Arc`'s reference
        // count, and `Arc::clone`'s overflow check therefore covers `num_tx`
        // as well.
        let inner = self.inner.clone();

        // Increment `num_tx`
        self.inner.num_tx.fetch_add(1, Ordering::SeqCst);

        // Return the new sender
        Sender { inner: inner }
    }
}

impl<T> Drop for Sender<T> {
    fn drop(&mut self) {
        // The last `Sender` to go away closes the channel
        if 1 == self.inner.num_tx.fetch_sub(1, Ordering::SeqCst) {
            self.inner.close();
        }
    }
}

// ===== impl Receiver =====

impl<T> Receiver<T> {
    /// Attempts to wait for a value on this receiver, returning an error if the
    /// corresponding channel has hung up.
    ///
    /// This function will always block the current thread if there is no data
    /// available and it's possible for more data to be sent. Once a message is
    /// sent to the corresponding `Sender`, then this receiver will wake up and
    /// return that message.
    ///
    /// If the corresponding `Sender` has disconnected, or it disconnects while
    /// this call is blocking, this call will wake up and return `Err` to indicate
    /// that no more messages can ever be received on this channel. However,
    /// since channels are buffered, messages sent before the disconnect will
    /// still be properly received.
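    ///
    /// A minimal sketch of a cross-thread receive:
    ///
    /// ```
    /// use std::thread;
    ///
    /// let (tx, rx) = two_lock_queue::channel(1);
    ///
    /// thread::spawn(move || {
    ///     tx.send("ping").unwrap();
    /// });
    ///
    /// assert_eq!(rx.recv().unwrap(), "ping");
    /// ```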
    pub fn recv(&self) -> Result<T, RecvError> {
        self.inner.pop()
    }

    /// Attempts to wait for a value on this receiver, returning an error if the
    /// corresponding channel has hung up, or if it waits more than `timeout`.
    ///
    /// This function will always block the current thread if there is no data
    /// available and it's possible for more data to be sent. Once a message is
    /// sent to the corresponding `Sender`, then this receiver will wake up and
    /// return that message.
    ///
    /// If the corresponding `Sender` has disconnected, or it disconnects while
    /// this call is blocking, this call will wake up and return `Err` to
    /// indicate that no more messages can ever be received on this channel.
    /// However, since channels are buffered, messages sent before the
    /// disconnect will still be properly received.
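    ///
    /// A minimal sketch: nothing is ever sent, so the receive times out:
    ///
    /// ```
    /// use std::time::Duration;
    ///
    /// let (_tx, rx) = two_lock_queue::channel::<u32>(1);
    ///
    /// assert!(rx.recv_timeout(Duration::from_millis(50)).is_err());
    /// ```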
    pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
        self.inner.pop_timeout(timeout)
    }

    /// Attempts to return a pending value on this receiver without blocking.
    ///
    /// This method will never block the caller in order to wait for data to
    /// become available. Instead, it will always return immediately with any
    /// pending data on the channel.
    ///
    /// This is useful for a flavor of "optimistic check" before deciding to
    /// block on a receiver.
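    ///
    /// A minimal sketch of the optimistic check:
    ///
    /// ```
    /// use two_lock_queue::TryRecvError;
    ///
    /// let (tx, rx) = two_lock_queue::channel(1);
    ///
    /// assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
    ///
    /// tx.send(7).unwrap();
    /// assert_eq!(rx.try_recv(), Ok(7));
    /// ```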
    pub fn try_recv(&self) -> Result<T, TryRecvError> {
        self.inner.try_pop()
    }

    /// Returns the number of values currently buffered by the channel.
    pub fn len(&self) -> usize {
        self.inner.len()
    }

    /// Fully close the channel.
    ///
    /// This will force close the channel even if there are outstanding `Sender`
    /// and `Receiver` handles. Further operations on any outstanding handle
    /// will result in a disconnected error.
    pub fn close(&self) {
        self.inner.close();
    }

    /// Returns `true` if the channel is currently in an open state.
    pub fn is_open(&self) -> bool {
        self.inner.is_open.load(Ordering::SeqCst)
    }

    /// Returns the capacity of the queue.
    ///
    /// ```
    /// use two_lock_queue::{channel, unbounded};
    /// # use two_lock_queue::TryRecvError;
    /// # use std::usize;
    ///
    /// let (_, rx) = channel(1024);
    /// assert_eq!(rx.capacity(), 1024);
    /// # let _: Result<u8, TryRecvError> = rx.try_recv(); // needed for type inference
    ///
    /// let (_, rx) = unbounded();
    /// assert_eq!(rx.capacity(), usize::MAX);
    /// # let _: Result<u8, TryRecvError> = rx.try_recv(); // needed for type inference
    /// ```
    pub fn capacity(&self) -> usize {
        self.inner.capacity
    }
}

impl<T> Clone for Receiver<T> {
    fn clone(&self) -> Receiver<T> {
        // Clone the inner handle before incrementing `num_rx`. Every
        // `Receiver` holds an `Arc`, so `num_rx` can never exceed the `Arc`'s
        // reference count, and `Arc::clone`'s overflow check therefore covers
        // `num_rx` as well.
        let inner = self.inner.clone();

        // Increment `num_rx`
        self.inner.num_rx.fetch_add(1, Ordering::SeqCst);

        Receiver { inner: inner }
    }
}

impl<T> Drop for Receiver<T> {
    fn drop(&mut self) {
        // The last `Receiver` to go away closes the channel
        if 1 == self.inner.num_rx.fetch_sub(1, Ordering::SeqCst) {
            self.inner.close();
        }
    }
}

impl<T> Inner<T> {
    fn with_capacity(capacity: usize) -> Inner<T> {
        // `head` and `tail` start out pointing at the same dummy node
        let head = NodePtr::new(Box::new(Node::empty()));

        Inner {
            capacity: capacity,
            len: AtomicUsize::new(0),
            is_open: AtomicBool::new(true),
            head: Mutex::new(head),
            not_empty: Condvar::new(),
            num_tx: AtomicUsize::new(1),
            tail: Mutex::new(head),
            not_full: Condvar::new(),
            num_rx: AtomicUsize::new(1),
        }
    }

    fn len(&self) -> usize {
        self.len.load(Ordering::SeqCst)
    }

    fn push(&self, el: T) -> Result<(), SendError<T>> {
        let node = Box::new(Node::new(el));

        // Acquire the write lock
        let mut tail = self.tail.lock().expect("tail lock poisoned");

        while self.len.load(Ordering::Acquire) == self.capacity {
            // Always check this before sleeping
            if !self.is_open.load(Ordering::Relaxed) {
                return Err(SendError(node.into_inner()));
            }

            tail = self.not_full.wait(tail).expect("tail lock poisoned");
        }

        if !self.is_open.load(Ordering::Relaxed) {
            return Err(SendError(node.into_inner()));
        }

        self.enqueue(node, tail);
        Ok(())
    }

    fn try_push(&self, el: T) -> Result<(), TrySendError<T>> {
        let node = Box::new(Node::new(el));

        // Acquire the write lock
        let tail = self.tail.lock().expect("tail lock poisoned");

        if !self.is_open.load(Ordering::Relaxed) {
            return Err(TrySendError::Disconnected(node.into_inner()));
        }

        if self.len.load(Ordering::Acquire) == self.capacity {
            return Err(TrySendError::Full(node.into_inner()));
        }

        self.enqueue(node, tail);
        Ok(())
    }

    fn push_timeout(&self, el: T, dur: Duration) -> Result<(), SendTimeoutError<T>> {
        let node = Box::new(Node::new(el));

        // Acquire the write lock
        let mut tail = self.tail.lock().expect("tail lock poisoned");

        if self.len.load(Ordering::Acquire) == self.capacity {
            let mut now = Instant::now();
            let deadline = now + dur;

            loop {
                if now >= deadline {
                    return Err(SendTimeoutError::Timeout(node.into_inner()));
                }

                if !self.is_open.load(Ordering::Relaxed) {
                    return Err(SendTimeoutError::Disconnected(node.into_inner()));
                }

                tail = self.not_full
                    .wait_timeout(tail, deadline.duration_since(now))
                    .expect("tail lock poisoned")
                    .0;

                if self.len.load(Ordering::Acquire) != self.capacity {
                    break;
                }

                now = Instant::now();
            }
        }

        if !self.is_open.load(Ordering::Relaxed) {
            return Err(SendTimeoutError::Disconnected(node.into_inner()));
        }

        self.enqueue(node, tail);

        Ok(())
    }

    fn enqueue(&self, el: Box<Node<T>>, mut tail: MutexGuard<NodePtr<T>>) {
        let ptr = NodePtr::new(el);

        // Link the new node after the current tail, then advance the tail
        // pointer to it
        tail.next = ptr;
        *tail = ptr;

        // Increment the count
        let len = self.len.fetch_add(1, Ordering::Release);

        // Cascading notify: if spare capacity remains, wake another waiting
        // producer
        if len + 1 < self.capacity {
            self.not_full.notify_one();
        }

        drop(tail);

        // If the queue was previously empty, a consumer may be waiting
        if len == 0 {
            let _l = self.head.lock().expect("head lock poisoned");

            self.not_empty.notify_one();
        }
    }

    fn pop(&self) -> Result<T, RecvError> {
        // Acquire the read lock
        let mut head = self.head.lock().expect("head lock poisoned");

        while self.len.load(Ordering::Acquire) == 0 {
            // Ensure that there are still senders
            if !self.is_open.load(Ordering::Relaxed) {
                return Err(RecvError);
            }

            head = self.not_empty.wait(head).expect("head lock poisoned");
        }

        Ok(self.dequeue(head))
    }

    fn try_pop(&self) -> Result<T, TryRecvError> {
        // Acquire the read lock
        let head = self.head.lock().expect("head lock poisoned");

        if self.len.load(Ordering::Acquire) == 0 {
            if !self.is_open.load(Ordering::Relaxed) {
                return Err(TryRecvError::Disconnected);
            } else {
                return Err(TryRecvError::Empty);
            }
        }

        Ok(self.dequeue(head))
    }

    fn pop_timeout(&self, dur: Duration) -> Result<T, RecvTimeoutError> {
        // Acquire the read lock
        let mut head = self.head.lock().expect("head lock poisoned");

        if self.len.load(Ordering::Acquire) == 0 {
            let mut now = Instant::now();
            let deadline = now + dur;

            loop {
                if now >= deadline {
                    return Err(RecvTimeoutError::Timeout);
                }

                // Ensure that there are still senders
                if !self.is_open.load(Ordering::Relaxed) {
                    return Err(RecvTimeoutError::Disconnected);
                }

                head = self.not_empty
                    .wait_timeout(head, deadline.duration_since(now))
                    .expect("head lock poisoned")
                    .0;

                if self.len.load(Ordering::Acquire) != 0 {
                    break;
                }

                now = Instant::now();
            }
        }

        // At this point, we are guaranteed to be able to dequeue a value
        Ok(self.dequeue(head))
    }

    fn dequeue(&self, mut head: MutexGuard<NodePtr<T>>) -> T {
        let h = *head;
        let mut first = h.next;

        // Advance the head pointer; `first` becomes the new dummy node
        *head = first;

        let val = first.item.take().expect("item already consumed");
        let len = self.len.fetch_sub(1, Ordering::Release);

        // Cascading notify: if more items remain, wake another waiting
        // consumer
        if len > 1 {
            self.not_empty.notify_one();
        }

        // Release `head` before taking the `tail` lock below. Acquiring the
        // locks in the opposite order to `enqueue` could otherwise deadlock.
        drop(head);

        // Free the old dummy node
        h.free();

        // If the queue was previously full, a producer may be waiting
        if len == self.capacity {
            let _l = self.tail.lock().expect("tail lock poisoned");

            self.not_full.notify_one();
        }

        val
    }

    fn close(&self) {
        if self.is_open.swap(false, Ordering::SeqCst) {
            // Wake everyone so they can observe the closed state
            self.notify_tx();
            self.notify_rx();
        }
    }

    // Wakes consumers blocked in `pop`
    fn notify_tx(&self) {
        let _lock = self.head.lock().expect("head lock poisoned");
        self.not_empty.notify_all();
    }

    // Wakes producers blocked in `push`
    fn notify_rx(&self) {
        let _lock = self.tail.lock().expect("tail lock poisoned");
        self.not_full.notify_all();
    }
}

impl<T> Drop for Inner<T> {
    fn drop(&mut self) {
        // Drain all remaining items
        while let Ok(_) = self.try_pop() {
        }

        // `dequeue` always leaves one dummy node behind, so free it here to
        // avoid leaking the allocation
        let head = *self.head.lock().expect("head lock poisoned");
        head.free();
    }
}

struct Node<T> {
    next: NodePtr<T>,
    item: Option<T>,
}

impl<T> Node<T> {
    fn new(val: T) -> Node<T> {
        Node {
            next: NodePtr::null(),
            item: Some(val),
        }
    }

    fn empty() -> Node<T> {
        Node {
            next: NodePtr::null(),
            item: None,
        }
    }

    fn into_inner(self) -> T {
        self.item.unwrap()
    }
}

struct NodePtr<T> {
    ptr: *mut Node<T>,
}

impl<T> NodePtr<T> {
    fn new(node: Box<Node<T>>) -> NodePtr<T> {
        NodePtr { ptr: Box::into_raw(node) }
    }

    fn null() -> NodePtr<T> {
        NodePtr { ptr: ptr::null_mut() }
    }

    fn free(self) {
        // Reconstitute the `Box` so the node is dropped
        let _: Box<Node<T>> = unsafe { Box::from_raw(self.ptr) };
    }
}

impl<T> ops::Deref for NodePtr<T> {
    type Target = Node<T>;

    fn deref(&self) -> &Node<T> {
        unsafe { &*self.ptr }
    }
}

impl<T> ops::DerefMut for NodePtr<T> {
    fn deref_mut(&mut self) -> &mut Node<T> {
        unsafe { &mut *self.ptr }
    }
}

impl<T> Clone for NodePtr<T> {
    fn clone(&self) -> NodePtr<T> {
        NodePtr { ptr: self.ptr }
    }
}

impl<T> Copy for NodePtr<T> {}
unsafe impl<T> Send for NodePtr<T> where T: Send {}

#[cfg(test)]
mod test {
    use super::*;
    use std::thread;
    use std::time::{Duration, Instant};

    #[test]
    fn single_thread_send_recv() {
        let (tx, rx) = channel(1024);

        // Check the length
        assert_eq!(0, tx.len());
        assert_eq!(0, rx.len());

        // Send a value
        tx.send("hello").unwrap();

        // Check the length again
        assert_eq!(1, tx.len());
        assert_eq!(1, rx.len());

        // Receive the value
        assert_eq!("hello", rx.recv().unwrap());

        // Check the length
        assert_eq!(0, tx.len());
        assert_eq!(0, rx.len());

        // Try taking on an empty queue
        assert_eq!(TryRecvError::Empty, rx.try_recv().unwrap_err());
    }

    #[test]
    fn single_thread_send_timeout() {
        let (tx, _rx) = channel(1);

        tx.try_send("hello").unwrap();

        let now = Instant::now();
        let dur = Duration::from_millis(200);

        assert!(tx.send_timeout("world", dur).is_err());

        let act = now.elapsed();

        assert!(act >= dur);
        assert!(act < dur * 2);
    }

    #[test]
    fn single_thread_recv_timeout() {
        let (_tx, rx) = channel::<u32>(1024);

        let now = Instant::now();
        let dur = Duration::from_millis(200);

        assert!(rx.recv_timeout(dur).is_err());

        let act = now.elapsed();

        assert!(act >= dur);
        assert!(act < dur * 2);
    }

    #[test]
    fn single_consumer_single_producer() {
        let (tx, rx) = channel(1024);

        thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));

            for i in 0..10_000 {
                tx.send(i).unwrap();
            }
        });

        for i in 0..10_000 {
            assert_eq!(i, rx.recv().unwrap());
        }

        assert!(rx.recv().is_err());
    }

    #[test]
    fn single_consumer_multi_producer() {
        let (tx, rx) = channel(1024);

        for t in 0..10 {
            let tx = tx.clone();

            thread::spawn(move || {
                for i in 0..10_000 {
                    tx.send((t, i)).unwrap();
                }
            });
        }

        drop(tx);

        let mut vals = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0];

        for _ in 0..10 * 10_000 {
            let (t, v) = rx.recv().unwrap();
            assert_eq!(vals[t], v);
            vals[t] += 1;
        }

        assert!(rx.recv().is_err());

        for i in 0..10 {
            assert_eq!(vals[i], 10_000);
        }
    }

    #[test]
    fn multi_consumer_multi_producer() {
        let (tx, rx) = channel(1024);
        let (res_tx, res_rx) = channel(1024);

        const PER_PRODUCER: usize = 10_000;

        // Producers
        for t in 0..5 {
            let tx = tx.clone();

            thread::spawn(move || {
                for i in 1..PER_PRODUCER {
                    tx.send((t, i)).unwrap();

                    if i % 10 == 0 {
                        thread::yield_now();
                    }
                }
            });
        }

        drop(tx);

        // Consumers
        for _ in 0..5 {
            let rx = rx.clone();
            let res_tx = res_tx.clone();

            thread::spawn(move || {
                let mut vals = vec![];
                let mut per_producer = [0, 0, 0, 0, 0];

                loop {
                    let (t, v) = match rx.recv() {
                        Ok(v) => v,
                        _ => break,
                    };

                    assert!(per_producer[t] < v);
                    per_producer[t] = v;

                    vals.push((t, v));

                    if v % 10 == 0 {
                        thread::yield_now();
                    }
                }

                res_tx.send(vals).unwrap();
            });
        }

        drop(rx);
        drop(res_tx);

        let mut all_vals = vec![];

        for _ in 0..5 {
            let vals = res_rx.recv().unwrap();

            for &v in vals.iter() {
                all_vals.push(v);
            }
        }

        all_vals.sort();

        let mut per_producer = [1, 1, 1, 1, 1];

        for &(t, v) in all_vals.iter() {
            assert_eq!(per_producer[t], v);
            per_producer[t] += 1;
        }

        for &v in per_producer.iter() {
            assert_eq!(PER_PRODUCER, v);
        }
    }

    #[test]
    fn queue_with_capacity() {
        let (tx, rx) = channel(8);

        for i in 0..8 {
            assert!(tx.try_send(i).is_ok());
        }

        assert_eq!(TrySendError::Full(8), tx.try_send(8).unwrap_err());
        assert_eq!(0, rx.try_recv().unwrap());

        assert!(tx.try_send(8).is_ok());

        for i in 1..9 {
            assert_eq!(i, rx.try_recv().unwrap());
        }
    }

    #[test]
    fn multi_producer_at_capacity() {
        let (tx, rx) = channel(8);

        for _ in 0..8 {
            let tx = tx.clone();

            thread::spawn(move || {
                for i in 0..1_000 {
                    tx.send(i).unwrap();
                }
            });
        }

        drop(tx);

        for _ in 0..8 * 1_000 {
            rx.recv().unwrap();
        }

        rx.recv().unwrap_err();
    }

    #[test]
    fn test_tx_shutdown() {
        let (tx, rx) = channel(1024);

        {
            // Clone tx to keep a handle open
            let tx = tx.clone();
            thread::spawn(move || {
                tx.send("hello").unwrap();
                tx.close();
            });
        }

        assert_eq!("hello", rx.recv().unwrap());
        assert!(rx.recv().is_err());
        assert!(tx.send("goodbye").is_err());
    }

    #[test]
    fn test_rx_shutdown() {
        let (tx, rx) = channel(1024);

        {
            let tx = tx.clone();
            let rx = rx.clone();

            thread::spawn(move || {
                tx.send("hello").unwrap();
                rx.close();
            });
        }

        assert_eq!("hello", rx.recv().unwrap());
        assert!(rx.recv().is_err());
        assert!(tx.send("goodbye").is_err());
    }
969}