tokio_channel/mpsc/
mod.rs

1//! A multi-producer, single-consumer, futures-aware, FIFO queue with back pressure.
2//!
3//! A channel can be used as a communication primitive between tasks running on
4//! `futures-rs` executors. Channel creation provides `Receiver` and `Sender`
5//! handles. `Receiver` implements `Stream` and allows a task to read values
6//! out of the channel. If there is no message to read from the channel, the
7//! current task will be notified when a new value is sent. `Sender` implements
8//! the `Sink` trait and allows a task to send messages into the channel. If
9//! the channel is at capacity, then send will be rejected and the task will be
10//! notified when additional capacity is available.
11//!
12//! # Disconnection
13//!
14//! When all `Sender` handles have been dropped, it is no longer possible to
15//! send values into the channel. This is considered the termination event of
16//! the stream. As such, `Sender::poll` will return `Ok(Ready(None))`.
17//!
18//! If the receiver handle is dropped, then messages can no longer be read out
19//! of the channel. In this case, a `send` will result in an error.
20//!
21//! # Clean Shutdown
22//!
23//! If the `Receiver` is simply dropped, then it is possible for there to be
24//! messages still in the channel that will not be processed. As such, it is
25//! usually desirable to perform a "clean" shutdown. To do this, the receiver
26//! will first call `close`, which will prevent any further messages to be sent
27//! into the channel. Then, the receiver consumes the channel to completion, at
28//! which point the receiver can be dropped.
29
30// At the core, the channel uses an atomic FIFO queue for message passing. This
31// queue is used as the primary coordination primitive. In order to enforce
32// capacity limits and handle back pressure, a secondary FIFO queue is used to
33// send parked task handles.
34//
35// The general idea is that the channel is created with a `buffer` size of `n`.
36// The channel capacity is `n + num-senders`. Each sender gets one "guaranteed"
37// slot to hold a message. This allows `Sender` to know for a fact that a send
38// will succeed *before* starting to do the actual work of sending the value.
39// Since most of this work is lock-free, once the work starts, it is impossible
40// to safely revert.
41//
42// If the sender is unable to process a send operation, then the current
43// task is parked and the handle is sent on the parked task queue.
44//
45// Note that the implementation guarantees that the channel capacity will never
46// exceed the configured limit, however there is no *strict* guarantee that the
47// receiver will wake up a parked task *immediately* when a slot becomes
48// available. However, it will almost always unpark a task when a slot becomes
49// available and it is *guaranteed* that a sender will be unparked when the
50// message that caused the sender to become parked is read out of the channel.
51//
52// The steps for sending a message are roughly:
53//
54// 1) Increment the channel message count
55// 2) If the channel is at capacity, push the task handle onto the wait queue
56// 3) Push the message onto the message queue.
57//
58// The steps for receiving a message are roughly:
59//
60// 1) Pop a message from the message queue
61// 2) Pop a task handle from the wait queue
62// 3) Decrement the channel message count.
63//
64// It's important for the order of operations on lock-free structures to happen
65// in reverse order between the sender and receiver. This makes the message
66// queue the primary coordination structure and establishes the necessary
67// happens-before semantics required for the acquire / release semantics used
68// by the queue structure.
69
70
71
72use mpsc::queue::{Queue, PopResult};
73
74use futures::task::{self, Task};
75use futures::{Async, AsyncSink, Poll, StartSend, Sink, Stream};
76
77use std::fmt;
78use std::error::Error;
79use std::any::Any;
80use std::sync::atomic::AtomicUsize;
81use std::sync::atomic::Ordering::SeqCst;
82use std::sync::{Arc, Mutex};
83use std::thread;
84use std::usize;
85
86mod queue;
87
88/// The transmission end of a channel which is used to send values.
89///
90/// This is created by the `channel` method.
91#[derive(Debug)]
92pub struct Sender<T> {
93    // Channel state shared between the sender and receiver.
94    inner: Arc<Inner<T>>,
95
96    // Handle to the task that is blocked on this sender. This handle is sent
97    // to the receiver half in order to be notified when the sender becomes
98    // unblocked.
99    sender_task: Arc<Mutex<SenderTask>>,
100
101    // True if the sender might be blocked. This is an optimization to avoid
102    // having to lock the mutex most of the time.
103    maybe_parked: bool,
104}
105
106/// The transmission end of a channel which is used to send values.
107///
108/// This is created by the `unbounded` method.
109#[derive(Debug)]
110pub struct UnboundedSender<T>(Sender<T>);
111
112trait AssertKinds: Send + Sync + Clone {}
113impl AssertKinds for UnboundedSender<u32> {}
114
115
116/// The receiving end of a channel which implements the `Stream` trait.
117///
118/// This is a concrete implementation of a stream which can be used to represent
119/// a stream of values being computed elsewhere. This is created by the
120/// `channel` method.
121#[derive(Debug)]
122pub struct Receiver<T> {
123    inner: Arc<Inner<T>>,
124}
125
126/// Error type for sending, used when the receiving end of a channel is
127/// dropped
128#[derive(Clone, PartialEq, Eq)]
129pub struct SendError<T>(T);
130
131/// Error type returned from `try_send`
132#[derive(Clone, PartialEq, Eq)]
133pub struct TrySendError<T> {
134    kind: TrySendErrorKind<T>,
135}
136
137#[derive(Clone, PartialEq, Eq)]
138enum TrySendErrorKind<T> {
139    Full(T),
140    Disconnected(T),
141}
142
143impl<T> fmt::Debug for SendError<T> {
144    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
145        fmt.debug_tuple("SendError")
146            .field(&"...")
147            .finish()
148    }
149}
150
151impl<T> fmt::Display for SendError<T> {
152    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
153        write!(fmt, "send failed because receiver is gone")
154    }
155}
156
157impl<T: Any> Error for SendError<T>
158{
159    fn description(&self) -> &str {
160        "send failed because receiver is gone"
161    }
162}
163
164impl<T> SendError<T> {
165    /// Returns the message that was attempted to be sent but failed.
166    pub fn into_inner(self) -> T {
167        self.0
168    }
169}
170
171impl<T> fmt::Debug for TrySendError<T> {
172    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
173        fmt.debug_tuple("TrySendError")
174            .field(&"...")
175            .finish()
176    }
177}
178
179impl<T> fmt::Display for TrySendError<T> {
180    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
181        if self.is_full() {
182            write!(fmt, "send failed because channel is full")
183        } else {
184            write!(fmt, "send failed because receiver is gone")
185        }
186    }
187}
188
189impl<T: Any> Error for TrySendError<T> {
190    fn description(&self) -> &str {
191        if self.is_full() {
192            "send failed because channel is full"
193        } else {
194            "send failed because receiver is gone"
195        }
196    }
197}
198
199impl<T> TrySendError<T> {
200    /// Returns true if this error is a result of the channel being full
201    pub fn is_full(&self) -> bool {
202        use self::TrySendErrorKind::*;
203
204        match self.kind {
205            Full(_) => true,
206            _ => false,
207        }
208    }
209
210    /// Returns true if this error is a result of the receiver being dropped
211    pub fn is_disconnected(&self) -> bool {
212        use self::TrySendErrorKind::*;
213
214        match self.kind {
215            Disconnected(_) => true,
216            _ => false,
217        }
218    }
219
220    /// Returns the message that was attempted to be sent but failed.
221    pub fn into_inner(self) -> T {
222        use self::TrySendErrorKind::*;
223
224        match self.kind {
225            Full(v) | Disconnected(v) => v,
226        }
227    }
228}
229
230#[derive(Debug)]
231struct Inner<T> {
232    // Max buffer size of the channel. If `None` then the channel is unbounded.
233    buffer: Option<usize>,
234
235    // Internal channel state. Consists of the number of messages stored in the
236    // channel as well as a flag signalling that the channel is closed.
237    state: AtomicUsize,
238
239    // Atomic, FIFO queue used to send messages to the receiver
240    message_queue: Queue<Option<T>>,
241
242    // Atomic, FIFO queue used to send parked task handles to the receiver.
243    parked_queue: Queue<Arc<Mutex<SenderTask>>>,
244
245    // Number of senders in existence
246    num_senders: AtomicUsize,
247
248    // Handle to the receiver's task.
249    recv_task: Mutex<ReceiverTask>,
250}
251
252// Struct representation of `Inner::state`.
253#[derive(Debug, Clone, Copy)]
254struct State {
255    // `true` when the channel is open
256    is_open: bool,
257
258    // Number of messages in the channel
259    num_messages: usize,
260}
261
262#[derive(Debug)]
263struct ReceiverTask {
264    unparked: bool,
265    task: Option<Task>,
266}
267
268// Returned from Receiver::try_park()
269enum TryPark {
270    Parked,
271    Closed,
272    NotEmpty,
273}
274
275// The `is_open` flag is stored in the left-most bit of `Inner::state`
276const OPEN_MASK: usize = usize::MAX - (usize::MAX >> 1);
277
278// When a new channel is created, it is created in the open state with no
279// pending messages.
280const INIT_STATE: usize = OPEN_MASK;
281
282// The maximum number of messages that a channel can track is `usize::MAX >> 1`
283const MAX_CAPACITY: usize = !(OPEN_MASK);
284
285// The maximum requested buffer size must be less than the maximum capacity of
286// a channel. This is because each sender gets a guaranteed slot.
287const MAX_BUFFER: usize = MAX_CAPACITY >> 1;
288
289// Sent to the consumer to wake up blocked producers
290#[derive(Debug)]
291struct SenderTask {
292    task: Option<Task>,
293    is_parked: bool,
294}
295
296impl SenderTask {
297    fn new() -> Self {
298        SenderTask {
299            task: None,
300            is_parked: false,
301        }
302    }
303
304    fn notify(&mut self) {
305        self.is_parked = false;
306
307        if let Some(task) = self.task.take() {
308            task.notify();
309        }
310    }
311}
312
313/// Creates an in-memory channel implementation of the `Stream` trait with
314/// bounded capacity.
315///
316/// This method creates a concrete implementation of the `Stream` trait which
317/// can be used to send values across threads in a streaming fashion. This
318/// channel is unique in that it implements back pressure to ensure that the
319/// sender never outpaces the receiver. The channel capacity is equal to
320/// `buffer + num-senders`. In other words, each sender gets a guaranteed slot
321/// in the channel capacity, and on top of that there are `buffer` "first come,
322/// first serve" slots available to all senders.
323///
324/// The `Receiver` returned implements the `Stream` trait and has access to any
325/// number of the associated combinators for transforming the result.
326pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
327    // Check that the requested buffer size does not exceed the maximum buffer
328    // size permitted by the system.
329    assert!(buffer < MAX_BUFFER, "requested buffer size too large");
330    channel2(Some(buffer))
331}
332
333/// Creates an in-memory channel implementation of the `Stream` trait with
334/// unbounded capacity.
335///
336/// This method creates a concrete implementation of the `Stream` trait which
337/// can be used to send values across threads in a streaming fashion. A `send`
338/// on this channel will always succeed as long as the receive half has not
339/// been closed. If the receiver falls behind, messages will be buffered
340/// internally.
341///
342/// **Note** that the amount of available system memory is an implicit bound to
343/// the channel. Using an `unbounded` channel has the ability of causing the
344/// process to run out of memory. In this case, the process will be aborted.
345pub fn unbounded<T>() -> (UnboundedSender<T>, Receiver<T>) {
346    let (tx, rx) = channel2(None);
347    (UnboundedSender(tx), rx)
348}
349
350fn channel2<T>(buffer: Option<usize>) -> (Sender<T>, Receiver<T>) {
351    let inner = Arc::new(Inner {
352        buffer: buffer,
353        state: AtomicUsize::new(INIT_STATE),
354        message_queue: Queue::new(),
355        parked_queue: Queue::new(),
356        num_senders: AtomicUsize::new(1),
357        recv_task: Mutex::new(ReceiverTask {
358            unparked: false,
359            task: None,
360        }),
361    });
362
363    let tx = Sender {
364        inner: inner.clone(),
365        sender_task: Arc::new(Mutex::new(SenderTask::new())),
366        maybe_parked: false,
367    };
368
369    let rx = Receiver {
370        inner: inner,
371    };
372
373    (tx, rx)
374}
375
376/*
377 *
378 * ===== impl Sender =====
379 *
380 */
381
382impl<T> Sender<T> {
383    /// Attempts to send a message on this `Sender<T>` without blocking.
384    ///
385    /// This function, unlike `start_send`, is safe to call whether it's being
386    /// called on a task or not. Note that this function, however, will *not*
387    /// attempt to block the current task if the message cannot be sent.
388    ///
389    /// It is not recommended to call this function from inside of a future,
390    /// only from an external thread where you've otherwise arranged to be
391    /// notified when the channel is no longer full.
392    pub fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
393        // If the sender is currently blocked, reject the message
394        if !self.poll_unparked(false).is_ready() {
395            return Err(TrySendError {
396                kind: TrySendErrorKind::Full(msg),
397            });
398        }
399
400        // The channel has capacity to accept the message, so send it
401        self.do_send(Some(msg), false)
402            .map_err(|SendError(v)| {
403                TrySendError {
404                    kind: TrySendErrorKind::Disconnected(v),
405                }
406            })
407    }
408
409    // Do the send without failing
410    // None means close
411    fn do_send(&mut self, msg: Option<T>, do_park: bool) -> Result<(), SendError<T>> {
412        // First, increment the number of messages contained by the channel.
413        // This operation will also atomically determine if the sender task
414        // should be parked.
415        //
416        // None is returned in the case that the channel has been closed by the
417        // receiver. This happens when `Receiver::close` is called or the
418        // receiver is dropped.
419        let park_self = match self.inc_num_messages(msg.is_none()) {
420            Some(park_self) => park_self,
421            None => {
422                // The receiver has closed the channel. Only abort if actually
423                // sending a message. It is important that the stream
424                // termination (None) is always sent. This technically means
425                // that it is possible for the queue to contain the following
426                // number of messages:
427                //
428                //     num-senders + buffer + 1
429                //
430                if let Some(msg) = msg {
431                    return Err(SendError(msg));
432                } else {
433                    return Ok(());
434                }
435            }
436        };
437
438        // If the channel has reached capacity, then the sender task needs to
439        // be parked. This will send the task handle on the parked task queue.
440        //
441        // However, when `do_send` is called while dropping the `Sender`,
442        // `task::current()` can't be called safely. In this case, in order to
443        // maintain internal consistency, a blank message is pushed onto the
444        // parked task queue.
445        if park_self {
446            self.park(do_park);
447        }
448
449        self.queue_push_and_signal(msg);
450
451        Ok(())
452    }
453
454    // Do the send without parking current task.
455    //
456    // To be called from unbounded sender.
457    fn do_send_nb(&self, msg: T) -> Result<(), SendError<T>> {
458        match self.inc_num_messages(false) {
459            Some(park_self) => assert!(!park_self),
460            None => return Err(SendError(msg)),
461        };
462
463        self.queue_push_and_signal(Some(msg));
464
465        Ok(())
466    }
467
468    // Push message to the queue and signal to the receiver
469    fn queue_push_and_signal(&self, msg: Option<T>) {
470        // Push the message onto the message queue
471        self.inner.message_queue.push(msg);
472
473        // Signal to the receiver that a message has been enqueued. If the
474        // receiver is parked, this will unpark the task.
475        self.signal();
476    }
477
478    // Increment the number of queued messages. Returns if the sender should
479    // block.
480    fn inc_num_messages(&self, close: bool) -> Option<bool> {
481        let mut curr = self.inner.state.load(SeqCst);
482
483        loop {
484            let mut state = decode_state(curr);
485
486            // The receiver end closed the channel.
487            if !state.is_open {
488                return None;
489            }
490
491            // This probably is never hit? Odds are the process will run out of
492            // memory first. It may be worth to return something else in this
493            // case?
494            assert!(state.num_messages < MAX_CAPACITY, "buffer space exhausted; \
495                    sending this messages would overflow the state");
496
497            state.num_messages += 1;
498
499            // The channel is closed by all sender handles being dropped.
500            if close {
501                state.is_open = false;
502            }
503
504            let next = encode_state(&state);
505            match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
506                Ok(_) => {
507                    // Block if the current number of pending messages has exceeded
508                    // the configured buffer size
509                    let park_self = match self.inner.buffer {
510                        Some(buffer) => state.num_messages > buffer,
511                        None => false,
512                    };
513
514                    return Some(park_self)
515                }
516                Err(actual) => curr = actual,
517            }
518        }
519    }
520
521    // Signal to the receiver task that a message has been enqueued
522    fn signal(&self) {
523        // TODO
524        // This logic can probably be improved by guarding the lock with an
525        // atomic.
526        //
527        // Do this step first so that the lock is dropped when
528        // `unpark` is called
529        let task = {
530            let mut recv_task = self.inner.recv_task.lock().unwrap();
531
532            // If the receiver has already been unparked, then there is nothing
533            // more to do
534            if recv_task.unparked {
535                return;
536            }
537
538            // Setting this flag enables the receiving end to detect that
539            // an unpark event happened in order to avoid unnecessarily
540            // parking.
541            recv_task.unparked = true;
542            recv_task.task.take()
543        };
544
545        if let Some(task) = task {
546            task.notify();
547        }
548    }
549
550    fn park(&mut self, can_park: bool) {
551        // TODO: clean up internal state if the task::current will fail
552
553        let task = if can_park {
554            Some(task::current())
555        } else {
556            None
557        };
558
559        {
560            let mut sender = self.sender_task.lock().unwrap();
561            sender.task = task;
562            sender.is_parked = true;
563        }
564
565        // Send handle over queue
566        let t = self.sender_task.clone();
567        self.inner.parked_queue.push(t);
568
569        // Check to make sure we weren't closed after we sent our task on the
570        // queue
571        let state = decode_state(self.inner.state.load(SeqCst));
572        self.maybe_parked = state.is_open;
573    }
574
575    /// Polls the channel to determine if there is guaranteed to be capacity to send at least one
576    /// item without waiting.
577    ///
578    /// Returns `Ok(Async::Ready(_))` if there is sufficient capacity, or returns
579    /// `Ok(Async::NotReady)` if the channel is not guaranteed to have capacity. Returns
580    /// `Err(SendError(_))` if the receiver has been dropped.
581    ///
582    /// # Panics
583    ///
584    /// This method will panic if called from outside the context of a task or future.
585    pub fn poll_ready(&mut self) -> Poll<(), SendError<()>> {
586        let state = decode_state(self.inner.state.load(SeqCst));
587        if !state.is_open {
588            return Err(SendError(()));
589        }
590
591        Ok(self.poll_unparked(true))
592    }
593
594    fn poll_unparked(&mut self, do_park: bool) -> Async<()> {
595        // First check the `maybe_parked` variable. This avoids acquiring the
596        // lock in most cases
597        if self.maybe_parked {
598            // Get a lock on the task handle
599            let mut task = self.sender_task.lock().unwrap();
600
601            if !task.is_parked {
602                self.maybe_parked = false;
603                return Async::Ready(())
604            }
605
606            // At this point, an unpark request is pending, so there will be an
607            // unpark sometime in the future. We just need to make sure that
608            // the correct task will be notified.
609            //
610            // Update the task in case the `Sender` has been moved to another
611            // task
612            task.task = if do_park {
613                Some(task::current())
614            } else {
615                None
616            };
617
618            Async::NotReady
619        } else {
620            Async::Ready(())
621        }
622    }
623}
624
625impl<T> Sink for Sender<T> {
626    type SinkItem = T;
627    type SinkError = SendError<T>;
628
629    fn start_send(&mut self, msg: T) -> StartSend<T, SendError<T>> {
630        // If the sender is currently blocked, reject the message before doing
631        // any work.
632        if !self.poll_unparked(true).is_ready() {
633            return Ok(AsyncSink::NotReady(msg));
634        }
635
636        // The channel has capacity to accept the message, so send it.
637        self.do_send(Some(msg), true)?;
638
639        Ok(AsyncSink::Ready)
640    }
641
642    fn poll_complete(&mut self) -> Poll<(), SendError<T>> {
643        Ok(Async::Ready(()))
644    }
645
646    fn close(&mut self) -> Poll<(), SendError<T>> {
647        Ok(Async::Ready(()))
648    }
649}
650
651impl<T> UnboundedSender<T> {
652    /// Sends the provided message along this channel.
653    ///
654    /// This is an unbounded sender, so this function differs from `Sink::send`
655    /// by ensuring the return type reflects that the channel is always ready to
656    /// receive messages.
657    #[deprecated(note = "renamed to `unbounded_send`")]
658    #[doc(hidden)]
659    pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
660        self.unbounded_send(msg)
661    }
662
663    /// Sends the provided message along this channel.
664    ///
665    /// This is an unbounded sender, so this function differs from `Sink::send`
666    /// by ensuring the return type reflects that the channel is always ready to
667    /// receive messages.
668    pub fn unbounded_send(&self, msg: T) -> Result<(), SendError<T>> {
669        self.0.do_send_nb(msg)
670    }
671}
672
673impl<T> Sink for UnboundedSender<T> {
674    type SinkItem = T;
675    type SinkError = SendError<T>;
676
677    fn start_send(&mut self, msg: T) -> StartSend<T, SendError<T>> {
678        self.0.start_send(msg)
679    }
680
681    fn poll_complete(&mut self) -> Poll<(), SendError<T>> {
682        self.0.poll_complete()
683    }
684
685    fn close(&mut self) -> Poll<(), SendError<T>> {
686        Ok(Async::Ready(()))
687    }
688}
689
690impl<'a, T> Sink for &'a UnboundedSender<T> {
691    type SinkItem = T;
692    type SinkError = SendError<T>;
693
694    fn start_send(&mut self, msg: T) -> StartSend<T, SendError<T>> {
695        self.0.do_send_nb(msg)?;
696        Ok(AsyncSink::Ready)
697    }
698
699    fn poll_complete(&mut self) -> Poll<(), SendError<T>> {
700        Ok(Async::Ready(()))
701    }
702
703    fn close(&mut self) -> Poll<(), SendError<T>> {
704        Ok(Async::Ready(()))
705    }
706}
707
708impl<T> Clone for UnboundedSender<T> {
709    fn clone(&self) -> UnboundedSender<T> {
710        UnboundedSender(self.0.clone())
711    }
712}
713
714
715impl<T> Clone for Sender<T> {
716    fn clone(&self) -> Sender<T> {
717        // Since this atomic op isn't actually guarding any memory and we don't
718        // care about any orderings besides the ordering on the single atomic
719        // variable, a relaxed ordering is acceptable.
720        let mut curr = self.inner.num_senders.load(SeqCst);
721
722        loop {
723            // If the maximum number of senders has been reached, then fail
724            if curr == self.inner.max_senders() {
725                panic!("cannot clone `Sender` -- too many outstanding senders");
726            }
727
728            debug_assert!(curr < self.inner.max_senders());
729
730            let next = curr + 1;
731            let actual = self.inner.num_senders.compare_and_swap(curr, next, SeqCst);
732
733            // The ABA problem doesn't matter here. We only care that the
734            // number of senders never exceeds the maximum.
735            if actual == curr {
736                return Sender {
737                    inner: self.inner.clone(),
738                    sender_task: Arc::new(Mutex::new(SenderTask::new())),
739                    maybe_parked: false,
740                };
741            }
742
743            curr = actual;
744        }
745    }
746}
747
748impl<T> Drop for Sender<T> {
749    fn drop(&mut self) {
750        // Ordering between variables don't matter here
751        let prev = self.inner.num_senders.fetch_sub(1, SeqCst);
752
753        if prev == 1 {
754            let _ = self.do_send(None, false);
755        }
756    }
757}
758
759/*
760 *
761 * ===== impl Receiver =====
762 *
763 */
764
765impl<T> Receiver<T> {
766    /// Closes the receiving half
767    ///
768    /// This prevents any further messages from being sent on the channel while
769    /// still enabling the receiver to drain messages that are buffered.
770    pub fn close(&mut self) {
771        let mut curr = self.inner.state.load(SeqCst);
772
773        loop {
774            let mut state = decode_state(curr);
775
776            if !state.is_open {
777                break
778            }
779
780            state.is_open = false;
781
782            let next = encode_state(&state);
783            match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
784                Ok(_) => break,
785                Err(actual) => curr = actual,
786            }
787        }
788
789        // Wake up any threads waiting as they'll see that we've closed the
790        // channel and will continue on their merry way.
791        loop {
792            match unsafe { self.inner.parked_queue.pop() } {
793                PopResult::Data(task) => {
794                    task.lock().unwrap().notify();
795                }
796                PopResult::Empty => break,
797                PopResult::Inconsistent => thread::yield_now(),
798            }
799        }
800    }
801
802    fn next_message(&mut self) -> Async<Option<T>> {
803        // Pop off a message
804        loop {
805            match unsafe { self.inner.message_queue.pop() } {
806                PopResult::Data(msg) => {
807                    return Async::Ready(msg);
808                }
809                PopResult::Empty => {
810                    // The queue is empty, return NotReady
811                    return Async::NotReady;
812                }
813                PopResult::Inconsistent => {
814                    // Inconsistent means that there will be a message to pop
815                    // in a short time. This branch can only be reached if
816                    // values are being produced from another thread, so there
817                    // are a few ways that we can deal with this:
818                    //
819                    // 1) Spin
820                    // 2) thread::yield_now()
821                    // 3) task::current().unwrap() & return NotReady
822                    //
823                    // For now, thread::yield_now() is used, but it would
824                    // probably be better to spin a few times then yield.
825                    thread::yield_now();
826                }
827            }
828        }
829    }
830
831    // Unpark a single task handle if there is one pending in the parked queue
832    fn unpark_one(&mut self) {
833        loop {
834            match unsafe { self.inner.parked_queue.pop() } {
835                PopResult::Data(task) => {
836                    task.lock().unwrap().notify();
837                    return;
838                }
839                PopResult::Empty => {
840                    // Queue empty, no task to wake up.
841                    return;
842                }
843                PopResult::Inconsistent => {
844                    // Same as above
845                    thread::yield_now();
846                }
847            }
848        }
849    }
850
851    // Try to park the receiver task
852    fn try_park(&self) -> TryPark {
853        let curr = self.inner.state.load(SeqCst);
854        let state = decode_state(curr);
855
856        // If the channel is closed, then there is no need to park.
857        if !state.is_open && state.num_messages == 0 {
858            return TryPark::Closed;
859        }
860
861        // First, track the task in the `recv_task` slot
862        let mut recv_task = self.inner.recv_task.lock().unwrap();
863
864        if recv_task.unparked {
865            // Consume the `unpark` signal without actually parking
866            recv_task.unparked = false;
867            return TryPark::NotEmpty;
868        }
869
870        recv_task.task = Some(task::current());
871        TryPark::Parked
872    }
873
874    fn dec_num_messages(&self) {
875        let mut curr = self.inner.state.load(SeqCst);
876
877        loop {
878            let mut state = decode_state(curr);
879
880            state.num_messages -= 1;
881
882            let next = encode_state(&state);
883            match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
884                Ok(_) => break,
885                Err(actual) => curr = actual,
886            }
887        }
888    }
889}
890
891impl<T> Stream for Receiver<T> {
892    type Item = T;
893    type Error = ();
894
895    fn poll(&mut self) -> Poll<Option<T>, ()> {
896        loop {
897            // Try to read a message off of the message queue.
898            let msg = match self.next_message() {
899                Async::Ready(msg) => msg,
900                Async::NotReady => {
901                    // There are no messages to read, in this case, attempt to
902                    // park. The act of parking will verify that the channel is
903                    // still empty after the park operation has completed.
904                    match self.try_park() {
905                        TryPark::Parked => {
906                            // The task was parked, and the channel is still
907                            // empty, return NotReady.
908                            return Ok(Async::NotReady);
909                        }
910                        TryPark::Closed => {
911                            // The channel is closed, there will be no further
912                            // messages.
913                            return Ok(Async::Ready(None));
914                        }
915                        TryPark::NotEmpty => {
916                            // A message has been sent while attempting to
917                            // park. Loop again, the next iteration is
918                            // guaranteed to get the message.
919                            continue;
920                        }
921                    }
922                }
923            };
924
925            // If there are any parked task handles in the parked queue, pop
926            // one and unpark it.
927            self.unpark_one();
928
929            // Decrement number of messages
930            self.dec_num_messages();
931
932            // Return the message
933            return Ok(Async::Ready(msg));
934        }
935    }
936}
937
938impl<T> Drop for Receiver<T> {
939    fn drop(&mut self) {
940        // Drain the channel of all pending messages
941        self.close();
942        while self.next_message().is_ready() {
943            // ...
944        }
945    }
946}
947
948/*
949 *
950 * ===== impl Inner =====
951 *
952 */
953
954impl<T> Inner<T> {
955    // The return value is such that the total number of messages that can be
956    // enqueued into the channel will never exceed MAX_CAPACITY
957    fn max_senders(&self) -> usize {
958        match self.buffer {
959            Some(buffer) => MAX_CAPACITY - buffer,
960            None => MAX_BUFFER,
961        }
962    }
963}
964
965unsafe impl<T: Send> Send for Inner<T> {}
966unsafe impl<T: Send> Sync for Inner<T> {}
967
968/*
969 *
970 * ===== Helpers =====
971 *
972 */
973
974fn decode_state(num: usize) -> State {
975    State {
976        is_open: num & OPEN_MASK == OPEN_MASK,
977        num_messages: num & MAX_CAPACITY,
978    }
979}
980
981fn encode_state(state: &State) -> usize {
982    let mut num = state.num_messages;
983
984    if state.is_open {
985        num |= OPEN_MASK;
986    }
987
988    num
989}