two_lock_queue/lib.rs
1//! Multi-producer, multi-consumer FIFO queue communication primitive.
2//!
3//! This crate provides a multi-producer, multi-consumer, message-based
4//! communication channel, concretely defined among two types:
5//!
6//! * `Sender`
7//! * `Receiver`
8//!
9//! A `Sender` is used to send data to a `Receiver`. Both senders and receivers
10//! are clone-able such that sending and receiving can be done concurrently
11//! across threads.
12//!
13//! # Disconnection
14//!
15//! The send and receive operations will all return a `Result` indicating
16//! whether the operation succeeded or not. An unsuccessful operation is
17//! normally indicative of the other half of the channel having "hung up" by
18//! being dropped in its corresponding thread.
19//!
20//! Once half of a channel has been deallocated, most operations can no longer
21//! continue to make progress, so `Err` will be returned.
22//!
23//! # Examples
24//!
25//! Simple usage:
26//!
27//! ```rust
28//! use std::thread;
29//!
30//! let (tx, rx) = two_lock_queue::channel(1024);
31//!
32//! for i in 0..10 {
33//! let tx = tx.clone();
34//! thread::spawn(move || {
35//! tx.send(i).unwrap();
36//! });
37//! }
38//!
39//! let mut threads = vec![];
40//!
41//! for _ in 0..10 {
42//! let rx = rx.clone();
43//! threads.push(thread::spawn(move || {
44//! let j = rx.recv().unwrap();
45//! assert!(0 <= j && j < 10);
46//! }));
47//! }
48//!
49//! for th in threads {
50//! th.join().unwrap();
51//! }
52//! ```
53//!
54//! # Algorithm
55//!
56//! The algorithm is a variant of the Michael-Scott two lock queue found as part
57//! of Java's LinkedBlockingQueue. The queue uses a mutex to guard the head
58//! pointer and a mutex to guard the tail pointer. Most of the time, send and
59//! receive operations will only need to lock a single mutex. An `AtomicUsize`
60//! is used to track the number of elements in the queue as well as handle
61//! coordination between the producer and consumer halves.
62
63#![deny(warnings, missing_docs)]
64
65pub use std::sync::mpsc::{SendError, TrySendError};
66pub use std::sync::mpsc::{TryRecvError, RecvError, RecvTimeoutError};
67
68use std::{mem, ops, ptr, usize};
69use std::sync::{Arc, Mutex, MutexGuard, Condvar};
70use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering};
71use std::time::{Duration, Instant};
72
73/// The sending-half of the channel.
74///
75/// Each instance of `Sender` can only be used in a single thread, but it can be
76/// cloned to create new `Sender` instances for the same underlying channel and
77/// these new instances can be sent to other threads.
78pub struct Sender<T> {
79 inner: Arc<Inner<T>>,
80}
81
82/// The receiving-half of the channel.
83pub struct Receiver<T> {
84 inner: Arc<Inner<T>>,
85}
86
87// A variant of the "two lock queue" algorithm. The `tail` mutex gates entry to
88// push elements, and has an associated condition for waiting pushes. Similarly
89// for the `head` mutex. The "len" field that they both rely on is maintained
90// as an atomic to avoid needing to get both locks in most cases. Also, to
91// minimize need for pushes to get `head` mutex and vice-versa, cascading
92// notifies are used. When a push notices that it has enabled at least one pop,
93// it signals `not_empty`. That pop in turn signals others if more items have
94// been entered since the signal. And symmetrically for pops signalling pushes.
95//
96// Visibility between producers and consumers is provided as follows:
97//
98// Whenever an element is enqueued, the `tail` lock is acquired and `len`
99// updated. A subsequent consumer guarantees visibility to the enqueued Node by
100// acquiring the `head` lock and then reading `n = len.load(Acquire)` this gives
101// visibility to the first n items.
102struct Inner<T> {
103 // Maximum number of elements the queue can contain at one time
104 capacity: usize,
105
106 // Current number of elements
107 len: AtomicUsize,
108
109 // `true` when the channel is currently open
110 is_open: AtomicBool,
111
112 // Lock held by take, poll, etc
113 head: Mutex<NodePtr<T>>,
114
115 // Wait queue for waiting takes
116 not_empty: Condvar,
117
118 // Number of senders in existence
119 num_tx: AtomicUsize,
120
121 // Lock held by put, offer, etc
122 tail: Mutex<NodePtr<T>>,
123
124 // Wait queue for waiting puts
125 not_full: Condvar,
126
127 // Number of receivers in existence
128 num_rx: AtomicUsize,
129}
130
131/// Possible errors that `send_timeout` could encounter.
132pub enum SendTimeoutError<T> {
133 /// The channel's receiving half has become disconnected, and there will
134 /// never be any more data received on this channel.
135 Disconnected(T),
136 /// The channel is currently full, and the receiver(s) have not yet
137 /// disconnected.
138 Timeout(T),
139}
140
141/// Creates a new channel of the requested capacity
142///
143/// Returns the `Sender` and `Receiver` halves.
144pub fn channel<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
145 let inner = Inner::with_capacity(capacity);
146 let inner = Arc::new(inner);
147
148 let tx = Sender { inner: inner.clone() };
149 let rx = Receiver { inner: inner };
150
151 (tx, rx)
152}
153
154/// Creates a new channel without a capacity bound.
155///
156/// An alias for `channel(usize::MAX)`
157pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
158 channel(usize::MAX)
159}
160
161// ===== impl Sender =====
162
163impl<T> Sender<T> {
164 /// Attempts to send a value on this channel, returning it back if it could not be sent.
165 ///
166 /// A successful send occurs when it is determined that the other end of the
167 /// channel has not hung up already. An unsuccessful send would be one where
168 /// the corresponding receiver has already been deallocated. Note that a
169 /// return value of Err means that the data will never be received, but a
170 /// return value of Ok does not mean that the data will be received. It is
171 /// possible for the corresponding receiver to hang up immediately after
172 /// this function returns Ok.
173 ///
174 /// This function will **block** the current thread until the channel has
175 /// capacity to accept the value or there are no more receivers to accept
176 /// the value.
177 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
178 self.inner.push(t)
179 }
180
181 /// Attempts to send a value on this channel, blocking for at most
182 /// `timeout`.
183 ///
184 /// The function will always block the current thread if the channel has no
185 /// available capacity for handling the message. The thread will be blocked
186 /// for at most `timeout`, after which, if there still is no capacity,
187 /// `SendTimeoutError::Timeout` will be returned.
188 pub fn send_timeout(&self, t: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> {
189 self.inner.push_timeout(t, timeout)
190 }
191
192 /// Attempts to send a value on this channel without blocking.
193 ///
194 /// This method differs from `send` by returning immediately if the channel's
195 /// buffer is full or no receiver is waiting to acquire some data. Compared
196 /// with `send`, this function has two failure cases instead of one (one for
197 /// disconnection, one for a full buffer).
198 ///
199 /// See `Sender::send` for notes about guarantees of whether the receiver
200 /// has received the data or not if this function is successful.
201 pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
202 self.inner.try_push(t)
203 }
204
205 /// Returns the number of values currently buffered by the channel
206 pub fn len(&self) -> usize {
207 self.inner.len()
208 }
209
210 /// Fully close the channel
211 ///
212 /// This will force close the channel even if there are outstanding `Sender`
213 /// and `Receiver` handles. Further operations on any outstanding handle
214 /// will result in a disconnected error.
215 pub fn close(&self) {
216 self.inner.close();
217 }
218
219 /// Returns `true` if the channel is currently in an open state
220 pub fn is_open(&self) -> bool {
221 self.inner.is_open.load(Ordering::SeqCst)
222 }
223
224 /// Returns the capacity of the queue
225 ///
226 /// ```
227 /// use two_lock_queue::{channel, unbounded};
228 /// # use std::usize;
229 ///
230 /// let (tx, _) = channel(1024);
231 /// assert_eq!(tx.capacity(), 1024);
232 /// # tx.try_send(0).unwrap_err();; // needed for type inference
233 ///
234 /// let (tx, _) = unbounded();
235 /// assert_eq!(tx.capacity(), usize::MAX);
236 /// # tx.try_send(0).unwrap_err(); // needed for type inference
237 /// ```
238 pub fn capacity(&self) -> usize {
239 self.inner.capacity
240 }
241}
242
243impl<T> Clone for Sender<T> {
244 fn clone(&self) -> Sender<T> {
245 // Attempt to clone the inner handle. Doing this before incrementing
246 // `num_tx` prevents having to check `num_tx` for overflow and instead
247 // rely on `Arc::clone` to prevent the overflow.
248
249 let inner = self.inner.clone();
250
251 // Increment `num_tx`
252 self.inner.num_tx.fetch_add(1, Ordering::SeqCst);
253
254 // Return the new sender
255 Sender { inner: inner }
256 }
257}
258
259impl<T> Drop for Sender<T> {
260 fn drop(&mut self) {
261 if 1 == self.inner.num_tx.fetch_sub(1, Ordering::SeqCst) {
262 self.inner.close();
263 }
264 }
265}
266
267// ===== impl Receiver =====
268
269impl<T> Receiver<T> {
270 /// Attempts to wait for a value on this receiver, returning an error if the
271 /// corresponding channel has hung up.
272 ///
273 /// This function will always block the current thread if there is no data
274 /// available and it's possible for more data to be sent. Once a message is
275 /// sent to the corresponding `Sender`, then this receiver will wake up and
276 /// return that message.
277 ///
278 /// If the corresponding Sender has disconnected, or it disconnects while
279 /// this call is blocking, this call will wake up and return `Err` to indicate
280 /// that no more messages can ever be received on this channel. However,
281 /// since channels are buffered, messages sent before the disconnect will
282 /// still be properly received.
283 pub fn recv(&self) -> Result<T, RecvError> {
284 self.inner.pop()
285 }
286
287 /// Attempts to wait for a value on this receiver, returning an error if the
288 /// corresponding channel has hung up, or if it waits more than timeout.
289 ///
290 /// This function will always block the current thread if there is no data
291 /// available and it's possible for more data to be sent. Once a message is
292 /// sent to the corresponding Sender, then this receiver will wake up and
293 /// return that message.
294 ///
295 /// If the corresponding `Sender` has disconnected, or it disconnects while
296 /// this call is blocking, this call will wake up and return `Err` to
297 /// indicate that no more messages can ever be received on this channel.
298 /// However, since channels are buffered, messages sent before the
299 /// disconnect will still be properly received.
300 pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
301 self.inner.pop_timeout(timeout)
302 }
303
304 /// Attempts to return a pending value on this receiver without blocking
305 ///
306 /// This method will never block the caller in order to wait for data to
307 /// become available. Instead, this will always return immediately with a
308 /// possible option of pending data on the channel.
309 ///
310 /// This is useful for a flavor of "optimistic check" before deciding to
311 /// block on a receiver.
312 pub fn try_recv(&self) -> Result<T, TryRecvError> {
313 self.inner.try_pop()
314 }
315
316 /// Returns the number of values currently buffered by the channel
317 pub fn len(&self) -> usize {
318 self.inner.len()
319 }
320
321 /// Fully close the channel
322 ///
323 /// This will force close the channel even if there are outstanding `Sender`
324 /// and `Receiver` handles. Further operations on any outstanding handle
325 /// will result in a disconnected error.
326 pub fn close(&self) {
327 self.inner.close();
328 }
329
330 /// Returns `true` if the channel is currently in an open state
331 pub fn is_open(&self) -> bool {
332 self.inner.is_open.load(Ordering::SeqCst)
333 }
334
335 /// Returns the capacity of the queue
336 ///
337 /// ```
338 /// use two_lock_queue::{channel, unbounded};
339 /// # use two_lock_queue::TryRecvError;
340 /// # use std::usize;
341 ///
342 /// let (_, rx) = channel(1024);
343 /// assert_eq!(rx.capacity(), 1024);
344 /// # let _: Result<u8, TryRecvError> = rx.try_recv(); // needed for type inference
345 ///
346 /// let (_, rx) = unbounded();
347 /// assert_eq!(rx.capacity(), usize::MAX);
348 /// # let _: Result<u8, TryRecvError> = rx.try_recv(); // needed for type inference
349 /// ```
350 pub fn capacity(&self) -> usize {
351 self.inner.capacity
352 }
353}
354
355impl<T> Clone for Receiver<T> {
356 fn clone(&self) -> Receiver<T> {
357 // Attempt to clone the inner handle. Doing this before incrementing
358 // `num_rx` prevents having to check `num_rx` for overflow and instead
359 // rely on `Arc::clone` to prevent the overflow.
360
361 let inner = self.inner.clone();
362
363 // Increment `num_rx`
364 self.inner.num_rx.fetch_add(1, Ordering::SeqCst);
365
366 Receiver { inner: inner }
367 }
368}
369
370impl<T> Drop for Receiver<T> {
371 fn drop(&mut self) {
372 if 1 == self.inner.num_rx.fetch_sub(1, Ordering::SeqCst) {
373 self.inner.close();
374 }
375 }
376}
377
378impl<T> Inner<T> {
379 fn with_capacity(capacity: usize) -> Inner<T> {
380 let head = NodePtr::new(Box::new(Node::empty()));
381
382 Inner {
383 capacity: capacity,
384 len: AtomicUsize::new(0),
385 is_open: AtomicBool::new(true),
386 head: Mutex::new(head),
387 not_empty: Condvar::new(),
388 num_tx: AtomicUsize::new(1),
389 tail: Mutex::new(head),
390 not_full: Condvar::new(),
391 num_rx: AtomicUsize::new(1),
392 }
393 }
394
395 fn len(&self) -> usize {
396 self.len.load(Ordering::SeqCst)
397 }
398
399 fn push(&self, el: T) -> Result<(), SendError<T>> {
400 let node = Box::new(Node::new(el));
401
402 // Acquire the write lock
403 let mut tail = self.tail.lock().ok().expect("something went wrong");
404
405 while self.len.load(Ordering::Acquire) == self.capacity {
406 // Always check this before sleeping
407 if !self.is_open.load(Ordering::Relaxed) {
408 return Err(SendError(node.into_inner()));
409 }
410
411 tail = self.not_full.wait(tail).ok().expect("something went wrong");
412 }
413
414 if !self.is_open.load(Ordering::Relaxed) {
415 return Err(SendError(node.into_inner()));
416 }
417
418 self.enqueue(node, tail);
419 Ok(())
420 }
421
422 fn try_push(&self, el: T) -> Result<(), TrySendError<T>> {
423 let node = Box::new(Node::new(el));
424
425 // Acquire the write lock
426 let tail = self.tail.lock().ok().expect("something went wrong");
427
428 if !self.is_open.load(Ordering::Relaxed) {
429 return Err(TrySendError::Disconnected(node.into_inner()));
430 }
431
432 if self.len.load(Ordering::Acquire) == self.capacity {
433 return Err(TrySendError::Full(node.into_inner()));
434 }
435
436 self.enqueue(node, tail);
437 Ok(())
438 }
439
440 fn push_timeout(&self, el: T, dur: Duration) -> Result<(), SendTimeoutError<T>> {
441 let node = Box::new(Node::new(el));
442
443 // Acquire the write lock
444 let mut tail = self.tail.lock().ok().expect("something went wrong");
445
446 if self.len.load(Ordering::Acquire) == self.capacity {
447 let mut now = Instant::now();
448 let deadline = now + dur;
449
450 loop {
451 if now >= deadline {
452 return Err(SendTimeoutError::Timeout(node.into_inner()));
453 }
454
455 if !self.is_open.load(Ordering::Relaxed) {
456 return Err(SendTimeoutError::Disconnected(node.into_inner()));
457 }
458
459 tail = self.not_full
460 .wait_timeout(tail, deadline.duration_since(now))
461 .ok()
462 .expect("something went wrong")
463 .0;
464
465 if self.len.load(Ordering::Acquire) != self.capacity {
466 break;
467 }
468
469 now = Instant::now();
470 }
471 }
472
473 if !self.is_open.load(Ordering::Relaxed) {
474 return Err(SendTimeoutError::Disconnected(node.into_inner()));
475 }
476
477 self.enqueue(node, tail);
478
479 Ok(())
480 }
481
482 fn enqueue(&self, el: Box<Node<T>>, mut tail: MutexGuard<NodePtr<T>>) {
483 let ptr = NodePtr::new(el);
484
485 tail.next = ptr;
486 *tail = ptr;
487
488 // Increment the count
489 let len = self.len.fetch_add(1, Ordering::Release);
490
491 if len + 1 < self.capacity {
492 self.not_full.notify_one();
493 }
494
495 drop(tail);
496
497 if len == 0 {
498 let _l = self.head
499 .lock()
500 .ok()
501 .expect("something went wrong");
502
503 self.not_empty.notify_one();
504 }
505 }
506
507 fn pop(&self) -> Result<T, RecvError> {
508 // Acquire the read lock
509 let mut head = self.head.lock().ok().expect("something went wrong");
510
511 while self.len.load(Ordering::Acquire) == 0 {
512 // Ensure that there are still senders
513 if !self.is_open.load(Ordering::Relaxed) {
514 return Err(RecvError);
515 }
516
517 head = self.not_empty.wait(head).ok().expect("something went wrong");
518 }
519
520 Ok(self.dequeue(head))
521 }
522
523 fn try_pop(&self) -> Result<T, TryRecvError> {
524 // Acquire the read lock
525 let head = self.head.lock().ok().expect("something went wrong");
526
527 if self.len.load(Ordering::Acquire) == 0 {
528 if !self.is_open.load(Ordering::Relaxed) {
529 return Err(TryRecvError::Disconnected);
530 } else {
531 return Err(TryRecvError::Empty);
532 }
533 }
534
535 Ok(self.dequeue(head))
536 }
537
538 fn pop_timeout(&self, dur: Duration) -> Result<T, RecvTimeoutError> {
539 // Acquire the read lock
540 let mut head = self.head.lock().ok().expect("something went wrong");
541
542 if self.len.load(Ordering::Acquire) == 0 {
543 let mut now = Instant::now();
544 let deadline = now + dur;
545
546 loop {
547 if now >= deadline {
548 return Err(RecvTimeoutError::Timeout);
549 }
550
551 // Ensure that there are still senders
552 if !self.is_open.load(Ordering::Relaxed) {
553 return Err(RecvTimeoutError::Disconnected);
554 }
555
556 head = self.not_empty
557 .wait_timeout(head, deadline.duration_since(now))
558 .ok()
559 .expect("something went wrong")
560 .0;
561
562 if self.len.load(Ordering::Acquire) != 0 {
563 break;
564 }
565
566 now = Instant::now();
567 }
568 }
569
570 // At this point, we are guaranteed to be able to dequeue a value
571 Ok(self.dequeue(head))
572 }
573
574 fn dequeue(&self, mut head: MutexGuard<NodePtr<T>>) -> T {
575 let h = *head;
576 let mut first = h.next;
577
578 *head = first;
579
580 let val = first.item.take().expect("item already consumed");
581 let len = self.len.fetch_sub(1, Ordering::Release);
582
583 if len > 1 {
584 self.not_empty.notify_one();
585 }
586
587 // Release the lock here so that acquire the write lock does not result
588 // in a deadlock
589 drop(head);
590
591 // Free memory
592 h.free();
593
594 if len == self.capacity {
595 let _l = self.tail
596 .lock()
597 .ok()
598 .expect("something went wrong");
599
600 self.not_full.notify_one();
601 }
602
603 val
604 }
605
606 fn close(&self) {
607 if self.is_open.swap(false, Ordering::SeqCst) {
608 self.notify_tx();
609 self.notify_rx();
610 }
611 }
612
613 fn notify_tx(&self) {
614 let _lock = self.head.lock().expect("something went wrong");
615 self.not_empty.notify_all();
616 }
617
618 fn notify_rx(&self) {
619 let _lock = self.tail.lock().expect("something went wrong");
620 self.not_full.notify_all();
621 }
622}
623
624impl<T> Drop for Inner<T> {
625 fn drop(&mut self) {
626 while let Ok(_) = self.try_pop() {
627 }
628 }
629}
630
631struct Node<T> {
632 next: NodePtr<T>,
633 item: Option<T>,
634}
635
636impl<T> Node<T> {
637 fn new(val: T) -> Node<T> {
638 Node {
639 next: NodePtr::null(),
640 item: Some(val),
641 }
642 }
643
644 fn empty() -> Node<T> {
645 Node {
646 next: NodePtr::null(),
647 item: None,
648 }
649 }
650
651 fn into_inner(self) -> T {
652 self.item.unwrap()
653 }
654}
655
656struct NodePtr<T> {
657 ptr: *mut Node<T>,
658}
659
660impl<T> NodePtr<T> {
661 fn new(node: Box<Node<T>>) -> NodePtr<T> {
662 NodePtr { ptr: unsafe { mem::transmute(node) } }
663 }
664
665 fn null() -> NodePtr<T> {
666 NodePtr { ptr: ptr::null_mut() }
667 }
668
669 fn free(self) {
670 let NodePtr { ptr } = self;
671 let _: Box<Node<T>> = unsafe { mem::transmute(ptr) };
672 }
673}
674
675impl<T> ops::Deref for NodePtr<T> {
676 type Target = Node<T>;
677
678 fn deref(&self) -> &Node<T> {
679 unsafe { mem::transmute(self.ptr) }
680 }
681}
682
683impl<T> ops::DerefMut for NodePtr<T> {
684 fn deref_mut(&mut self) -> &mut Node<T> {
685 unsafe { mem::transmute(self.ptr) }
686 }
687}
688
689impl<T> Clone for NodePtr<T> {
690 fn clone(&self) -> NodePtr<T> {
691 NodePtr { ptr: self.ptr }
692 }
693}
694
695impl<T> Copy for NodePtr<T> {}
696unsafe impl<T> Send for NodePtr<T> where T: Send {}
697
698#[cfg(test)]
699mod test {
700 use super::*;
701 use std::thread;
702 use std::time::{Duration, Instant};
703
704 #[test]
705 fn single_thread_send_recv() {
706 let (tx, rx) = channel(1024);
707
708 // Check the length
709 assert_eq!(0, tx.len());
710 assert_eq!(0, rx.len());
711
712 // Send a value
713 tx.send("hello").unwrap();
714
715 // Check the length again
716 assert_eq!(1, tx.len());
717 assert_eq!(1, rx.len());
718
719 // Receive the value
720 assert_eq!("hello", rx.recv().unwrap());
721
722 // Check the length
723 assert_eq!(0, tx.len());
724 assert_eq!(0, rx.len());
725
726 // Try taking on an empty queue
727 assert_eq!(TryRecvError::Empty, rx.try_recv().unwrap_err());
728 }
729
730 #[test]
731 fn single_thread_send_timeout() {
732 let (tx, _rx) = channel(1);
733
734 tx.try_send("hello").unwrap();
735
736 let now = Instant::now();
737 let dur = Duration::from_millis(200);
738
739 assert!(tx.send_timeout("world", dur).is_err());
740
741 let act = now.elapsed();
742
743 assert!(act >= dur);
744 assert!(act < dur * 2);
745 }
746
747 #[test]
748 fn single_thread_recv_timeout() {
749 let (_tx, rx) = channel::<u32>(1024);
750
751 let now = Instant::now();
752 let dur = Duration::from_millis(200);
753
754 assert!(rx.recv_timeout(dur).is_err());
755
756 let act = now.elapsed();
757
758 assert!(act >= dur);
759 assert!(act < dur * 2);
760 }
761
762 #[test]
763 fn single_consumer_single_producer() {
764 let (tx, rx) = channel(1024);
765
766 thread::spawn(move || {
767 thread::sleep(Duration::from_millis(10));
768
769 for i in 0..10_000 {
770 tx.send(i).unwrap();
771 }
772 });
773
774 for i in 0..10_000 {
775 assert_eq!(i, rx.recv().unwrap());
776 }
777
778 assert!(rx.recv().is_err());
779 }
780
781 #[test]
782 fn single_consumer_multi_producer() {
783 let (tx, rx) = channel(1024);
784
785 for t in 0..10 {
786 let tx = tx.clone();
787
788 thread::spawn(move || {
789 for i in 0..10_000 {
790 tx.send((t, i)).unwrap();
791 }
792 });
793 }
794
795 drop(tx);
796
797 let mut vals = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0];
798
799 for _ in 0..10 * 10_000 {
800 let (t, v) = rx.recv().unwrap();
801 assert_eq!(vals[t], v);
802 vals[t] += 1;
803 }
804
805 assert!(rx.recv().is_err());
806
807 for i in 0..10 {
808 assert_eq!(vals[i], 10_000);
809 }
810 }
811
812 #[test]
813 fn multi_consumer_multi_producer() {
814 let (tx, rx) = channel(1024);
815 let (res_tx, res_rx) = channel(1024);
816
817 const PER_PRODUCER: usize = 10_000;
818
819 // Producers
820 for t in 0..5 {
821 let tx = tx.clone();
822
823 thread::spawn(move || {
824 for i in 1..PER_PRODUCER {
825 tx.send((t, i)).unwrap();
826
827 if i % 10 == 0 {
828 thread::yield_now();
829 }
830 }
831 });
832 }
833
834 drop(tx);
835
836 // Consumers
837 for _ in 0..5 {
838 let rx = rx.clone();
839 let res_tx = res_tx.clone();
840
841 thread::spawn(move || {
842 let mut vals = vec![];
843 let mut per_producer = [0, 0, 0, 0, 0];
844
845 loop {
846 let (t, v) = match rx.recv() {
847 Ok(v) => v,
848 _ => break,
849 };
850
851 assert!(per_producer[t] < v);
852 per_producer[t] = v;
853
854 vals.push((t, v));
855
856 if v % 10 == 0 {
857 thread::yield_now();
858 }
859 }
860
861 res_tx.send(vals).unwrap();
862 });
863 }
864
865 drop(rx);
866 drop(res_tx);
867
868 let mut all_vals = vec![];
869
870 for _ in 0..5 {
871 let vals = res_rx.recv().unwrap();
872
873 for &v in vals.iter() {
874 all_vals.push(v);
875 }
876 }
877
878 all_vals.sort();
879
880 let mut per_producer = [1, 1, 1, 1, 1];
881
882 for &(t, v) in all_vals.iter() {
883 assert_eq!(per_producer[t], v);
884 per_producer[t] += 1;
885 }
886
887 for &v in per_producer.iter() {
888 assert_eq!(PER_PRODUCER, v);
889 }
890 }
891
892 #[test]
893 fn queue_with_capacity() {
894 let (tx, rx) = channel(8);
895
896 for i in 0..8 {
897 assert!(tx.try_send(i).is_ok());
898 }
899
900 assert_eq!(TrySendError::Full(8), tx.try_send(8).unwrap_err());
901 assert_eq!(0, rx.try_recv().unwrap());
902
903 assert!(tx.try_send(8).is_ok());
904
905 for i in 1..9 {
906 assert_eq!(i, rx.try_recv().unwrap());
907 }
908 }
909
910 #[test]
911 fn multi_producer_at_capacity() {
912 let (tx, rx) = channel(8);
913
914 for _ in 0..8 {
915 let tx = tx.clone();
916
917 thread::spawn(move || {
918 for i in 0..1_000 {
919 tx.send(i).unwrap();
920 }
921 });
922 }
923
924 drop(tx);
925
926 for _ in 0..8 * 1_000 {
927 rx.recv().unwrap();
928 }
929
930 rx.recv().unwrap_err();
931 }
932
933 #[test]
934 fn test_tx_shutdown() {
935 let (tx, rx) = channel(1024);
936
937 {
938 // Clone tx to keep a handle open
939 let tx = tx.clone();
940 thread::spawn(move || {
941 tx.send("hello").unwrap();
942 tx.close();
943 });
944 }
945
946 assert_eq!("hello", rx.recv().unwrap());
947 assert!(rx.recv().is_err());
948 assert!(tx.send("goodbye").is_err());
949 }
950
951 #[test]
952 fn test_rx_shutdown() {
953 let (tx, rx) = channel(1024);
954
955 {
956 let tx = tx.clone();
957 let rx = rx.clone();
958
959 thread::spawn(move || {
960 tx.send("hello").unwrap();
961 rx.close();
962 });
963 }
964
965 assert_eq!("hello", rx.recv().unwrap());
966 assert!(rx.recv().is_err());
967 assert!(tx.send("goodbye").is_err());
968 }
969}