tokio_channel/mpsc/mod.rs
1//! A multi-producer, single-consumer, futures-aware, FIFO queue with back pressure.
2//!
3//! A channel can be used as a communication primitive between tasks running on
4//! `futures-rs` executors. Channel creation provides `Receiver` and `Sender`
5//! handles. `Receiver` implements `Stream` and allows a task to read values
6//! out of the channel. If there is no message to read from the channel, the
7//! current task will be notified when a new value is sent. `Sender` implements
8//! the `Sink` trait and allows a task to send messages into the channel. If
9//! the channel is at capacity, then send will be rejected and the task will be
10//! notified when additional capacity is available.
11//!
12//! # Disconnection
13//!
14//! When all `Sender` handles have been dropped, it is no longer possible to
15//! send values into the channel. This is considered the termination event of
16//! the stream. As such, `Sender::poll` will return `Ok(Ready(None))`.
17//!
18//! If the receiver handle is dropped, then messages can no longer be read out
19//! of the channel. In this case, a `send` will result in an error.
20//!
21//! # Clean Shutdown
22//!
23//! If the `Receiver` is simply dropped, then it is possible for there to be
24//! messages still in the channel that will not be processed. As such, it is
25//! usually desirable to perform a "clean" shutdown. To do this, the receiver
26//! will first call `close`, which will prevent any further messages to be sent
27//! into the channel. Then, the receiver consumes the channel to completion, at
28//! which point the receiver can be dropped.
29
30// At the core, the channel uses an atomic FIFO queue for message passing. This
31// queue is used as the primary coordination primitive. In order to enforce
32// capacity limits and handle back pressure, a secondary FIFO queue is used to
33// send parked task handles.
34//
35// The general idea is that the channel is created with a `buffer` size of `n`.
36// The channel capacity is `n + num-senders`. Each sender gets one "guaranteed"
37// slot to hold a message. This allows `Sender` to know for a fact that a send
38// will succeed *before* starting to do the actual work of sending the value.
39// Since most of this work is lock-free, once the work starts, it is impossible
40// to safely revert.
41//
42// If the sender is unable to process a send operation, then the current
43// task is parked and the handle is sent on the parked task queue.
44//
45// Note that the implementation guarantees that the channel capacity will never
46// exceed the configured limit, however there is no *strict* guarantee that the
47// receiver will wake up a parked task *immediately* when a slot becomes
48// available. However, it will almost always unpark a task when a slot becomes
49// available and it is *guaranteed* that a sender will be unparked when the
50// message that caused the sender to become parked is read out of the channel.
51//
52// The steps for sending a message are roughly:
53//
54// 1) Increment the channel message count
55// 2) If the channel is at capacity, push the task handle onto the wait queue
56// 3) Push the message onto the message queue.
57//
58// The steps for receiving a message are roughly:
59//
60// 1) Pop a message from the message queue
61// 2) Pop a task handle from the wait queue
62// 3) Decrement the channel message count.
63//
64// It's important for the order of operations on lock-free structures to happen
65// in reverse order between the sender and receiver. This makes the message
66// queue the primary coordination structure and establishes the necessary
67// happens-before semantics required for the acquire / release semantics used
68// by the queue structure.
69
70
71
72use mpsc::queue::{Queue, PopResult};
73
74use futures::task::{self, Task};
75use futures::{Async, AsyncSink, Poll, StartSend, Sink, Stream};
76
77use std::fmt;
78use std::error::Error;
79use std::any::Any;
80use std::sync::atomic::AtomicUsize;
81use std::sync::atomic::Ordering::SeqCst;
82use std::sync::{Arc, Mutex};
83use std::thread;
84use std::usize;
85
86mod queue;
87
88/// The transmission end of a channel which is used to send values.
89///
90/// This is created by the `channel` method.
91#[derive(Debug)]
92pub struct Sender<T> {
93 // Channel state shared between the sender and receiver.
94 inner: Arc<Inner<T>>,
95
96 // Handle to the task that is blocked on this sender. This handle is sent
97 // to the receiver half in order to be notified when the sender becomes
98 // unblocked.
99 sender_task: Arc<Mutex<SenderTask>>,
100
101 // True if the sender might be blocked. This is an optimization to avoid
102 // having to lock the mutex most of the time.
103 maybe_parked: bool,
104}
105
106/// The transmission end of a channel which is used to send values.
107///
108/// This is created by the `unbounded` method.
109#[derive(Debug)]
110pub struct UnboundedSender<T>(Sender<T>);
111
112trait AssertKinds: Send + Sync + Clone {}
113impl AssertKinds for UnboundedSender<u32> {}
114
115
116/// The receiving end of a channel which implements the `Stream` trait.
117///
118/// This is a concrete implementation of a stream which can be used to represent
119/// a stream of values being computed elsewhere. This is created by the
120/// `channel` method.
121#[derive(Debug)]
122pub struct Receiver<T> {
123 inner: Arc<Inner<T>>,
124}
125
126/// Error type for sending, used when the receiving end of a channel is
127/// dropped
128#[derive(Clone, PartialEq, Eq)]
129pub struct SendError<T>(T);
130
131/// Error type returned from `try_send`
132#[derive(Clone, PartialEq, Eq)]
133pub struct TrySendError<T> {
134 kind: TrySendErrorKind<T>,
135}
136
137#[derive(Clone, PartialEq, Eq)]
138enum TrySendErrorKind<T> {
139 Full(T),
140 Disconnected(T),
141}
142
143impl<T> fmt::Debug for SendError<T> {
144 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
145 fmt.debug_tuple("SendError")
146 .field(&"...")
147 .finish()
148 }
149}
150
151impl<T> fmt::Display for SendError<T> {
152 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
153 write!(fmt, "send failed because receiver is gone")
154 }
155}
156
157impl<T: Any> Error for SendError<T>
158{
159 fn description(&self) -> &str {
160 "send failed because receiver is gone"
161 }
162}
163
164impl<T> SendError<T> {
165 /// Returns the message that was attempted to be sent but failed.
166 pub fn into_inner(self) -> T {
167 self.0
168 }
169}
170
171impl<T> fmt::Debug for TrySendError<T> {
172 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
173 fmt.debug_tuple("TrySendError")
174 .field(&"...")
175 .finish()
176 }
177}
178
179impl<T> fmt::Display for TrySendError<T> {
180 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
181 if self.is_full() {
182 write!(fmt, "send failed because channel is full")
183 } else {
184 write!(fmt, "send failed because receiver is gone")
185 }
186 }
187}
188
189impl<T: Any> Error for TrySendError<T> {
190 fn description(&self) -> &str {
191 if self.is_full() {
192 "send failed because channel is full"
193 } else {
194 "send failed because receiver is gone"
195 }
196 }
197}
198
199impl<T> TrySendError<T> {
200 /// Returns true if this error is a result of the channel being full
201 pub fn is_full(&self) -> bool {
202 use self::TrySendErrorKind::*;
203
204 match self.kind {
205 Full(_) => true,
206 _ => false,
207 }
208 }
209
210 /// Returns true if this error is a result of the receiver being dropped
211 pub fn is_disconnected(&self) -> bool {
212 use self::TrySendErrorKind::*;
213
214 match self.kind {
215 Disconnected(_) => true,
216 _ => false,
217 }
218 }
219
220 /// Returns the message that was attempted to be sent but failed.
221 pub fn into_inner(self) -> T {
222 use self::TrySendErrorKind::*;
223
224 match self.kind {
225 Full(v) | Disconnected(v) => v,
226 }
227 }
228}
229
230#[derive(Debug)]
231struct Inner<T> {
232 // Max buffer size of the channel. If `None` then the channel is unbounded.
233 buffer: Option<usize>,
234
235 // Internal channel state. Consists of the number of messages stored in the
236 // channel as well as a flag signalling that the channel is closed.
237 state: AtomicUsize,
238
239 // Atomic, FIFO queue used to send messages to the receiver
240 message_queue: Queue<Option<T>>,
241
242 // Atomic, FIFO queue used to send parked task handles to the receiver.
243 parked_queue: Queue<Arc<Mutex<SenderTask>>>,
244
245 // Number of senders in existence
246 num_senders: AtomicUsize,
247
248 // Handle to the receiver's task.
249 recv_task: Mutex<ReceiverTask>,
250}
251
252// Struct representation of `Inner::state`.
253#[derive(Debug, Clone, Copy)]
254struct State {
255 // `true` when the channel is open
256 is_open: bool,
257
258 // Number of messages in the channel
259 num_messages: usize,
260}
261
262#[derive(Debug)]
263struct ReceiverTask {
264 unparked: bool,
265 task: Option<Task>,
266}
267
268// Returned from Receiver::try_park()
269enum TryPark {
270 Parked,
271 Closed,
272 NotEmpty,
273}
274
275// The `is_open` flag is stored in the left-most bit of `Inner::state`
276const OPEN_MASK: usize = usize::MAX - (usize::MAX >> 1);
277
278// When a new channel is created, it is created in the open state with no
279// pending messages.
280const INIT_STATE: usize = OPEN_MASK;
281
282// The maximum number of messages that a channel can track is `usize::MAX >> 1`
283const MAX_CAPACITY: usize = !(OPEN_MASK);
284
285// The maximum requested buffer size must be less than the maximum capacity of
286// a channel. This is because each sender gets a guaranteed slot.
287const MAX_BUFFER: usize = MAX_CAPACITY >> 1;
288
289// Sent to the consumer to wake up blocked producers
290#[derive(Debug)]
291struct SenderTask {
292 task: Option<Task>,
293 is_parked: bool,
294}
295
296impl SenderTask {
297 fn new() -> Self {
298 SenderTask {
299 task: None,
300 is_parked: false,
301 }
302 }
303
304 fn notify(&mut self) {
305 self.is_parked = false;
306
307 if let Some(task) = self.task.take() {
308 task.notify();
309 }
310 }
311}
312
313/// Creates an in-memory channel implementation of the `Stream` trait with
314/// bounded capacity.
315///
316/// This method creates a concrete implementation of the `Stream` trait which
317/// can be used to send values across threads in a streaming fashion. This
318/// channel is unique in that it implements back pressure to ensure that the
319/// sender never outpaces the receiver. The channel capacity is equal to
320/// `buffer + num-senders`. In other words, each sender gets a guaranteed slot
321/// in the channel capacity, and on top of that there are `buffer` "first come,
322/// first serve" slots available to all senders.
323///
324/// The `Receiver` returned implements the `Stream` trait and has access to any
325/// number of the associated combinators for transforming the result.
326pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
327 // Check that the requested buffer size does not exceed the maximum buffer
328 // size permitted by the system.
329 assert!(buffer < MAX_BUFFER, "requested buffer size too large");
330 channel2(Some(buffer))
331}
332
333/// Creates an in-memory channel implementation of the `Stream` trait with
334/// unbounded capacity.
335///
336/// This method creates a concrete implementation of the `Stream` trait which
337/// can be used to send values across threads in a streaming fashion. A `send`
338/// on this channel will always succeed as long as the receive half has not
339/// been closed. If the receiver falls behind, messages will be buffered
340/// internally.
341///
342/// **Note** that the amount of available system memory is an implicit bound to
343/// the channel. Using an `unbounded` channel has the ability of causing the
344/// process to run out of memory. In this case, the process will be aborted.
345pub fn unbounded<T>() -> (UnboundedSender<T>, Receiver<T>) {
346 let (tx, rx) = channel2(None);
347 (UnboundedSender(tx), rx)
348}
349
350fn channel2<T>(buffer: Option<usize>) -> (Sender<T>, Receiver<T>) {
351 let inner = Arc::new(Inner {
352 buffer: buffer,
353 state: AtomicUsize::new(INIT_STATE),
354 message_queue: Queue::new(),
355 parked_queue: Queue::new(),
356 num_senders: AtomicUsize::new(1),
357 recv_task: Mutex::new(ReceiverTask {
358 unparked: false,
359 task: None,
360 }),
361 });
362
363 let tx = Sender {
364 inner: inner.clone(),
365 sender_task: Arc::new(Mutex::new(SenderTask::new())),
366 maybe_parked: false,
367 };
368
369 let rx = Receiver {
370 inner: inner,
371 };
372
373 (tx, rx)
374}
375
376/*
377 *
378 * ===== impl Sender =====
379 *
380 */
381
382impl<T> Sender<T> {
383 /// Attempts to send a message on this `Sender<T>` without blocking.
384 ///
385 /// This function, unlike `start_send`, is safe to call whether it's being
386 /// called on a task or not. Note that this function, however, will *not*
387 /// attempt to block the current task if the message cannot be sent.
388 ///
389 /// It is not recommended to call this function from inside of a future,
390 /// only from an external thread where you've otherwise arranged to be
391 /// notified when the channel is no longer full.
392 pub fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
393 // If the sender is currently blocked, reject the message
394 if !self.poll_unparked(false).is_ready() {
395 return Err(TrySendError {
396 kind: TrySendErrorKind::Full(msg),
397 });
398 }
399
400 // The channel has capacity to accept the message, so send it
401 self.do_send(Some(msg), false)
402 .map_err(|SendError(v)| {
403 TrySendError {
404 kind: TrySendErrorKind::Disconnected(v),
405 }
406 })
407 }
408
409 // Do the send without failing
410 // None means close
411 fn do_send(&mut self, msg: Option<T>, do_park: bool) -> Result<(), SendError<T>> {
412 // First, increment the number of messages contained by the channel.
413 // This operation will also atomically determine if the sender task
414 // should be parked.
415 //
416 // None is returned in the case that the channel has been closed by the
417 // receiver. This happens when `Receiver::close` is called or the
418 // receiver is dropped.
419 let park_self = match self.inc_num_messages(msg.is_none()) {
420 Some(park_self) => park_self,
421 None => {
422 // The receiver has closed the channel. Only abort if actually
423 // sending a message. It is important that the stream
424 // termination (None) is always sent. This technically means
425 // that it is possible for the queue to contain the following
426 // number of messages:
427 //
428 // num-senders + buffer + 1
429 //
430 if let Some(msg) = msg {
431 return Err(SendError(msg));
432 } else {
433 return Ok(());
434 }
435 }
436 };
437
438 // If the channel has reached capacity, then the sender task needs to
439 // be parked. This will send the task handle on the parked task queue.
440 //
441 // However, when `do_send` is called while dropping the `Sender`,
442 // `task::current()` can't be called safely. In this case, in order to
443 // maintain internal consistency, a blank message is pushed onto the
444 // parked task queue.
445 if park_self {
446 self.park(do_park);
447 }
448
449 self.queue_push_and_signal(msg);
450
451 Ok(())
452 }
453
454 // Do the send without parking current task.
455 //
456 // To be called from unbounded sender.
457 fn do_send_nb(&self, msg: T) -> Result<(), SendError<T>> {
458 match self.inc_num_messages(false) {
459 Some(park_self) => assert!(!park_self),
460 None => return Err(SendError(msg)),
461 };
462
463 self.queue_push_and_signal(Some(msg));
464
465 Ok(())
466 }
467
468 // Push message to the queue and signal to the receiver
469 fn queue_push_and_signal(&self, msg: Option<T>) {
470 // Push the message onto the message queue
471 self.inner.message_queue.push(msg);
472
473 // Signal to the receiver that a message has been enqueued. If the
474 // receiver is parked, this will unpark the task.
475 self.signal();
476 }
477
478 // Increment the number of queued messages. Returns if the sender should
479 // block.
480 fn inc_num_messages(&self, close: bool) -> Option<bool> {
481 let mut curr = self.inner.state.load(SeqCst);
482
483 loop {
484 let mut state = decode_state(curr);
485
486 // The receiver end closed the channel.
487 if !state.is_open {
488 return None;
489 }
490
491 // This probably is never hit? Odds are the process will run out of
492 // memory first. It may be worth to return something else in this
493 // case?
494 assert!(state.num_messages < MAX_CAPACITY, "buffer space exhausted; \
495 sending this messages would overflow the state");
496
497 state.num_messages += 1;
498
499 // The channel is closed by all sender handles being dropped.
500 if close {
501 state.is_open = false;
502 }
503
504 let next = encode_state(&state);
505 match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
506 Ok(_) => {
507 // Block if the current number of pending messages has exceeded
508 // the configured buffer size
509 let park_self = match self.inner.buffer {
510 Some(buffer) => state.num_messages > buffer,
511 None => false,
512 };
513
514 return Some(park_self)
515 }
516 Err(actual) => curr = actual,
517 }
518 }
519 }
520
521 // Signal to the receiver task that a message has been enqueued
522 fn signal(&self) {
523 // TODO
524 // This logic can probably be improved by guarding the lock with an
525 // atomic.
526 //
527 // Do this step first so that the lock is dropped when
528 // `unpark` is called
529 let task = {
530 let mut recv_task = self.inner.recv_task.lock().unwrap();
531
532 // If the receiver has already been unparked, then there is nothing
533 // more to do
534 if recv_task.unparked {
535 return;
536 }
537
538 // Setting this flag enables the receiving end to detect that
539 // an unpark event happened in order to avoid unnecessarily
540 // parking.
541 recv_task.unparked = true;
542 recv_task.task.take()
543 };
544
545 if let Some(task) = task {
546 task.notify();
547 }
548 }
549
550 fn park(&mut self, can_park: bool) {
551 // TODO: clean up internal state if the task::current will fail
552
553 let task = if can_park {
554 Some(task::current())
555 } else {
556 None
557 };
558
559 {
560 let mut sender = self.sender_task.lock().unwrap();
561 sender.task = task;
562 sender.is_parked = true;
563 }
564
565 // Send handle over queue
566 let t = self.sender_task.clone();
567 self.inner.parked_queue.push(t);
568
569 // Check to make sure we weren't closed after we sent our task on the
570 // queue
571 let state = decode_state(self.inner.state.load(SeqCst));
572 self.maybe_parked = state.is_open;
573 }
574
575 /// Polls the channel to determine if there is guaranteed to be capacity to send at least one
576 /// item without waiting.
577 ///
578 /// Returns `Ok(Async::Ready(_))` if there is sufficient capacity, or returns
579 /// `Ok(Async::NotReady)` if the channel is not guaranteed to have capacity. Returns
580 /// `Err(SendError(_))` if the receiver has been dropped.
581 ///
582 /// # Panics
583 ///
584 /// This method will panic if called from outside the context of a task or future.
585 pub fn poll_ready(&mut self) -> Poll<(), SendError<()>> {
586 let state = decode_state(self.inner.state.load(SeqCst));
587 if !state.is_open {
588 return Err(SendError(()));
589 }
590
591 Ok(self.poll_unparked(true))
592 }
593
594 fn poll_unparked(&mut self, do_park: bool) -> Async<()> {
595 // First check the `maybe_parked` variable. This avoids acquiring the
596 // lock in most cases
597 if self.maybe_parked {
598 // Get a lock on the task handle
599 let mut task = self.sender_task.lock().unwrap();
600
601 if !task.is_parked {
602 self.maybe_parked = false;
603 return Async::Ready(())
604 }
605
606 // At this point, an unpark request is pending, so there will be an
607 // unpark sometime in the future. We just need to make sure that
608 // the correct task will be notified.
609 //
610 // Update the task in case the `Sender` has been moved to another
611 // task
612 task.task = if do_park {
613 Some(task::current())
614 } else {
615 None
616 };
617
618 Async::NotReady
619 } else {
620 Async::Ready(())
621 }
622 }
623}
624
625impl<T> Sink for Sender<T> {
626 type SinkItem = T;
627 type SinkError = SendError<T>;
628
629 fn start_send(&mut self, msg: T) -> StartSend<T, SendError<T>> {
630 // If the sender is currently blocked, reject the message before doing
631 // any work.
632 if !self.poll_unparked(true).is_ready() {
633 return Ok(AsyncSink::NotReady(msg));
634 }
635
636 // The channel has capacity to accept the message, so send it.
637 self.do_send(Some(msg), true)?;
638
639 Ok(AsyncSink::Ready)
640 }
641
642 fn poll_complete(&mut self) -> Poll<(), SendError<T>> {
643 Ok(Async::Ready(()))
644 }
645
646 fn close(&mut self) -> Poll<(), SendError<T>> {
647 Ok(Async::Ready(()))
648 }
649}
650
651impl<T> UnboundedSender<T> {
652 /// Sends the provided message along this channel.
653 ///
654 /// This is an unbounded sender, so this function differs from `Sink::send`
655 /// by ensuring the return type reflects that the channel is always ready to
656 /// receive messages.
657 #[deprecated(note = "renamed to `unbounded_send`")]
658 #[doc(hidden)]
659 pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
660 self.unbounded_send(msg)
661 }
662
663 /// Sends the provided message along this channel.
664 ///
665 /// This is an unbounded sender, so this function differs from `Sink::send`
666 /// by ensuring the return type reflects that the channel is always ready to
667 /// receive messages.
668 pub fn unbounded_send(&self, msg: T) -> Result<(), SendError<T>> {
669 self.0.do_send_nb(msg)
670 }
671}
672
673impl<T> Sink for UnboundedSender<T> {
674 type SinkItem = T;
675 type SinkError = SendError<T>;
676
677 fn start_send(&mut self, msg: T) -> StartSend<T, SendError<T>> {
678 self.0.start_send(msg)
679 }
680
681 fn poll_complete(&mut self) -> Poll<(), SendError<T>> {
682 self.0.poll_complete()
683 }
684
685 fn close(&mut self) -> Poll<(), SendError<T>> {
686 Ok(Async::Ready(()))
687 }
688}
689
690impl<'a, T> Sink for &'a UnboundedSender<T> {
691 type SinkItem = T;
692 type SinkError = SendError<T>;
693
694 fn start_send(&mut self, msg: T) -> StartSend<T, SendError<T>> {
695 self.0.do_send_nb(msg)?;
696 Ok(AsyncSink::Ready)
697 }
698
699 fn poll_complete(&mut self) -> Poll<(), SendError<T>> {
700 Ok(Async::Ready(()))
701 }
702
703 fn close(&mut self) -> Poll<(), SendError<T>> {
704 Ok(Async::Ready(()))
705 }
706}
707
708impl<T> Clone for UnboundedSender<T> {
709 fn clone(&self) -> UnboundedSender<T> {
710 UnboundedSender(self.0.clone())
711 }
712}
713
714
715impl<T> Clone for Sender<T> {
716 fn clone(&self) -> Sender<T> {
717 // Since this atomic op isn't actually guarding any memory and we don't
718 // care about any orderings besides the ordering on the single atomic
719 // variable, a relaxed ordering is acceptable.
720 let mut curr = self.inner.num_senders.load(SeqCst);
721
722 loop {
723 // If the maximum number of senders has been reached, then fail
724 if curr == self.inner.max_senders() {
725 panic!("cannot clone `Sender` -- too many outstanding senders");
726 }
727
728 debug_assert!(curr < self.inner.max_senders());
729
730 let next = curr + 1;
731 let actual = self.inner.num_senders.compare_and_swap(curr, next, SeqCst);
732
733 // The ABA problem doesn't matter here. We only care that the
734 // number of senders never exceeds the maximum.
735 if actual == curr {
736 return Sender {
737 inner: self.inner.clone(),
738 sender_task: Arc::new(Mutex::new(SenderTask::new())),
739 maybe_parked: false,
740 };
741 }
742
743 curr = actual;
744 }
745 }
746}
747
748impl<T> Drop for Sender<T> {
749 fn drop(&mut self) {
750 // Ordering between variables don't matter here
751 let prev = self.inner.num_senders.fetch_sub(1, SeqCst);
752
753 if prev == 1 {
754 let _ = self.do_send(None, false);
755 }
756 }
757}
758
759/*
760 *
761 * ===== impl Receiver =====
762 *
763 */
764
765impl<T> Receiver<T> {
766 /// Closes the receiving half
767 ///
768 /// This prevents any further messages from being sent on the channel while
769 /// still enabling the receiver to drain messages that are buffered.
770 pub fn close(&mut self) {
771 let mut curr = self.inner.state.load(SeqCst);
772
773 loop {
774 let mut state = decode_state(curr);
775
776 if !state.is_open {
777 break
778 }
779
780 state.is_open = false;
781
782 let next = encode_state(&state);
783 match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
784 Ok(_) => break,
785 Err(actual) => curr = actual,
786 }
787 }
788
789 // Wake up any threads waiting as they'll see that we've closed the
790 // channel and will continue on their merry way.
791 loop {
792 match unsafe { self.inner.parked_queue.pop() } {
793 PopResult::Data(task) => {
794 task.lock().unwrap().notify();
795 }
796 PopResult::Empty => break,
797 PopResult::Inconsistent => thread::yield_now(),
798 }
799 }
800 }
801
802 fn next_message(&mut self) -> Async<Option<T>> {
803 // Pop off a message
804 loop {
805 match unsafe { self.inner.message_queue.pop() } {
806 PopResult::Data(msg) => {
807 return Async::Ready(msg);
808 }
809 PopResult::Empty => {
810 // The queue is empty, return NotReady
811 return Async::NotReady;
812 }
813 PopResult::Inconsistent => {
814 // Inconsistent means that there will be a message to pop
815 // in a short time. This branch can only be reached if
816 // values are being produced from another thread, so there
817 // are a few ways that we can deal with this:
818 //
819 // 1) Spin
820 // 2) thread::yield_now()
821 // 3) task::current().unwrap() & return NotReady
822 //
823 // For now, thread::yield_now() is used, but it would
824 // probably be better to spin a few times then yield.
825 thread::yield_now();
826 }
827 }
828 }
829 }
830
831 // Unpark a single task handle if there is one pending in the parked queue
832 fn unpark_one(&mut self) {
833 loop {
834 match unsafe { self.inner.parked_queue.pop() } {
835 PopResult::Data(task) => {
836 task.lock().unwrap().notify();
837 return;
838 }
839 PopResult::Empty => {
840 // Queue empty, no task to wake up.
841 return;
842 }
843 PopResult::Inconsistent => {
844 // Same as above
845 thread::yield_now();
846 }
847 }
848 }
849 }
850
851 // Try to park the receiver task
852 fn try_park(&self) -> TryPark {
853 let curr = self.inner.state.load(SeqCst);
854 let state = decode_state(curr);
855
856 // If the channel is closed, then there is no need to park.
857 if !state.is_open && state.num_messages == 0 {
858 return TryPark::Closed;
859 }
860
861 // First, track the task in the `recv_task` slot
862 let mut recv_task = self.inner.recv_task.lock().unwrap();
863
864 if recv_task.unparked {
865 // Consume the `unpark` signal without actually parking
866 recv_task.unparked = false;
867 return TryPark::NotEmpty;
868 }
869
870 recv_task.task = Some(task::current());
871 TryPark::Parked
872 }
873
874 fn dec_num_messages(&self) {
875 let mut curr = self.inner.state.load(SeqCst);
876
877 loop {
878 let mut state = decode_state(curr);
879
880 state.num_messages -= 1;
881
882 let next = encode_state(&state);
883 match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
884 Ok(_) => break,
885 Err(actual) => curr = actual,
886 }
887 }
888 }
889}
890
891impl<T> Stream for Receiver<T> {
892 type Item = T;
893 type Error = ();
894
895 fn poll(&mut self) -> Poll<Option<T>, ()> {
896 loop {
897 // Try to read a message off of the message queue.
898 let msg = match self.next_message() {
899 Async::Ready(msg) => msg,
900 Async::NotReady => {
901 // There are no messages to read, in this case, attempt to
902 // park. The act of parking will verify that the channel is
903 // still empty after the park operation has completed.
904 match self.try_park() {
905 TryPark::Parked => {
906 // The task was parked, and the channel is still
907 // empty, return NotReady.
908 return Ok(Async::NotReady);
909 }
910 TryPark::Closed => {
911 // The channel is closed, there will be no further
912 // messages.
913 return Ok(Async::Ready(None));
914 }
915 TryPark::NotEmpty => {
916 // A message has been sent while attempting to
917 // park. Loop again, the next iteration is
918 // guaranteed to get the message.
919 continue;
920 }
921 }
922 }
923 };
924
925 // If there are any parked task handles in the parked queue, pop
926 // one and unpark it.
927 self.unpark_one();
928
929 // Decrement number of messages
930 self.dec_num_messages();
931
932 // Return the message
933 return Ok(Async::Ready(msg));
934 }
935 }
936}
937
938impl<T> Drop for Receiver<T> {
939 fn drop(&mut self) {
940 // Drain the channel of all pending messages
941 self.close();
942 while self.next_message().is_ready() {
943 // ...
944 }
945 }
946}
947
948/*
949 *
950 * ===== impl Inner =====
951 *
952 */
953
954impl<T> Inner<T> {
955 // The return value is such that the total number of messages that can be
956 // enqueued into the channel will never exceed MAX_CAPACITY
957 fn max_senders(&self) -> usize {
958 match self.buffer {
959 Some(buffer) => MAX_CAPACITY - buffer,
960 None => MAX_BUFFER,
961 }
962 }
963}
964
965unsafe impl<T: Send> Send for Inner<T> {}
966unsafe impl<T: Send> Sync for Inner<T> {}
967
968/*
969 *
970 * ===== Helpers =====
971 *
972 */
973
974fn decode_state(num: usize) -> State {
975 State {
976 is_open: num & OPEN_MASK == OPEN_MASK,
977 num_messages: num & MAX_CAPACITY,
978 }
979}
980
981fn encode_state(state: &State) -> usize {
982 let mut num = state.num_messages;
983
984 if state.is_open {
985 num |= OPEN_MASK;
986 }
987
988 num
989}