futures_channel/mpsc/mod.rs
//! A multi-producer, single-consumer queue for sending values across
//! asynchronous tasks.
//!
//! As with the channels in `std`, channel creation provides
//! [`Receiver`](Receiver) and [`Sender`](Sender) handles.
//! [`Receiver`](Receiver) implements [`Stream`] and allows a task to read
//! values out of the channel. If there is no message to read from the channel,
//! the current task will be awoken when a new value is sent.
//! [`Sender`](Sender) implements the `Sink` trait and allows a task to send
//! messages into the channel. If the channel is at capacity, the send will be
//! rejected and the task will be awoken when additional capacity is available.
//! This process of delaying sends beyond a certain capacity is often referred
//! to as "backpressure".
//!
//! Unbounded channels (without backpressure) are also available using
//! the [`unbounded`](unbounded) function.
//!
//! # Disconnection
//!
//! When all [`Sender`](Sender)s have been dropped, it is no longer
//! possible to send values into the channel. This is considered the termination
//! event of the stream. As such, [`Receiver::poll_next`](Receiver::poll_next)
//! will return `Ok(Async::Ready(None))`.
//!
//! If the [`Receiver`](Receiver) handle is dropped, then messages can no longer
//! be read out of the channel. In this case, all further attempts to send will
//! result in an error.
//!
//! # Clean Shutdown
//!
//! If the [`Receiver`](Receiver) is simply dropped, then it is possible for
//! there to be messages still in the channel that will not be processed. As
//! such, it is usually desirable to perform a "clean" shutdown. To do this, the
//! receiver first calls `close`, which prevents any further messages from
//! being sent into the channel. Then, the receiver consumes the channel to
//! completion, at which point the receiver can be dropped.
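
/*
A minimal, std-only sketch of the clean shutdown sequence described in the
module docs above. `ToyChannel` and its methods are hypothetical stand-ins and
not this module's API: a `closed` flag rejects new sends while the receiver
drains what is already buffered.

```rust
use std::collections::VecDeque;

// Hypothetical single-threaded stand-in for a channel; not the real `Receiver`.
struct ToyChannel<T> {
    closed: bool,
    queue: VecDeque<T>,
}

impl<T> ToyChannel<T> {
    fn new() -> Self {
        ToyChannel { closed: false, queue: VecDeque::new() }
    }

    // Sending fails once the channel is closed; the message is handed back.
    fn send(&mut self, msg: T) -> Result<(), T> {
        if self.closed {
            return Err(msg);
        }
        self.queue.push_back(msg);
        Ok(())
    }

    // Step 1 of a clean shutdown: stop accepting new messages.
    fn close(&mut self) {
        self.closed = true;
    }

    // Step 2: drain everything that was buffered before the close.
    fn drain(&mut self) -> Vec<T> {
        self.queue.drain(..).collect()
    }
}

// Runs the shutdown sequence and returns the messages that still got processed.
fn clean_shutdown_demo() -> Vec<u32> {
    let mut chan = ToyChannel::new();
    chan.send(1).unwrap();
    chan.send(2).unwrap();
    chan.close();
    // After `close`, further sends are rejected and return the message.
    assert_eq!(chan.send(3), Err(3));
    chan.drain()
}
```

Dropping the toy channel without calling `drain` would discard the two
buffered messages, which is the situation a clean shutdown avoids.
*/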

// At the core, the channel uses an atomic FIFO queue for message passing. This
// queue is used as the primary coordination primitive. In order to enforce
// capacity limits and handle backpressure, a secondary FIFO queue is used to
// send wakers for blocked `Sender` tasks.
//
// The general idea is that the channel is created with a `buffer` size of `n`.
// The channel capacity is `n + num-senders`. Each sender gets one "guaranteed"
// slot to hold a message. This allows `Sender` to know for a fact that a send
// can be successfully started *before* beginning to do the actual work of
// sending the value. However, a `send` will not complete until the number of
// messages in the channel has dropped back down below the configured buffer
// size.
//
// Note that the implementation guarantees that the number of items that have
// finished sending into a channel without being received will not exceed the
// configured buffer size. There is no *strict* guarantee that the receiver
// will wake up a blocked `Sender` *immediately* when the buffer size drops
// below the configured limit. It will, however, almost always awaken a
// `Sender` when buffer space becomes available, and it is *guaranteed* that a
// `Sender` will be awoken by the time its most recently sent message is
// popped out of the channel by the `Receiver`.
//
// The steps for sending a message are roughly:
//
// 1) Increment the channel message count
// 2) If the channel is at capacity, push the task's waker onto the wait queue
// 3) Push the message onto the message queue
// 4) If a wakeup was queued, wait for it to occur
//
// The steps for receiving a message are roughly:
//
// 1) Pop a message from the message queue
// 2) Pop a task waker from the wait queue
// 3) Decrement the channel message count
//
// It's important for the operations on the lock-free structures to happen in
// reverse order between the sender and receiver. This makes the message
// queue the primary coordination structure and establishes the happens-before
// relationship required by the acquire / release semantics used by the queue
// structure.
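
/*
The send and receive steps listed above can be modelled, as a rough sketch,
with plain single-threaded data structures. `Model` and its fields are
hypothetical and exist only for illustration; the real channel uses lock-free
queues and an atomic counter instead.

```rust
use std::collections::VecDeque;

// Hypothetical single-threaded model of the channel bookkeeping.
struct Model {
    buffer: usize,
    num_messages: usize,
    messages: VecDeque<u32>,
    parked: VecDeque<usize>, // ids of senders waiting for a wakeup
}

impl Model {
    fn new(buffer: usize) -> Self {
        Model {
            buffer,
            num_messages: 0,
            messages: VecDeque::new(),
            parked: VecDeque::new(),
        }
    }

    // Sender side: count first, park if over the buffer, then push the
    // message. Returns `true` if the sender had to park.
    fn send(&mut self, sender_id: usize, msg: u32) -> bool {
        self.num_messages += 1;
        let park = self.num_messages > self.buffer;
        if park {
            self.parked.push_back(sender_id);
        }
        self.messages.push_back(msg);
        park
    }

    // Receiver side, in the reverse order: pop the message, wake one parked
    // sender, then decrement the count. Returns the message and the id of
    // the sender that was woken, if any.
    fn recv(&mut self) -> Option<(u32, Option<usize>)> {
        let msg = self.messages.pop_front()?;
        let woken = self.parked.pop_front();
        self.num_messages -= 1;
        Some((msg, woken))
    }
}
```

With `Model::new(1)`, the first `send` completes without parking, the second
one parks its sender, and the next `recv` wakes that sender again.
*/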

use std::fmt;
use std::error::Error;
use std::any::Any;
use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::atomic::Ordering::SeqCst;
use std::sync::{Arc, Mutex};
use std::thread;
use std::usize;

use futures_core::task::{self, AtomicWaker, Waker};
use futures_core::{Async, Poll, Stream};
use futures_core::never::Never;

use mpsc::queue::{Queue, PopResult};

mod queue;

/// The transmission end of a bounded mpsc channel.
///
/// This value is created by the [`channel`](channel) function.
#[derive(Debug)]
pub struct Sender<T> {
    // Channel state shared between the sender and receiver.
    inner: Arc<Inner<T>>,

    // Handle to the task that is blocked on this sender. This handle is sent
    // to the receiver half in order to be notified when the sender becomes
    // unblocked.
    sender_waker: Arc<Mutex<SenderWaker>>,

    // True if the sender might be blocked. This is an optimization to avoid
    // having to lock the mutex most of the time.
    maybe_blocked: bool,
}

/// The receiving end of a bounded mpsc channel.
///
/// This value is created by the [`channel`](channel) function.
#[derive(Debug)]
pub struct Receiver<T> {
    inner: Arc<Inner<T>>,
}

/// The error type for [`Sender`s](Sender) used as `Sink`s.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SendError {
    kind: SendErrorKind,
}

/// The error type returned from [`try_send`](Sender::try_send).
#[derive(Clone, PartialEq, Eq)]
pub struct TrySendError<T> {
    err: SendError,
    val: T,
}

#[derive(Clone, Debug, PartialEq, Eq)]
enum SendErrorKind {
    Full,
    Disconnected,
}

/// The error type returned from [`try_next`](Receiver::try_next).
pub struct TryRecvError {
    _inner: (),
}

impl fmt::Display for SendError {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        if self.is_full() {
            write!(fmt, "send failed because channel is full")
        } else {
            write!(fmt, "send failed because receiver is gone")
        }
    }
}

impl Error for SendError {
    fn description(&self) -> &str {
        if self.is_full() {
            "send failed because channel is full"
        } else {
            "send failed because receiver is gone"
        }
    }
}

impl SendError {
    /// Returns true if this error is a result of the channel being full.
    pub fn is_full(&self) -> bool {
        match self.kind {
            SendErrorKind::Full => true,
            _ => false,
        }
    }

    /// Returns true if this error is a result of the receiver being dropped.
    pub fn is_disconnected(&self) -> bool {
        match self.kind {
            SendErrorKind::Disconnected => true,
            _ => false,
        }
    }

    fn disconnected() -> Self {
        SendError { kind: SendErrorKind::Disconnected }
    }
}

impl<T> fmt::Debug for TrySendError<T> {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.debug_struct("TrySendError")
            .field("kind", &self.err.kind)
            .finish()
    }
}

impl<T> fmt::Display for TrySendError<T> {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        if self.is_full() {
            write!(fmt, "send failed because channel is full")
        } else {
            write!(fmt, "send failed because receiver is gone")
        }
    }
}

impl<T: Any> Error for TrySendError<T> {
    fn description(&self) -> &str {
        if self.is_full() {
            "send failed because channel is full"
        } else {
            "send failed because receiver is gone"
        }
    }
}

impl<T> TrySendError<T> {
    /// Returns true if this error is a result of the channel being full.
    pub fn is_full(&self) -> bool {
        self.err.is_full()
    }

    /// Returns true if this error is a result of the receiver being dropped.
    pub fn is_disconnected(&self) -> bool {
        self.err.is_disconnected()
    }

    /// Returns the message that was attempted to be sent but failed.
    pub fn into_inner(self) -> T {
        self.val
    }

    /// Drops the message and converts into a `SendError`.
    pub fn into_send_error(self) -> SendError {
        self.err
    }
}
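
/*
`TrySendError::into_inner` exists so that a rejected message is not lost. The
sketch below shows the same idea using `std::sync::mpsc` as an analogue; it
does not use this module's types, and `send_or_recover` is a hypothetical
helper name.

```rust
use std::sync::mpsc::{SyncSender, TrySendError};

// Tries to send `msg`. On failure, hands the message back to the caller
// together with a flag saying whether the channel was merely full (`true`)
// or disconnected (`false`).
fn send_or_recover(tx: &SyncSender<String>, msg: String) -> Result<(), (String, bool)> {
    match tx.try_send(msg) {
        Ok(()) => Ok(()),
        Err(TrySendError::Full(msg)) => Err((msg, true)),
        Err(TrySendError::Disconnected(msg)) => Err((msg, false)),
    }
}
```

A caller that gets the message back after a "full" error can retry later,
while a "disconnected" error means the message can never be delivered.
*/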

impl fmt::Debug for TryRecvError {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.debug_tuple("TryRecvError")
            .finish()
    }
}

impl fmt::Display for TryRecvError {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.write_str(self.description())
    }
}

impl Error for TryRecvError {
    fn description(&self) -> &str {
        "receiver channel is empty"
    }
}

#[derive(Debug)]
struct Inner<T> {
    // Max buffer size of the channel. If `None` then the channel is unbounded.
    buffer: Option<usize>,

    // Internal channel state. Consists of the number of messages stored in the
    // channel as well as a flag signalling that the channel is closed.
    state: AtomicUsize,

    // Atomic, FIFO queue used to send messages to the receiver
    message_queue: Queue<Option<T>>,

    // Atomic, FIFO queue used to send parked task handles to the receiver.
    parked_queue: Queue<Arc<Mutex<SenderWaker>>>,

    // Number of senders in existence
    num_senders: AtomicUsize,

    // Waker for the receiver's task.
    recv_waker: AtomicWaker,
}

// Struct representation of `Inner::state`.
#[derive(Debug, Clone, Copy)]
struct State {
    // `true` when the channel is open
    is_open: bool,

    // Number of messages in the channel
    num_messages: usize,
}

// The `is_open` flag is stored in the left-most bit of `Inner::state`
const OPEN_MASK: usize = usize::MAX - (usize::MAX >> 1);

// When a new channel is created, it is created in the open state with no
// pending messages.
const INIT_STATE: usize = OPEN_MASK;

// The maximum number of messages that a channel can track is `usize::MAX >> 1`
const MAX_CAPACITY: usize = !(OPEN_MASK);

// The maximum requested buffer size must be less than the maximum capacity of
// a channel. This is because each sender gets a guaranteed slot.
const MAX_BUFFER: usize = MAX_CAPACITY >> 1;

// Sent to the consumer to wake up blocked producers
#[derive(Debug)]
struct SenderWaker {
    waker: Option<Waker>,
    is_blocked: bool,
}

impl SenderWaker {
    fn new() -> Self {
        SenderWaker {
            waker: None,
            is_blocked: false,
        }
    }

    fn wake(&mut self) {
        self.is_blocked = false;

        if let Some(waker) = self.waker.take() {
            waker.wake();
        }
    }
}

/// Creates a bounded mpsc channel for communicating between asynchronous tasks.
///
/// Being bounded, this channel provides backpressure to ensure that the sender
/// outpaces the receiver by only a limited amount. The channel's capacity is
/// equal to `buffer + num-senders`. In other words, each sender gets a
/// guaranteed slot in the channel capacity, and on top of that there are
/// `buffer` "first come, first served" slots available to all senders.
///
/// The [`Receiver`](Receiver) returned implements the
/// [`Stream`] trait, while [`Sender`](Sender) implements
/// `Sink`.
pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
    // Check that the requested buffer size does not exceed the maximum buffer
    // size permitted by the system.
    assert!(buffer < MAX_BUFFER, "requested buffer size too large");
    channel2(Some(buffer))
}

fn channel2<T>(buffer: Option<usize>) -> (Sender<T>, Receiver<T>) {
    let inner = Arc::new(Inner {
        buffer: buffer,
        state: AtomicUsize::new(INIT_STATE),
        message_queue: Queue::new(),
        parked_queue: Queue::new(),
        num_senders: AtomicUsize::new(1),
        recv_waker: AtomicWaker::new(),
    });

    let tx = Sender {
        inner: inner.clone(),
        sender_waker: Arc::new(Mutex::new(SenderWaker::new())),
        maybe_blocked: false,
    };

    let rx = Receiver {
        inner: inner,
    };

    (tx, rx)
}

/*
 *
 * ===== impl Sender =====
 *
 */

impl<T> Sender<T> {
    /// Attempts to send a message on this `Sender`, returning the message
    /// if there was an error.
    pub fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
        // If the sender is currently blocked, reject the message
        if !self.poll_unparked(None).is_ready() {
            return Err(TrySendError {
                err: SendError {
                    kind: SendErrorKind::Full,
                },
                val: msg,
            });
        }

        // The channel has capacity to accept the message, so send it
        self.do_send(None, msg)
    }

    /// Send a message on the channel.
    ///
    /// This function should only be called after
    /// [`poll_ready`](Sender::poll_ready) has reported that the channel is
    /// ready to receive a message.
    pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
        self.try_send(msg)
            .map_err(|e| e.err)
    }

    // Do the send without failing because of capacity. `cx` is `None` when
    // called from `try_send`, in which case no waker is stored if the sender
    // has to park.
    fn do_send(&mut self, cx: Option<&mut task::Context>, msg: T)
        -> Result<(), TrySendError<T>>
    {
        // Anyone calling do_send *should* make sure there is room first,
        // but assert here for tests as a sanity check.
        debug_assert!(self.poll_unparked(None).is_ready());

        // First, increment the number of messages contained by the channel.
        // This operation will also atomically determine if the sender task
        // should be parked.
        //
        // None is returned in the case that the channel has been closed by the
        // receiver. This happens when `Receiver::close` is called or the
        // receiver is dropped.
        let park_self = match self.inc_num_messages(false) {
            Some(park_self) => park_self,
            None => return Err(TrySendError {
                err: SendError {
                    kind: SendErrorKind::Disconnected,
                },
                val: msg,
            }),
        };

        // If the channel has reached capacity, then the sender task needs to
        // be parked. This will send the task handle on the parked task queue.
        //
        // However, when `do_send` is called while dropping the `Sender`,
        // `task::current()` can't be called safely. In this case, in order to
        // maintain internal consistency, a blank message is pushed onto the
        // parked task queue.
        if park_self {
            self.park(cx);
        }

        self.push_msg_and_wake_receiver(Some(msg));

        Ok(())
    }

    // Do the send without parking the current task. A `None` message signals
    // the end of the stream and closes the channel.
    fn do_send_nb(&self, msg: Option<T>) -> Result<(), TrySendError<T>> {
        match self.inc_num_messages(msg.is_none()) {
            Some(park_self) => assert!(!park_self),
            None => {
                // The receiver has closed the channel. Only abort if actually
                // sending a message. It is important that the stream
                // termination (None) is always sent. This technically means
                // that it is possible for the queue to contain the following
                // number of messages:
                //
                //     num-senders + buffer + 1
                //
                if let Some(msg) = msg {
                    return Err(TrySendError {
                        err: SendError {
                            kind: SendErrorKind::Disconnected,
                        },
                        val: msg,
                    });
                } else {
                    return Ok(());
                }
            },
        };

        self.push_msg_and_wake_receiver(msg);

        Ok(())
    }

    // Push message to the queue and signal to the receiver
    fn push_msg_and_wake_receiver(&self, msg: Option<T>) {
        // Push the message onto the message queue
        self.inner.message_queue.push(msg);

        // Awaken the receiver task if it was blocked.
        self.inner.recv_waker.wake();
    }

    // Increment the number of queued messages. Returns whether the sender
    // should block, or `None` if the channel has been closed.
    fn inc_num_messages(&self, close: bool) -> Option<bool> {
        let mut curr = self.inner.state.load(SeqCst);

        loop {
            let mut state = decode_state(curr);

            // The receiver end closed the channel.
            if !state.is_open {
                return None;
            }

            // This probably is never hit? Odds are the process will run out of
            // memory first. It may be worth returning something else in this
            // case?
            assert!(state.num_messages < MAX_CAPACITY, "buffer space exhausted; \
                    sending this message would overflow the state");

            state.num_messages += 1;

            // The channel is closed by all sender handles being dropped.
            if close {
                state.is_open = false;
            }

            let next = encode_state(&state);
            match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
                Ok(_) => {
                    // Block if the current number of pending messages has exceeded
                    // the configured buffer size
                    let park_self = !close && match self.inner.buffer {
                        Some(buffer) => state.num_messages > buffer,
                        None => false,
                    };

                    return Some(park_self)
                }
                Err(actual) => curr = actual,
            }
        }
    }

    fn park(&mut self, cx: Option<&mut task::Context>) {
        // TODO: clean up internal state if the task::current will fail

        let waker = cx.map(|cx| cx.waker().clone());

        {
            let mut sender = self.sender_waker.lock().unwrap();
            sender.waker = waker;
            sender.is_blocked = true;
        }

        // Send handle over queue
        let t = self.sender_waker.clone();
        self.inner.parked_queue.push(t);

        // Check to make sure we weren't closed after we sent our task on the
        // queue
        self.maybe_blocked = !self.is_closed();
    }

    /// Polls the channel to determine if there is guaranteed capacity to send
    /// at least one item without waiting.
    ///
    /// # Return value
    ///
    /// This method returns:
    ///
    /// - `Ok(Async::Ready(_))` if there is sufficient capacity;
    /// - `Ok(Async::Pending)` if the channel may not have capacity, in which
    ///   case the current task is queued to be notified once capacity is
    ///   available;
    /// - `Err(SendError)` if the receiver has been dropped.
    pub fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), SendError> {
        let state = decode_state(self.inner.state.load(SeqCst));
        if !state.is_open {
            return Err(SendError {
                kind: SendErrorKind::Disconnected,
            });
        }

        Ok(self.poll_unparked(Some(cx)))
    }

    /// Returns whether this channel is closed without needing a context.
    pub fn is_closed(&self) -> bool {
        !decode_state(self.inner.state.load(SeqCst)).is_open
    }

    /// Closes this channel from the sender side, preventing any new messages.
    pub fn close_channel(&mut self) {
        // There's no need to park this sender, since it's closing the
        // channel, and we don't want to check for capacity, so skip
        // that stuff from `do_send`.

        let _ = self.do_send_nb(None);
    }

    fn poll_unparked(&mut self, cx: Option<&mut task::Context>) -> Async<()> {
        // First check the `maybe_blocked` variable. This avoids acquiring the
        // lock in most cases
        if self.maybe_blocked {
            // Get a lock on the task handle
            let mut sender_waker = self.sender_waker.lock().unwrap();

            if !sender_waker.is_blocked {
                self.maybe_blocked = false;
                return Async::Ready(())
            }

            // At this point, a wake request is pending, so there will be a
            // wake sometime in the future. We just need to make sure that
            // the correct task will be notified.
            //
            // Update the waker in case the `Sender` has been moved to another
            // task
            sender_waker.waker = cx.map(|cx| cx.waker().clone());

            Async::Pending
        } else {
            Async::Ready(())
        }
    }
}
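
/*
The `maybe_blocked` flag used by `poll_unparked` above is a local cache that
lets the common path skip the mutex. A minimal sketch of that pattern follows;
`BlockState`, `CachedHandle`, and `lock_count` are hypothetical names used
only for illustration.

```rust
use std::sync::{Arc, Mutex};

// Hypothetical shared flag, standing in for `SenderWaker::is_blocked`.
struct BlockState {
    is_blocked: bool,
}

// Hypothetical handle that caches "might be blocked" locally.
struct CachedHandle {
    shared: Arc<Mutex<BlockState>>,
    maybe_blocked: bool,
    lock_count: usize, // how often the mutex was taken, for illustration
}

impl CachedHandle {
    // Returns `true` if the handle is ready, i.e. not blocked.
    fn is_ready(&mut self) -> bool {
        // Fast path: nothing marked us as blocked, so skip the lock.
        if !self.maybe_blocked {
            return true;
        }
        self.lock_count += 1;
        let shared = self.shared.lock().unwrap();
        if !shared.is_blocked {
            // The other side cleared the flag; remember that locally.
            self.maybe_blocked = false;
            return true;
        }
        false
    }
}
```

Once the shared flag has been observed as cleared, later calls return without
touching the mutex until the handle is marked as possibly blocked again.
*/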

impl<T> Clone for Sender<T> {
    fn clone(&self) -> Sender<T> {
        // Since this atomic op isn't actually guarding any memory and we don't
        // care about any orderings besides the ordering on the single atomic
        // variable, a relaxed ordering would be acceptable; `SeqCst` is what
        // the code below actually uses.
        let mut curr = self.inner.num_senders.load(SeqCst);

        loop {
            // If the maximum number of senders has been reached, then fail
            if curr == self.inner.max_senders() {
                panic!("cannot clone `Sender` -- too many outstanding senders");
            }

            debug_assert!(curr < self.inner.max_senders());

            let next = curr + 1;

            // The ABA problem doesn't matter here. We only care that the
            // number of senders never exceeds the maximum.
            match self.inner.num_senders.compare_exchange(curr, next, SeqCst, SeqCst) {
                Ok(_) => {
                    return Sender {
                        inner: self.inner.clone(),
                        sender_waker: Arc::new(Mutex::new(SenderWaker::new())),
                        maybe_blocked: false,
                    };
                }
                Err(actual) => curr = actual,
            }
        }
    }
}
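
/*
The retry loop in `Clone for Sender` above is a bounded compare-and-swap
increment. A standalone sketch of that loop, using only `std`, might look like
this; `MAX_SENDERS` and `try_increment` are hypothetical, and the sketch
returns `None` at the limit where the real code panics.

```rust
use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};

// Hypothetical limit used only for this sketch.
const MAX_SENDERS: usize = 3;

// Atomically increments `counter` unless it has already reached
// `MAX_SENDERS`. Returns the new value, or `None` if the limit was hit.
fn try_increment(counter: &AtomicUsize) -> Option<usize> {
    let mut curr = counter.load(SeqCst);
    loop {
        if curr >= MAX_SENDERS {
            return None;
        }
        let next = curr + 1;
        match counter.compare_exchange(curr, next, SeqCst, SeqCst) {
            Ok(_) => return Some(next),
            // Another thread changed the value; retry with what it wrote.
            Err(actual) => curr = actual,
        }
    }
}
```

Because the limit is re-checked on every retry, concurrent callers can never
push the counter past `MAX_SENDERS`.
*/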

impl<T> Drop for Sender<T> {
    fn drop(&mut self) {
        // Ordering between variables doesn't matter here
        let prev = self.inner.num_senders.fetch_sub(1, SeqCst);

        if prev == 1 {
            // There's no need to park this sender, it's dropping,
            // and we don't want to check for capacity, so skip
            // that stuff from `do_send`.
            let _ = self.do_send_nb(None);
        }
    }
}

/*
 *
 * ===== impl Receiver =====
 *
 */

impl<T> Receiver<T> {
    /// Closes the receiving half of a channel, without dropping it.
    ///
    /// This prevents any further messages from being sent on the channel while
    /// still enabling the receiver to drain messages that are buffered.
    pub fn close(&mut self) {
        let mut curr = self.inner.state.load(SeqCst);

        loop {
            let mut state = decode_state(curr);

            if !state.is_open {
                break
            }

            state.is_open = false;

            let next = encode_state(&state);
            match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
                Ok(_) => break,
                Err(actual) => curr = actual,
            }
        }

        // Wake up any tasks waiting as they'll see that we've closed the
        // channel and will continue on their merry way.
        loop {
            match unsafe { self.inner.parked_queue.pop() } {
                PopResult::Data(task) => {
                    task.lock().unwrap().wake();
                }
                PopResult::Empty => break,
                PopResult::Inconsistent => thread::yield_now(),
            }
        }
    }

    /// Tries to receive the next message without notifying a context if empty.
    ///
    /// It is not recommended to call this function from inside of a future,
    /// only when you've otherwise arranged to be notified when the channel is
    /// no longer empty.
    pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
        match self.next_message() {
            Async::Ready(msg) => {
                Ok(msg)
            },
            Async::Pending => Err(TryRecvError { _inner: () }),
        }
    }

    fn next_message(&mut self) -> Async<Option<T>> {
        // Pop off a message
        loop {
            match unsafe { self.inner.message_queue.pop() } {
                PopResult::Data(msg) => {
                    // If there are any parked task handles in the parked queue, pop
                    // one and unpark it.
                    self.wake_one();

                    // Decrement number of messages
                    self.dec_num_messages();

                    return Async::Ready(msg);
                }
                PopResult::Empty => {
                    // The queue is empty, return Pending
                    return Async::Pending;
                }
                PopResult::Inconsistent => {
                    // Inconsistent means that there will be a message to pop
                    // in a short time. This branch can only be reached if
                    // values are being produced from another thread, so there
                    // are a few ways that we can deal with this:
                    //
                    // 1) Spin
                    // 2) thread::yield_now()
                    // 3) task::current().unwrap() & return Pending
                    //
                    // For now, thread::yield_now() is used, but it would
                    // probably be better to spin a few times then yield.
                    thread::yield_now();
                }
            }
        }
    }

    // Unpark a single task handle if there is one pending in the parked queue
    fn wake_one(&mut self) {
        loop {
            match unsafe { self.inner.parked_queue.pop() } {
                PopResult::Data(task) => {
                    task.lock().unwrap().wake();
                    return;
                }
                PopResult::Empty => {
                    // Queue empty, no task to wake up.
                    return;
                }
                PopResult::Inconsistent => {
                    // Same as above
                    thread::yield_now();
                }
            }
        }
    }

    fn dec_num_messages(&self) {
        let mut curr = self.inner.state.load(SeqCst);

        loop {
            let mut state = decode_state(curr);

            state.num_messages -= 1;

            let next = encode_state(&state);
            match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
                Ok(_) => break,
                Err(actual) => curr = actual,
            }
        }
    }

    fn poll_next_no_register(&mut self) -> Async<Option<T>> {
        // Try to read a message off of the message queue.
        if let Async::Ready(msg) = self.next_message() {
            return Async::Ready(msg);
        }

        // Check if the channel is closed.
        let state = decode_state(self.inner.state.load(SeqCst));
        if !state.is_open && state.num_messages == 0 {
            return Async::Ready(None);
        }

        Async::Pending
    }
}

impl<T> Stream for Receiver<T> {
    type Item = T;
    type Error = Never;

    fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Item>, Self::Error> {
        if let Async::Ready(x) = self.poll_next_no_register() {
            return Ok(Async::Ready(x));
        }

        // Register to receive a wakeup when more messages are sent.
        self.inner.recv_waker.register(cx.waker());

        // Check again for messages just in case one arrived in between the
        // call to `poll_next_no_register` and `register` above. If this
        // returns `Pending`, the channel is empty, not closed, and we're set
        // to receive a wakeup when a message is sent.
        Ok(self.poll_next_no_register())
    }
}
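
/*
`poll_next` above follows a "check, register, check again" sequence so that a
message pushed between the first check and the waker registration is not
missed. Below is a rough std-only sketch of that sequence. `Mailbox` and
`CountWaker` are hypothetical, and a `Mutex` stands in for the lock-free queue
and `AtomicWaker` used by the real code.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};
use std::sync::{Arc, Mutex};
use std::task::{Poll, Wake, Waker};

// Hypothetical waker that only counts how often it was woken.
struct CountWaker(AtomicUsize);

impl Wake for CountWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, SeqCst);
    }
}

// Hypothetical shared state: a locked queue plus a slot for the receiver's waker.
struct Mailbox<T> {
    queue: Mutex<VecDeque<T>>,
    recv_waker: Mutex<Option<Waker>>,
}

impl<T> Mailbox<T> {
    fn new() -> Self {
        Mailbox {
            queue: Mutex::new(VecDeque::new()),
            recv_waker: Mutex::new(None),
        }
    }

    // Producer side: push the value, then wake whoever registered.
    fn push(&self, value: T) {
        self.queue.lock().unwrap().push_back(value);
        if let Some(waker) = self.recv_waker.lock().unwrap().take() {
            waker.wake();
        }
    }

    // Consumer side: check, register, then check again.
    fn poll_recv(&self, waker: &Waker) -> Poll<T> {
        if let Some(v) = self.queue.lock().unwrap().pop_front() {
            return Poll::Ready(v);
        }
        *self.recv_waker.lock().unwrap() = Some(waker.clone());
        // A value may have been pushed between the first check and the
        // registration; without this second check that wakeup would be lost.
        match self.queue.lock().unwrap().pop_front() {
            Some(v) => Poll::Ready(v),
            None => Poll::Pending,
        }
    }
}
```

Skipping the second check would leave a window in which the producer finds no
registered waker and the consumer then sleeps despite a queued value.
*/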

impl<T> Drop for Receiver<T> {
    fn drop(&mut self) {
        // Drain the channel of all pending messages
        self.close();
        while self.next_message().is_ready() {
            // ...
        }
    }
}

/*
 *
 * ===== impl Inner =====
 *
 */

impl<T> Inner<T> {
    // The return value is such that the total number of messages that can be
    // enqueued into the channel will never exceed MAX_CAPACITY
    fn max_senders(&self) -> usize {
        match self.buffer {
            Some(buffer) => MAX_CAPACITY - buffer,
            None => MAX_BUFFER,
        }
    }
}

unsafe impl<T: Send> Send for Inner<T> {}
unsafe impl<T: Send> Sync for Inner<T> {}

/*
 *
 * ===== Helpers =====
 *
 */

fn decode_state(num: usize) -> State {
    State {
        is_open: num & OPEN_MASK == OPEN_MASK,
        num_messages: num & MAX_CAPACITY,
    }
}

fn encode_state(state: &State) -> usize {
    let mut num = state.num_messages;

    if state.is_open {
        num |= OPEN_MASK;
    }

    num
}
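
/*
The two helpers above pack the open flag and the message count into a single
`usize`. The standalone sketch below repeats the constants so it compiles on
its own; `pack` and `unpack` are hypothetical names that mirror `encode_state`
and `decode_state` with a tuple in place of `State`.

```rust
// Mirrors the constants defined earlier in this file.
const OPEN_MASK: usize = usize::MAX - (usize::MAX >> 1);
const MAX_CAPACITY: usize = !OPEN_MASK;

// Packs the open flag into the top bit and the message count into the rest.
fn pack(is_open: bool, num_messages: usize) -> usize {
    debug_assert!(num_messages <= MAX_CAPACITY);
    if is_open {
        num_messages | OPEN_MASK
    } else {
        num_messages
    }
}

// Splits a packed word back into `(is_open, num_messages)`.
fn unpack(word: usize) -> (bool, usize) {
    (word & OPEN_MASK == OPEN_MASK, word & MAX_CAPACITY)
}
```

Keeping both values in one word is what allows a single `compare_exchange` to
update the count and observe the closed flag in one atomic step.
*/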

/*
 *
 * ==== Unbounded channels ====
 *
 */

/// Creates an unbounded mpsc channel for communicating between asynchronous tasks.
///
/// A `send` on this channel will always succeed as long as the receive half has
/// not been closed. If the receiver falls behind, messages will be arbitrarily
/// buffered.
///
/// **Note** that the amount of available system memory is an implicit bound to
/// the channel. Using an `unbounded` channel can cause the process to run out
/// of memory. In this case, the process will be aborted.
pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
    let tx = Arc::new(UnboundedInner {
        closed: AtomicBool::new(false),
        message_queue: Queue::new(),
        recv_waker: AtomicWaker::new(),
    });
    let rx = tx.clone();
    (UnboundedSender(tx), UnboundedReceiver(rx))
}

/// The transmission end of an unbounded mpsc channel.
///
/// This value is created by the [`unbounded`](unbounded) function.
#[derive(Debug, Clone)]
pub struct UnboundedSender<T>(Arc<UnboundedInner<T>>);

/// The receiving end of an unbounded mpsc channel.
///
/// This value is created by the [`unbounded`](unbounded) function.
#[derive(Debug)]
pub struct UnboundedReceiver<T>(Arc<UnboundedInner<T>>);

trait AssertKinds: Send + Sync + Clone {}
impl AssertKinds for UnboundedSender<u32> {}

#[derive(Debug)]
struct UnboundedInner<T> {
    closed: AtomicBool,
    message_queue: Queue<T>,
    recv_waker: AtomicWaker,
}

impl<T> UnboundedSender<T> {
    /// Check if the channel is ready to receive a message.
    pub fn poll_ready(&self, _: &mut task::Context) -> Poll<(), SendError> {
        Ok(Async::Ready(()))
    }

    /// Returns whether this channel is closed without needing a context.
    pub fn is_closed(&self) -> bool {
        self.0.closed.load(SeqCst)
    }

    /// Closes this channel from the sender side, preventing any new messages.
    pub fn close_channel(&self) {
        self.0.closed.store(true, SeqCst);
        self.0.recv_waker.wake();
    }

    /// Send a message on the channel.
    ///
    /// This method should only be called after `poll_ready` has been used to
    /// verify that the channel is ready to receive a message.
    pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
        if self.0.closed.load(SeqCst) {
            return Err(SendError::disconnected());
        }
        self.0.message_queue.push(msg);
        self.0.recv_waker.wake();
        Ok(())
    }

    /// Sends a message along this channel.
    ///
    /// This is an unbounded sender, so this function differs from `Sink::send`
    /// by ensuring the return type reflects that the channel is always ready to
    /// receive messages.
    pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError<T>> {
        // TODO there's a race between checking the `closed` atomicbool
        // and pushing onto the queue.
        if self.0.closed.load(SeqCst) {
            return Err(TrySendError {
                err: SendError::disconnected(),
                val: msg,
            });
        }
        self.0.message_queue.push(msg);
        self.0.recv_waker.wake();
        Ok(())
    }
}

impl<T> Drop for UnboundedSender<T> {
    fn drop(&mut self) {
        if Arc::strong_count(&self.0) == 2 {
            // If it's just us and the receiver, or us and another sender,
            // the channel should be closed.
            self.0.closed.store(true, SeqCst);
            self.0.recv_waker.wake();
        }
    }
}
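
/*
`Drop for UnboundedSender` above decides whether to close the channel by
looking at `Arc::strong_count`. The sketch below isolates that check with
hypothetical types: `ToySender` plays the sender and a bare `Arc<AtomicBool>`
plays the receiver's handle. Note that `strong_count` is only a snapshot, so
under concurrent drops the check is best-effort; this sketch runs on a single
thread.

```rust
use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
use std::sync::Arc;

// Hypothetical sender handle that marks the shared flag as closed when it is
// dropped while only one other owner remains.
struct ToySender(Arc<AtomicBool>);

impl Drop for ToySender {
    fn drop(&mut self) {
        // The count still includes `self` here, so 2 means "us plus one other".
        if Arc::strong_count(&self.0) == 2 {
            self.0.store(true, SeqCst);
        }
    }
}

// Returns the closed flag after dropping the first and the second sender.
fn closes_on_last_sender() -> (bool, bool) {
    let closed = Arc::new(AtomicBool::new(false)); // plays the receiver's role
    let a = ToySender(closed.clone());
    let b = ToySender(closed.clone());
    drop(a); // three owners at this point: `a`, `b`, and `closed`
    let after_first = closed.load(SeqCst);
    drop(b); // two owners at this point: `b` and `closed`
    (after_first, closed.load(SeqCst))
}
```

Only the drop that sees exactly two owners flips the flag, which in this
sketch is the drop of the last remaining sender.
*/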

impl<T> UnboundedReceiver<T> {
    /// Closes the receiving half of the channel, without dropping it.
    ///
    /// This prevents any further messages from being sent on the channel while
    /// still enabling the receiver to drain messages that are buffered.
    pub fn close(&mut self) {
        self.0.closed.store(true, SeqCst);
    }

    /// Tries to receive the next message without notifying a context if empty.
    ///
    /// It is not recommended to call this function from inside of a future,
    /// only when you've otherwise arranged to be notified when the channel is
    /// no longer empty.
    pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
        loop {
            // Safe because this is the only place the message queue is popped,
            // and it takes `&mut self` to ensure that only the unique receiver
            // can pop off of the message queue.
            match unsafe { self.0.message_queue.pop() } {
                PopResult::Data(msg) => return Ok(Some(msg)),
                PopResult::Empty => {
                    if self.0.closed.load(SeqCst) {
                        // Ensure that the `closed` state wasn't written after
                        // a final message was sent.
                        match unsafe { self.0.message_queue.pop() } {
                            PopResult::Data(msg) => return Ok(Some(msg)),
                            PopResult::Empty => return Ok(None),
                            PopResult::Inconsistent => {
                                thread::yield_now();
                                continue;
                            }
                        }
                    }
                    return Err(TryRecvError { _inner: () });
                }
                PopResult::Inconsistent => {
                    // Inconsistent means that there will be a message to pop
                    // in a short time. This branch can only be reached if
                    // values are being produced from another thread, so there
                    // are a few ways that we can deal with this:
                    //
                    // 1) Spin
                    // 2) thread::yield_now()
                    // 3) task::current().unwrap() & return Pending
                    //
                    // For now, thread::yield_now() is used, but it would
                    // probably be better to spin a few times then yield.
                    thread::yield_now();
                }
            }
        }
    }
}

impl<T> Stream for UnboundedReceiver<T> {
    type Item = T;
    type Error = Never;

    fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Item>, Self::Error> {
        if let Ok(msg) = self.try_next() {
            return Ok(Async::Ready(msg));
        }
        self.0.recv_waker.register(cx.waker());
        if let Ok(msg) = self.try_next() {
            return Ok(Async::Ready(msg));
        }
        Ok(Async::Pending)
    }
}

impl<T> Drop for UnboundedReceiver<T> {
    fn drop(&mut self) {
        self.0.closed.store(true, SeqCst);
    }
}