//! A multi-producer, single-consumer queue for sending values across
//! asynchronous tasks.
//!
//! As with `std::sync::mpsc`, channel creation provides [`Receiver`](Receiver)
//! and [`Sender`](Sender) handles. [`Receiver`](Receiver) implements
//! [`Stream`] and allows a task to read values out of the
//! channel. If there is no message to read from the channel, the current task
//! will be awoken when a new value is sent. [`Sender`](Sender) implements the
//! `Sink` trait and allows a task to send messages into
//! the channel. If the channel is at capacity, the send will be rejected and
//! the task will be awoken when additional capacity is available. This process
//! of delaying sends beyond a certain capacity is often referred to as
//! "backpressure".
//!
//! Unbounded channels (without backpressure) are also available using
//! the [`unbounded`](unbounded) function.
//!
//! # Disconnection
//!
//! When all [`Sender`](Sender)s have been dropped, it is no longer
//! possible to send values into the channel. This is considered the termination
//! event of the stream. As such, [`Receiver::poll_next`](Receiver::poll_next)
//! will return `Ok(Async::Ready(None))`.
//!
//! If the [`Receiver`](Receiver) handle is dropped, then messages can no longer
//! be read out of the channel. In this case, all further attempts to send will
//! result in an error.
//!
//! # Clean Shutdown
//!
//! If the [`Receiver`](Receiver) is simply dropped, then it is possible for
//! there to be messages still in the channel that will not be processed. As
//! such, it is usually desirable to perform a "clean" shutdown. To do this, the
//! receiver first calls `close`, which prevents any further messages from
//! being sent into the channel. Then, the receiver consumes the channel to
//! completion, at which point the receiver can be dropped.

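/*
 * Illustration only, not part of this module. The disconnection rules in the
 * module docs mirror those of `std::sync::mpsc`, so this minimal sketch uses
 * the standard library's synchronous channel as a stand-in (an assumption,
 * made to keep the example free of a task context). The helper names
 * `drain_after_senders_dropped` and `send_after_receiver_dropped` are
 * hypothetical.

```rust
use std::sync::mpsc;

/// Sends `values` through two senders, drops both, and returns everything the
/// receiver saw before it observed the disconnect.
fn drain_after_senders_dropped(values: &[u32]) -> Vec<u32> {
    let (tx, rx) = mpsc::channel();
    let tx2 = tx.clone();
    for (i, v) in values.iter().enumerate() {
        // Alternate between the two senders.
        let sender = if i % 2 == 0 { &tx } else { &tx2 };
        sender.send(*v).expect("receiver is still alive");
    }
    drop(tx);
    drop(tx2);
    // Buffered values are still delivered; iteration ends once the channel is
    // both empty and disconnected.
    rx.iter().collect()
}

/// Drops the receiver first, then returns the value handed back by the
/// failed send (or `None` if the send unexpectedly succeeded).
fn send_after_receiver_dropped(value: u32) -> Option<u32> {
    let (tx, rx) = mpsc::channel::<u32>();
    drop(rx);
    tx.send(value).err().map(|mpsc::SendError(v)| v)
}
```

 * With `std`, the termination event shows up as the end of `rx.iter()`; in
 * this module the analogous signal is `poll_next` yielding `None`.
 */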
// At the core, the channel uses an atomic FIFO queue for message passing. This
// queue is used as the primary coordination primitive. In order to enforce
// capacity limits and handle backpressure, a secondary FIFO queue is used to
// send wakers for blocked `Sender` tasks.
//
// The general idea is that the channel is created with a `buffer` size of `n`.
// The channel capacity is `n + num-senders`. Each sender gets one "guaranteed"
// slot to hold a message. This allows `Sender` to know for a fact that a send
// can be successfully started *before* beginning to do the actual work of
// sending the value. However, a `send` will not complete until the number of
// messages in the channel has dropped back down below the configured buffer
// size.
//
// Note that the implementation guarantees that the number of items that have
// finished sending into a channel without being received will not exceed the
// configured buffer size. There is no *strict* guarantee that the receiver
// will wake up a blocked `Sender` *immediately* when the buffer size drops
// below the configured limit. It will, however, almost always awaken a
// `Sender` when buffer space becomes available, and it is *guaranteed* that a
// `Sender` will be awoken by the time its most recently sent message is
// popped out of the channel by the `Receiver`.
//
// The steps for sending a message are roughly:
//
// 1) Increment the channel message count
// 2) If the channel is at capacity, push the task's waker onto the wait queue
// 3) Push the message onto the message queue
// 4) If a wakeup was queued, wait for it to occur
//
// The steps for receiving a message are roughly:
//
// 1) Pop a message from the message queue
// 2) Pop a task waker from the wait queue
// 3) Decrement the channel message count
//
// It's important for the order of operations on lock-free structures to happen
// in reverse order between the sender and receiver. This makes the message
// queue the primary coordination structure and establishes the happens-before
// relationship required by the acquire / release orderings used by the queue
// structure.

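/*
 * Illustration only, not part of this module. The send and receive steps
 * listed above can be sketched with a toy, single-threaded model. Everything
 * here is an assumption made for illustration: `ToyChannel` is a hypothetical
 * type, `VecDeque`s stand in for the lock-free queues, and a sender id stands
 * in for a parked task's waker.

```rust
use std::collections::VecDeque;

/// Toy model of the bounded channel's bookkeeping.
struct ToyChannel {
    buffer: usize,           // configured buffer size `n`
    num_messages: usize,     // messages currently counted in the channel
    messages: VecDeque<u32>, // stand-in for the message queue
    parked: VecDeque<usize>, // stand-in for the wait queue (sender ids)
}

impl ToyChannel {
    fn new(buffer: usize) -> Self {
        ToyChannel {
            buffer,
            num_messages: 0,
            messages: VecDeque::new(),
            parked: VecDeque::new(),
        }
    }

    /// Send steps 1-3. Returns `true` if the sender is now parked and would
    /// have to wait for a wakeup (step 4) before sending again.
    fn send(&mut self, sender_id: usize, msg: u32) -> bool {
        self.num_messages += 1; // 1) increment the message count
        let park = self.num_messages > self.buffer;
        if park {
            self.parked.push_back(sender_id); // 2) queue the "waker"
        }
        self.messages.push_back(msg); // 3) push the message
        park
    }

    /// Receive steps 1-3. Returns the message and the sender that was woken,
    /// if any.
    fn recv(&mut self) -> Option<(u32, Option<usize>)> {
        let msg = self.messages.pop_front()?; // 1) pop a message
        let woken = self.parked.pop_front(); // 2) pop a parked sender
        self.num_messages -= 1; // 3) decrement the message count
        Some((msg, woken))
    }
}
```

 * With `buffer = 1`, a first send completes without parking, a second send
 * still enqueues its message (the sender's guaranteed slot) but parks the
 * sender, and the next receive wakes it.
 */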
use std::fmt;
use std::error::Error;
use std::any::Any;
use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::atomic::Ordering::SeqCst;
use std::sync::{Arc, Mutex};
use std::thread;
use std::usize;

use futures_core::task::{self, AtomicWaker, Waker};
use futures_core::{Async, Poll, Stream};
use futures_core::never::Never;

use mpsc::queue::{Queue, PopResult};

mod queue;

/// The transmission end of a bounded mpsc channel.
///
/// This value is created by the [`channel`](channel) function.
#[derive(Debug)]
pub struct Sender<T> {
    // Channel state shared between the sender and receiver.
    inner: Arc<Inner<T>>,

    // Handle to the task that is blocked on this sender. This handle is sent
    // to the receiver half in order to be notified when the sender becomes
    // unblocked.
    sender_waker: Arc<Mutex<SenderWaker>>,

    // True if the sender might be blocked. This is an optimization to avoid
    // having to lock the mutex most of the time.
    maybe_blocked: bool,
}

/// The receiving end of a bounded mpsc channel.
///
/// This value is created by the [`channel`](channel) function.
#[derive(Debug)]
pub struct Receiver<T> {
    inner: Arc<Inner<T>>,
}

/// The error type for [`Sender`s](Sender) used as `Sink`s.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SendError {
    kind: SendErrorKind,
}

/// The error type returned from [`try_send`](Sender::try_send).
#[derive(Clone, PartialEq, Eq)]
pub struct TrySendError<T> {
    err: SendError,
    val: T,
}

#[derive(Clone, Debug, PartialEq, Eq)]
enum SendErrorKind {
    Full,
    Disconnected,
}

/// The error type returned from [`try_next`](Receiver::try_next).
pub struct TryRecvError {
    _inner: (),
}

impl fmt::Display for SendError {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        if self.is_full() {
            write!(fmt, "send failed because channel is full")
        } else {
            write!(fmt, "send failed because receiver is gone")
        }
    }
}

impl Error for SendError {
    fn description(&self) -> &str {
        if self.is_full() {
            "send failed because channel is full"
        } else {
            "send failed because receiver is gone"
        }
    }
}

impl SendError {
    /// Returns true if this error is a result of the channel being full.
    pub fn is_full(&self) -> bool {
        match self.kind {
            SendErrorKind::Full => true,
            _ => false,
        }
    }

    /// Returns true if this error is a result of the receiver being dropped.
    pub fn is_disconnected(&self) -> bool {
        match self.kind {
            SendErrorKind::Disconnected => true,
            _ => false,
        }
    }

    fn disconnected() -> Self {
        SendError { kind: SendErrorKind::Disconnected }
    }
}

impl<T> fmt::Debug for TrySendError<T> {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.debug_struct("TrySendError")
            .field("kind", &self.err.kind)
            .finish()
    }
}

impl<T> fmt::Display for TrySendError<T> {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        if self.is_full() {
            write!(fmt, "send failed because channel is full")
        } else {
            write!(fmt, "send failed because receiver is gone")
        }
    }
}

impl<T: Any> Error for TrySendError<T> {
    fn description(&self) -> &str {
        if self.is_full() {
            "send failed because channel is full"
        } else {
            "send failed because receiver is gone"
        }
    }
}

impl<T> TrySendError<T> {
    /// Returns true if this error is a result of the channel being full.
    pub fn is_full(&self) -> bool {
        self.err.is_full()
    }

    /// Returns true if this error is a result of the receiver being dropped.
    pub fn is_disconnected(&self) -> bool {
        self.err.is_disconnected()
    }

    /// Returns the message that was attempted to be sent but failed.
    pub fn into_inner(self) -> T {
        self.val
    }

    /// Drops the message and converts into a `SendError`.
    pub fn into_send_error(self) -> SendError {
        self.err
    }
}

impl fmt::Debug for TryRecvError {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.debug_tuple("TryRecvError")
            .finish()
    }
}

impl fmt::Display for TryRecvError {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.write_str(self.description())
    }
}

impl Error for TryRecvError {
    fn description(&self) -> &str {
        "receiver channel is empty"
    }
}

#[derive(Debug)]
struct Inner<T> {
    // Max buffer size of the channel. If `None` then the channel is unbounded.
    buffer: Option<usize>,

    // Internal channel state. Consists of the number of messages stored in the
    // channel as well as a flag signalling that the channel is closed.
    state: AtomicUsize,

    // Atomic, FIFO queue used to send messages to the receiver
    message_queue: Queue<Option<T>>,

    // Atomic, FIFO queue used to send parked task handles to the receiver.
    parked_queue: Queue<Arc<Mutex<SenderWaker>>>,

    // Number of senders in existence
    num_senders: AtomicUsize,

    // Waker for the receiver's task.
    recv_waker: AtomicWaker,
}

// Struct representation of `Inner::state`.
#[derive(Debug, Clone, Copy)]
struct State {
    // `true` when the channel is open
    is_open: bool,

    // Number of messages in the channel
    num_messages: usize,
}

// The `is_open` flag is stored in the left-most bit of `Inner::state`
const OPEN_MASK: usize = usize::MAX - (usize::MAX >> 1);

// When a new channel is created, it is created in the open state with no
// pending messages.
const INIT_STATE: usize = OPEN_MASK;

// The maximum number of messages that a channel can track is `usize::MAX >> 1`
const MAX_CAPACITY: usize = !(OPEN_MASK);

// The maximum requested buffer size must be less than the maximum capacity of
// a channel. This is because each sender gets a guaranteed slot.
const MAX_BUFFER: usize = MAX_CAPACITY >> 1;

// Sent to the consumer to wake up blocked producers
#[derive(Debug)]
struct SenderWaker {
    waker: Option<Waker>,
    is_blocked: bool,
}

impl SenderWaker {
    fn new() -> Self {
        SenderWaker {
            waker: None,
            is_blocked: false,
        }
    }

    fn wake(&mut self) {
        self.is_blocked = false;

        if let Some(waker) = self.waker.take() {
            waker.wake();
        }
    }
}

/// Creates a bounded mpsc channel for communicating between asynchronous tasks.
///
/// Being bounded, this channel provides backpressure to ensure that the sender
/// outpaces the receiver by only a limited amount. The channel's capacity is
/// equal to `buffer + num-senders`. In other words, each sender gets a
/// guaranteed slot in the channel capacity, and on top of that there are
/// `buffer` "first come, first served" slots available to all senders.
///
/// The [`Receiver`](Receiver) returned implements the
/// [`Stream`] trait, while [`Sender`](Sender) implements
/// `Sink`.
pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
    // Check that the requested buffer size does not exceed the maximum buffer
    // size permitted by the system.
    assert!(buffer < MAX_BUFFER, "requested buffer size too large");
    channel2(Some(buffer))
}

fn channel2<T>(buffer: Option<usize>) -> (Sender<T>, Receiver<T>) {
    let inner = Arc::new(Inner {
        buffer: buffer,
        state: AtomicUsize::new(INIT_STATE),
        message_queue: Queue::new(),
        parked_queue: Queue::new(),
        num_senders: AtomicUsize::new(1),
        recv_waker: AtomicWaker::new(),
    });

    let tx = Sender {
        inner: inner.clone(),
        sender_waker: Arc::new(Mutex::new(SenderWaker::new())),
        maybe_blocked: false,
    };

    let rx = Receiver {
        inner: inner,
    };

    (tx, rx)
}

/*
 *
 * ===== impl Sender =====
 *
 */

impl<T> Sender<T> {
    /// Attempts to send a message on this `Sender`, returning the message
    /// if there was an error.
    pub fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
        // If the sender is currently blocked, reject the message
        if !self.poll_unparked(None).is_ready() {
            return Err(TrySendError {
                err: SendError {
                    kind: SendErrorKind::Full,
                },
                val: msg,
            });
        }

        // The channel has capacity to accept the message, so send it
        self.do_send(None, msg)
    }

    /// Send a message on the channel.
    ///
    /// This function should only be called after
    /// [`poll_ready`](Sender::poll_ready) has reported that the channel is
    /// ready to receive a message.
    pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
        self.try_send(msg)
            .map_err(|e| e.err)
    }

    // Do the send, parking the sender if the channel is over its buffer size.
    // Fails only if the receiver has closed the channel.
    fn do_send(&mut self, cx: Option<&mut task::Context>, msg: T)
        -> Result<(), TrySendError<T>>
    {
        // Anyone calling do_send *should* make sure there is room first,
        // but assert here for tests as a sanity check.
        debug_assert!(self.poll_unparked(None).is_ready());

        // First, increment the number of messages contained by the channel.
        // This operation will also atomically determine if the sender task
        // should be parked.
        //
        // `None` is returned in the case that the channel has been closed by
        // the receiver. This happens when `Receiver::close` is called or the
        // receiver is dropped.
        let park_self = match self.inc_num_messages(false) {
            Some(park_self) => park_self,
            None => return Err(TrySendError {
                err: SendError {
                    kind: SendErrorKind::Disconnected,
                },
                val: msg,
            }),
        };

        // If the channel has reached capacity, then the sender task needs to
        // be parked. This will send the task handle on the parked task queue.
        //
        // However, when `do_send` is called without a task context (`cx` is
        // `None`, as in `try_send`), there is no waker to store. In this case,
        // in order to maintain internal consistency, a blank entry is still
        // pushed onto the parked task queue.
        if park_self {
            self.park(cx);
        }

        self.push_msg_and_wake_receiver(Some(msg));

        Ok(())
    }

    // Do the send without parking the current task. A `None` message closes
    // the channel.
    fn do_send_nb(&self, msg: Option<T>) -> Result<(), TrySendError<T>> {
        match self.inc_num_messages(msg.is_none()) {
            Some(park_self) => assert!(!park_self),
            None => {
                // The receiver has closed the channel. Only abort if actually
                // sending a message. It is important that the stream
                // termination (None) is always sent. This technically means
                // that it is possible for the queue to contain the following
                // number of messages:
                //
                //     num-senders + buffer + 1
                //
                if let Some(msg) = msg {
                    return Err(TrySendError {
                        err: SendError {
                            kind: SendErrorKind::Disconnected,
                        },
                        val: msg,
                    });
                } else {
                    return Ok(());
                }
            },
        };

        self.push_msg_and_wake_receiver(msg);

        Ok(())
    }

    // Push message to the queue and signal to the receiver
    fn push_msg_and_wake_receiver(&self, msg: Option<T>) {
        // Push the message onto the message queue
        self.inner.message_queue.push(msg);

        // Awaken the receiver task if it was blocked.
        self.inner.recv_waker.wake();
    }

    // Increment the number of queued messages. Returns whether the sender
    // should block, or `None` if the channel is closed.
    fn inc_num_messages(&self, close: bool) -> Option<bool> {
        let mut curr = self.inner.state.load(SeqCst);

        loop {
            let mut state = decode_state(curr);

            // The receiver end closed the channel.
            if !state.is_open {
                return None;
            }

            // This probably is never hit? Odds are the process will run out of
            // memory first. It may be worth returning something else in this
            // case?
            assert!(state.num_messages < MAX_CAPACITY, "buffer space exhausted; \
                    sending this message would overflow the state");

            state.num_messages += 1;

            // The channel is closed by all sender handles being dropped.
            if close {
                state.is_open = false;
            }

            let next = encode_state(&state);
            match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
                Ok(_) => {
                    // Block if the current number of pending messages has exceeded
                    // the configured buffer size
                    let park_self = !close && match self.inner.buffer {
                        Some(buffer) => state.num_messages > buffer,
                        None => false,
                    };

                    return Some(park_self)
                }
                Err(actual) => curr = actual,
            }
        }
    }

    fn park(&mut self, cx: Option<&mut task::Context>) {
        // TODO: clean up internal state if the task::current will fail

        let waker = cx.map(|cx| cx.waker().clone());

        {
            let mut sender = self.sender_waker.lock().unwrap();
            sender.waker = waker;
            sender.is_blocked = true;
        }

        // Send handle over queue
        let t = self.sender_waker.clone();
        self.inner.parked_queue.push(t);

        // Check to make sure we weren't closed after we sent our task on the
        // queue
        self.maybe_blocked = !self.is_closed();
    }

    /// Polls the channel to determine if there is guaranteed capacity to send
    /// at least one item without waiting.
    ///
    /// # Return value
    ///
    /// This method returns:
    ///
    /// - `Ok(Async::Ready(_))` if there is sufficient capacity;
    /// - `Ok(Async::Pending)` if the channel may not have capacity, in which
    ///   case the current task is queued to be notified once capacity is
    ///   available;
    /// - `Err(SendError)` if the receiver has been dropped.
    pub fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), SendError> {
        let state = decode_state(self.inner.state.load(SeqCst));
        if !state.is_open {
            return Err(SendError {
                kind: SendErrorKind::Disconnected,
            });
        }

        Ok(self.poll_unparked(Some(cx)))
    }

    /// Returns whether this channel is closed without needing a context.
    pub fn is_closed(&self) -> bool {
        !decode_state(self.inner.state.load(SeqCst)).is_open
    }

    /// Closes this channel from the sender side, preventing any new messages.
    pub fn close_channel(&mut self) {
        // There's no need to park this sender, since it's closing the
        // channel, and we don't want to check for capacity, so skip
        // that stuff from `do_send`.

        let _ = self.do_send_nb(None);
    }

    fn poll_unparked(&mut self, cx: Option<&mut task::Context>) -> Async<()> {
        // First check the `maybe_blocked` variable. This avoids acquiring the
        // lock in most cases
        if self.maybe_blocked {
            // Get a lock on the task handle
            let mut sender_waker = self.sender_waker.lock().unwrap();

            if !sender_waker.is_blocked {
                self.maybe_blocked = false;
                return Async::Ready(())
            }

            // At this point, a wake request is pending, so there will be a
            // wake sometime in the future. We just need to make sure that
            // the correct task will be notified.
            //
            // Update the waker in case the `Sender` has been moved to another
            // task
            sender_waker.waker = cx.map(|cx| cx.waker().clone());

            Async::Pending
        } else {
            Async::Ready(())
        }
    }
}

impl<T> Clone for Sender<T> {
    fn clone(&self) -> Sender<T> {
        // Since this atomic op isn't actually guarding any memory and we don't
        // care about any orderings besides the ordering on the single atomic
        // variable, a relaxed ordering would be acceptable; `SeqCst` is what
        // is actually used below.
        let mut curr = self.inner.num_senders.load(SeqCst);

        loop {
            // If the maximum number of senders has been reached, then fail
            if curr == self.inner.max_senders() {
                panic!("cannot clone `Sender` -- too many outstanding senders");
            }

            debug_assert!(curr < self.inner.max_senders());

            let next = curr + 1;

            // The ABA problem doesn't matter here. We only care that the
            // number of senders never exceeds the maximum.
            //
            // `compare_exchange` replaces the deprecated `compare_and_swap`
            // and matches the other CAS loops in this file.
            match self.inner.num_senders.compare_exchange(curr, next, SeqCst, SeqCst) {
                Ok(_) => {
                    return Sender {
                        inner: self.inner.clone(),
                        sender_waker: Arc::new(Mutex::new(SenderWaker::new())),
                        maybe_blocked: false,
                    };
                }
                Err(actual) => curr = actual,
            }
        }
    }
}

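/*
 * Illustration only, not part of this module. The retry loop in
 * `Sender::clone` is an instance of a bounded atomic increment. A minimal,
 * standalone sketch of that pattern follows; `try_increment` is a hypothetical
 * helper, and unlike `clone` it reports the limit by returning `None` rather
 * than panicking.

```rust
use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};

/// Atomically increments `counter` unless it has already reached `max`.
/// Returns the new value, or `None` if the limit was hit.
fn try_increment(counter: &AtomicUsize, max: usize) -> Option<usize> {
    let mut curr = counter.load(SeqCst);
    loop {
        if curr >= max {
            return None;
        }
        // Only succeeds if nobody changed the counter since we read it;
        // otherwise retry with the value that was actually observed.
        match counter.compare_exchange(curr, curr + 1, SeqCst, SeqCst) {
            Ok(_) => return Some(curr + 1),
            Err(actual) => curr = actual,
        }
    }
}
```

 * Because the limit check and the increment are tied together by the
 * compare-and-exchange, concurrent callers can never push the counter past
 * `max`, which is the only property `clone` relies on.
 */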
impl<T> Drop for Sender<T> {
    fn drop(&mut self) {
        // Ordering between variables doesn't matter here
        let prev = self.inner.num_senders.fetch_sub(1, SeqCst);

        if prev == 1 {
            // There's no need to park this sender, it's dropping,
            // and we don't want to check for capacity, so skip
            // that stuff from `do_send`.
            let _ = self.do_send_nb(None);
        }
    }
}

/*
 *
 * ===== impl Receiver =====
 *
 */

impl<T> Receiver<T> {
    /// Closes the receiving half of a channel, without dropping it.
    ///
    /// This prevents any further messages from being sent on the channel while
    /// still enabling the receiver to drain messages that are buffered.
    pub fn close(&mut self) {
        let mut curr = self.inner.state.load(SeqCst);

        loop {
            let mut state = decode_state(curr);

            if !state.is_open {
                break
            }

            state.is_open = false;

            let next = encode_state(&state);
            match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
                Ok(_) => break,
                Err(actual) => curr = actual,
            }
        }

        // Wake up any threads waiting as they'll see that we've closed the
        // channel and will continue on their merry way.
        loop {
            match unsafe { self.inner.parked_queue.pop() } {
                PopResult::Data(task) => {
                    task.lock().unwrap().wake();
                }
                PopResult::Empty => break,
                PopResult::Inconsistent => thread::yield_now(),
            }
        }
    }

    /// Tries to receive the next message without registering the current task
    /// for a wakeup if the channel is empty.
    ///
    /// It is not recommended to call this function from inside of a future
    /// unless you've otherwise arranged to be notified when the channel is
    /// no longer empty.
    pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
        match self.next_message() {
            Async::Ready(msg) => {
                Ok(msg)
            },
            Async::Pending => Err(TryRecvError { _inner: () }),
        }
    }

    fn next_message(&mut self) -> Async<Option<T>> {
        // Pop off a message
        loop {
            match unsafe { self.inner.message_queue.pop() } {
                PopResult::Data(msg) => {
                    // If there are any parked task handles in the parked queue, pop
                    // one and unpark it.
                    self.wake_one();

                    // Decrement number of messages
                    self.dec_num_messages();

                    return Async::Ready(msg);
                }
                PopResult::Empty => {
                    // The queue is empty, return Pending
                    return Async::Pending;
                }
                PopResult::Inconsistent => {
                    // Inconsistent means that there will be a message to pop
                    // in a short time. This branch can only be reached if
                    // values are being produced from another thread, so there
                    // are a few ways that we can deal with this:
                    //
                    // 1) Spin
                    // 2) thread::yield_now()
                    // 3) task::current().unwrap() & return Pending
                    //
                    // For now, thread::yield_now() is used, but it would
                    // probably be better to spin a few times then yield.
                    thread::yield_now();
                }
            }
        }
    }

    // Unpark a single task handle if there is one pending in the parked queue
    fn wake_one(&mut self) {
        loop {
            match unsafe { self.inner.parked_queue.pop() } {
                PopResult::Data(task) => {
                    task.lock().unwrap().wake();
                    return;
                }
                PopResult::Empty => {
                    // Queue empty, no task to wake up.
                    return;
                }
                PopResult::Inconsistent => {
                    // Same as above
                    thread::yield_now();
                }
            }
        }
    }

    fn dec_num_messages(&self) {
        let mut curr = self.inner.state.load(SeqCst);

        loop {
            let mut state = decode_state(curr);

            state.num_messages -= 1;

            let next = encode_state(&state);
            match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
                Ok(_) => break,
                Err(actual) => curr = actual,
            }
        }
    }

    fn poll_next_no_register(&mut self) -> Async<Option<T>> {
        // Try to read a message off of the message queue.
        if let Async::Ready(msg) = self.next_message() {
            return Async::Ready(msg);
        }

        // Check if the channel is closed.
        let state = decode_state(self.inner.state.load(SeqCst));
        if !state.is_open && state.num_messages == 0 {
            return Async::Ready(None);
        }

        Async::Pending
    }
}

impl<T> Stream for Receiver<T> {
    type Item = T;
    type Error = Never;

    fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Item>, Self::Error> {
        if let Async::Ready(x) = self.poll_next_no_register() {
            return Ok(Async::Ready(x));
        }

        // Register to receive a wakeup when more messages are sent.
        self.inner.recv_waker.register(cx.waker());

        // Check again for messages just in case one arrived in
        // between the call to `next_message` and `register` above.
        //
        // If this still returns `Pending`, the channel has no message ready
        // and has not terminated, and we're set to receive a wakeup when a
        // message is sent.
        Ok(self.poll_next_no_register())
    }
}

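/*
 * Illustration only, not part of this module. `poll_next` checks for a
 * message, registers the waker, and then checks again so that a message sent
 * in between is not missed. The sketch below models that order of operations
 * with standard-library pieces only. `CountingWaker` and `WakeChannel` are
 * hypothetical names, and plain mutexes stand in for the lock-free queue and
 * `AtomicWaker`.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};
use std::sync::{Arc, Mutex};
use std::task::{Wake, Waker};

/// Waker that only counts how many times it was woken.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, SeqCst);
    }
}

/// Toy channel with a message queue and a slot for the receiver's waker.
struct WakeChannel {
    queue: Mutex<VecDeque<u32>>,
    recv_waker: Mutex<Option<Waker>>,
}

impl WakeChannel {
    fn new() -> Self {
        WakeChannel {
            queue: Mutex::new(VecDeque::new()),
            recv_waker: Mutex::new(None),
        }
    }

    fn send(&self, msg: u32) {
        self.queue.lock().unwrap().push_back(msg);
        // Wake the receiver if it registered interest.
        if let Some(waker) = self.recv_waker.lock().unwrap().take() {
            waker.wake();
        }
    }

    /// Returns a message if one is available; otherwise leaves `waker`
    /// registered and returns `None`.
    fn poll_recv(&self, waker: &Waker) -> Option<u32> {
        // First attempt, without registering.
        let first = self.queue.lock().unwrap().pop_front();
        if first.is_some() {
            return first;
        }
        // Register, then check again: a message pushed between the first
        // check and the registration would otherwise go unnoticed.
        *self.recv_waker.lock().unwrap() = Some(waker.clone());
        self.queue.lock().unwrap().pop_front()
    }
}
```

 * A waker for the sketch can be built with `Waker::from(Arc::new(...))`. A
 * poll on an empty channel registers it, and the next `send` wakes it once.
 */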
impl<T> Drop for Receiver<T> {
    fn drop(&mut self) {
        // Drain the channel of all pending messages
        self.close();
        while self.next_message().is_ready() {
            // ...
        }
    }
}

/*
 *
 * ===== impl Inner =====
 *
 */

impl<T> Inner<T> {
    // The return value is such that the total number of messages that can be
    // enqueued into the channel will never exceed MAX_CAPACITY
    fn max_senders(&self) -> usize {
        match self.buffer {
            Some(buffer) => MAX_CAPACITY - buffer,
            None => MAX_BUFFER,
        }
    }
}

unsafe impl<T: Send> Send for Inner<T> {}
unsafe impl<T: Send> Sync for Inner<T> {}

/*
 *
 * ===== Helpers =====
 *
 */

fn decode_state(num: usize) -> State {
    State {
        is_open: num & OPEN_MASK == OPEN_MASK,
        num_messages: num & MAX_CAPACITY,
    }
}

fn encode_state(state: &State) -> usize {
    let mut num = state.num_messages;

    if state.is_open {
        num |= OPEN_MASK;
    }

    num
}

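/*
 * Illustration only, not part of this module. The helpers above pack an open
 * flag and a message count into one `usize`. The standalone sketch below
 * repeats the constants and helpers so it can run on its own and adds a
 * hypothetical `mark_closed` function showing that clearing the flag leaves
 * the count untouched. The `PartialEq` derive is added here only so the
 * values can be compared.

```rust
// Top bit: open flag. Remaining bits: number of messages.
const OPEN_MASK: usize = usize::MAX - (usize::MAX >> 1);
const MAX_CAPACITY: usize = !OPEN_MASK;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct State {
    is_open: bool,
    num_messages: usize,
}

fn decode_state(num: usize) -> State {
    State {
        is_open: num & OPEN_MASK == OPEN_MASK,
        num_messages: num & MAX_CAPACITY,
    }
}

fn encode_state(state: &State) -> usize {
    let mut num = state.num_messages;
    if state.is_open {
        num |= OPEN_MASK;
    }
    num
}

/// Clears the open flag of a packed state and keeps the message count.
fn mark_closed(packed: usize) -> usize {
    packed & MAX_CAPACITY
}
```

 * For example, an open channel holding three messages encodes to
 * `OPEN_MASK | 3`, and closing it leaves a packed value of exactly `3`.
 */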
876/*
877 *
878 * ==== Unbounded channels ====
879 *
880 */
881
882/// Creates an unbounded mpsc channel for communicating between asynchronous tasks.
883///
884/// A `send` on this channel will always succeed as long as the receive half has
885/// not been closed. If the receiver falls behind, messages will be arbitrarily
886/// buffered.
887///
888/// **Note** that the amount of available system memory is an implicit bound to
889/// the channel. Using an `unbounded` channel has the ability of causing the
890/// process to run out of memory. In this case, the process will be aborted.
891pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
892    let tx = Arc::new(UnboundedInner {
893        closed: AtomicBool::new(false),
894        message_queue: Queue::new(),
895        recv_waker: AtomicWaker::new(),
896    });
897    let rx = tx.clone();
898    (UnboundedSender(tx), UnboundedReceiver(rx))
899}

/// The transmission end of an unbounded mpsc channel.
///
/// This value is created by the [`unbounded`](unbounded) function.
#[derive(Debug, Clone)]
pub struct UnboundedSender<T>(Arc<UnboundedInner<T>>);

/// The receiving end of an unbounded mpsc channel.
///
/// This value is created by the [`unbounded`](unbounded) function.
#[derive(Debug)]
pub struct UnboundedReceiver<T>(Arc<UnboundedInner<T>>);

trait AssertKinds: Send + Sync + Clone {}
impl AssertKinds for UnboundedSender<u32> {}

#[derive(Debug)]
struct UnboundedInner<T> {
    closed: AtomicBool,
    message_queue: Queue<T>,
    recv_waker: AtomicWaker,
}

impl<T> UnboundedSender<T> {
    /// Checks whether the channel is ready to receive a message.
    pub fn poll_ready(&self, _: &mut task::Context) -> Poll<(), SendError> {
        Ok(Async::Ready(()))
    }

    /// Returns whether this channel is closed without needing a context.
    pub fn is_closed(&self) -> bool {
        self.0.closed.load(SeqCst)
    }

    /// Closes this channel from the sender side, preventing any new messages.
    pub fn close_channel(&self) {
        self.0.closed.store(true, SeqCst);
        self.0.recv_waker.wake();
    }

    /// Sends a message on the channel.
    ///
    /// This method should only be called after `poll_ready` has been used to
    /// verify that the channel is ready to receive a message.
    pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
        if self.0.closed.load(SeqCst) {
            return Err(SendError::disconnected());
        }
        self.0.message_queue.push(msg);
        self.0.recv_waker.wake();
        Ok(())
    }

    /// Sends a message along this channel.
    ///
    /// This is an unbounded sender, so this function differs from `Sink::send`
    /// by ensuring the return type reflects that the channel is always ready to
    /// receive messages.
    pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError<T>> {
        // TODO: there's a race between checking the `closed` `AtomicBool`
        // and pushing onto the queue.
        if self.0.closed.load(SeqCst) {
            return Err(TrySendError {
                err: SendError::disconnected(),
                val: msg,
            });
        }
        self.0.message_queue.push(msg);
        self.0.recv_waker.wake();
        Ok(())
    }
}

impl<T> Drop for UnboundedSender<T> {
    fn drop(&mut self) {
        if Arc::strong_count(&self.0) == 2 {
            // If it's just us and the receiver, or us and another sender,
            // the channel should be closed.
            self.0.closed.store(true, SeqCst);
            self.0.recv_waker.wake();
        }
    }
}
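
/*
 * Sketch (not part of this module): how a `strong_count == 2` check in
 * `Drop` behaves when handles share one `Arc`. `Shared`, `Tx`, `Rx` and
 * `pair` are hypothetical stand-ins for the inner state, the sender, the
 * receiver and the constructor. The waker and the message queue are left
 * out.
 *
```rust
use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
use std::sync::Arc;

// Hypothetical shared state: only the `closed` flag is modelled.
struct Shared {
    closed: AtomicBool,
}

// Hypothetical sender and receiver handles over the same allocation.
struct Tx(Arc<Shared>);
struct Rx(Arc<Shared>);

impl Clone for Tx {
    fn clone(&self) -> Tx {
        Tx(self.0.clone())
    }
}

impl Drop for Tx {
    fn drop(&mut self) {
        // `drop` runs before the field is released, so the count still
        // includes the handle that is being dropped.
        if Arc::strong_count(&self.0) == 2 {
            self.0.closed.store(true, SeqCst);
        }
    }
}

impl Rx {
    fn is_closed(&self) -> bool {
        self.0.closed.load(SeqCst)
    }
}

// Builds one sender and one receiver sharing a single `Shared`.
fn pair() -> (Tx, Rx) {
    let shared = Arc::new(Shared { closed: AtomicBool::new(false) });
    (Tx(shared.clone()), Rx(shared))
}
```
 *
 * With one receiver and two senders alive, dropping the first sender sees a
 * count of 3 and leaves the flag alone; dropping the second sees a count of
 * 2 and sets it.
 */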

impl<T> UnboundedReceiver<T> {
    /// Closes the receiving half of the channel, without dropping it.
    ///
    /// This prevents any further messages from being sent on the channel while
    /// still enabling the receiver to drain messages that are buffered.
    pub fn close(&mut self) {
        self.0.closed.store(true, SeqCst);
    }

    /// Tries to receive the next message without notifying a context if empty.
    ///
    /// Returns `Ok(Some(msg))` if a message was available, `Ok(None)` if the
    /// channel is closed and fully drained, and `Err` if the channel is empty
    /// but still open.
    ///
    /// Calling this function from inside a future is not recommended unless
    /// you have otherwise arranged to be notified when the channel is no
    /// longer empty.
    pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
        loop {
            // Safe because this is the only place the message queue is popped,
            // and it takes `&mut self` to ensure that only the unique receiver
            // can pop off of the message queue.
            match unsafe { self.0.message_queue.pop() } {
                PopResult::Data(msg) => return Ok(Some(msg)),
                PopResult::Empty => {
                    if self.0.closed.load(SeqCst) {
                        // Ensure that the `closed` state wasn't written after
                        // a final message was sent.
                        match unsafe { self.0.message_queue.pop() } {
                            PopResult::Data(msg) => return Ok(Some(msg)),
                            PopResult::Empty => return Ok(None),
                            PopResult::Inconsistent => {
                                thread::yield_now();
                                continue;
                            }
                        }
                    }
                    return Err(TryRecvError { _inner: () });
                }
                PopResult::Inconsistent => {
                    // Inconsistent means that there will be a message to pop
                    // in a short time. This branch can only be reached if
                    // values are being produced from another thread, so there
                    // are a few ways that we can deal with this:
                    //
                    // 1) Spin
                    // 2) thread::yield_now()
                    // 3) task::current().unwrap() & return Pending
                    //
                    // For now, thread::yield_now() is used, but it would
                    // probably be better to spin a few times then yield.
                    thread::yield_now();
                }
            }
        }
    }
}
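
/*
 * Sketch (not part of this module): the three outcomes of `try_next` and
 * the close-then-drain shutdown described in the module docs, modelled with
 * a single-threaded `VecDeque`. `ModelReceiver`, `Empty` and `shutdown` are
 * hypothetical names. The lock-free queue and its `Inconsistent` state are
 * deliberately not modelled.
 *
```rust
use std::collections::VecDeque;

// Hypothetical single-threaded stand-in for the receiver.
struct ModelReceiver<T> {
    queue: VecDeque<T>,
    closed: bool,
}

// Stands in for `TryRecvError`: the channel is empty but still open.
#[derive(Debug, PartialEq)]
struct Empty;

impl<T> ModelReceiver<T> {
    // Stops accepting new messages; buffered ones stay readable.
    fn close(&mut self) {
        self.closed = true;
    }

    fn try_next(&mut self) -> Result<Option<T>, Empty> {
        match self.queue.pop_front() {
            // A message was buffered.
            Some(msg) => Ok(Some(msg)),
            // Closed and drained: the stream has terminated.
            None if self.closed => Ok(None),
            // Empty but open: more messages may still arrive.
            None => Err(Empty),
        }
    }

    // Clean shutdown: close first, then drain whatever is still buffered.
    fn shutdown(mut self) -> Vec<T> {
        self.close();
        let mut drained = Vec::new();
        while let Ok(Some(msg)) = self.try_next() {
            drained.push(msg);
        }
        drained
    }
}
```
 *
 * Closing before draining matters: once the flag is set no new message can
 * be accepted, so the drain loop is guaranteed to reach `Ok(None)`.
 */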

impl<T> Stream for UnboundedReceiver<T> {
    type Item = T;
    type Error = Never;

    fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Item>, Self::Error> {
        if let Ok(msg) = self.try_next() {
            return Ok(Async::Ready(msg));
        }
        self.0.recv_waker.register(cx.waker());
        if let Ok(msg) = self.try_next() {
            return Ok(Async::Ready(msg));
        }
        Ok(Async::Pending)
    }
}
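
/*
 * Sketch (not part of this module): the "try, register the waker, try
 * again" pattern used by `poll_next` above. It is written against the
 * current `std::task` API rather than the `task::Context` and `Async` types
 * this file uses. `Model` and `CountingWaker` are hypothetical, and a
 * `Mutex` replaces both the lock-free queue and `AtomicWaker`, so this
 * shows the ordering of the steps only, not the real synchronisation.
 *
```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake};

// Hypothetical channel model: a locked queue plus one waker slot.
struct Model<T> {
    queue: Mutex<VecDeque<T>>,
    recv_waker: Mutex<Option<std::task::Waker>>,
}

impl<T> Model<T> {
    fn new() -> Self {
        Model {
            queue: Mutex::new(VecDeque::new()),
            recv_waker: Mutex::new(None),
        }
    }

    // Pushes a message, then wakes the registered receiver task, if any.
    fn send(&self, msg: T) {
        self.queue.lock().unwrap().push_back(msg);
        let waker = self.recv_waker.lock().unwrap().take();
        if let Some(waker) = waker {
            waker.wake();
        }
    }

    fn poll_next(&self, cx: &mut Context<'_>) -> Poll<T> {
        // First attempt: no registration needed if a message is ready.
        if let Some(msg) = self.queue.lock().unwrap().pop_front() {
            return Poll::Ready(msg);
        }
        // Register interest before giving up.
        *self.recv_waker.lock().unwrap() = Some(cx.waker().clone());
        // Second attempt: a send may have landed between the first pop and
        // the registration, in which case its wake-up would have been missed.
        if let Some(msg) = self.queue.lock().unwrap().pop_front() {
            return Poll::Ready(msg);
        }
        Poll::Pending
    }
}

// Hypothetical waker that only counts how often it was woken.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, SeqCst);
    }
}
```
 *
 * Without the second attempt, a message sent just before the waker was
 * registered could sit in the queue with no task scheduled to read it.
 */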

impl<T> Drop for UnboundedReceiver<T> {
    fn drop(&mut self) {
        self.0.closed.store(true, SeqCst);
    }
}
