Skip to main content

futures_channel/mpsc/
mod.rs

1//! A multi-producer, single-consumer queue for sending values across
2//! asynchronous tasks.
3//!
4//! Similarly to the `std`, channel creation provides [`Receiver`] and
5//! [`Sender`] handles. [`Receiver`] implements [`Stream`] and allows a task to
6//! read values out of the channel. If there is no message to read from the
7//! channel, the current task will be notified when a new value is sent.
8//! [`Sender`] implements the `Sink` trait and allows a task to send messages into
9//! the channel. If the channel is at capacity, the send will be rejected and
10//! the task will be notified when additional capacity is available. In other
11//! words, the channel provides backpressure.
12//!
13//! Unbounded channels are also available using the `unbounded` constructor.
14//!
15//! # Disconnection
16//!
17//! When all [`Sender`] handles have been dropped, it is no longer
18//! possible to send values into the channel. This is considered the termination
19//! event of the stream. As such, [`Receiver::poll_next`]
20//! will return `Ok(Ready(None))`.
21//!
22//! If the [`Receiver`] handle is dropped, then messages can no longer
23//! be read out of the channel. In this case, all further attempts to send will
24//! result in an error.
25//!
26//! # Clean Shutdown
27//!
28//! If the [`Receiver`] is simply dropped, then it is possible for
29//! there to be messages still in the channel that will not be processed. As
30//! such, it is usually desirable to perform a "clean" shutdown. To do this, the
31//! receiver will first call `close`, which will prevent any further messages to
32//! be sent into the channel. Then, the receiver consumes the channel to
33//! completion, at which point the receiver can be dropped.
34//!
35//! [`Sender`]: struct.Sender.html
36//! [`Receiver`]: struct.Receiver.html
37//! [`Stream`]: ../../futures_core/stream/trait.Stream.html
38//! [`Receiver::poll_next`]:
39//!     ../../futures_core/stream/trait.Stream.html#tymethod.poll_next
40
41// At the core, the channel uses an atomic FIFO queue for message passing. This
42// queue is used as the primary coordination primitive. In order to enforce
43// capacity limits and handle back pressure, a secondary FIFO queue is used to
44// send parked task handles.
45//
46// The general idea is that the channel is created with a `buffer` size of `n`.
47// The channel capacity is `n + num-senders`. Each sender gets one "guaranteed"
48// slot to hold a message. This allows `Sender` to know for a fact that a send
49// will succeed *before* starting to do the actual work of sending the value.
50// Since most of this work is lock-free, once the work starts, it is impossible
51// to safely revert.
52//
53// If the sender is unable to process a send operation, then the current
54// task is parked and the handle is sent on the parked task queue.
55//
56// Note that the implementation guarantees that the channel capacity will never
57// exceed the configured limit, however there is no *strict* guarantee that the
58// receiver will wake up a parked task *immediately* when a slot becomes
59// available. However, it will almost always unpark a task when a slot becomes
60// available and it is *guaranteed* that a sender will be unparked when the
61// message that caused the sender to become parked is read out of the channel.
62//
63// The steps for sending a message are roughly:
64//
65// 1) Increment the channel message count
66// 2) If the channel is at capacity, push the task handle onto the wait queue
67// 3) Push the message onto the message queue.
68//
69// The steps for receiving a message are roughly:
70//
71// 1) Pop a message from the message queue
72// 2) Pop a task handle from the wait queue
73// 3) Decrement the channel message count.
74//
75// It's important for the order of operations on lock-free structures to happen
76// in reverse order between the sender and receiver. This makes the message
77// queue the primary coordination structure and establishes the necessary
78// happens-before semantics required for the acquire / release semantics used
79// by the queue structure.
80
81use core::future::Future;
82use futures_core::stream::{FusedStream, Stream};
83use futures_core::task::__internal::AtomicWaker;
84use futures_core::task::{Context, Poll, Waker};
85use futures_core::FusedFuture;
86use std::fmt;
87use std::pin::Pin;
88use std::sync::atomic::AtomicUsize;
89use std::sync::atomic::Ordering::SeqCst;
90use std::sync::{Arc, Mutex};
91use std::thread;
92
93use crate::mpsc::queue::Queue;
94
95mod queue;
96#[cfg(feature = "sink")]
97mod sink_impl;
98
99struct UnboundedSenderInner<T> {
100    // Channel state shared between the sender and receiver.
101    inner: Arc<UnboundedInner<T>>,
102}
103
104struct BoundedSenderInner<T> {
105    // Channel state shared between the sender and receiver.
106    inner: Arc<BoundedInner<T>>,
107
108    // Handle to the task that is blocked on this sender. This handle is sent
109    // to the receiver half in order to be notified when the sender becomes
110    // unblocked.
111    sender_task: Arc<Mutex<SenderTask>>,
112
113    // `true` if the sender might be blocked. This is an optimization to avoid
114    // having to lock the mutex most of the time.
115    maybe_parked: bool,
116}
117
118// We never project Pin<&mut SenderInner> to `Pin<&mut T>`
119impl<T> Unpin for UnboundedSenderInner<T> {}
120impl<T> Unpin for BoundedSenderInner<T> {}
121
122/// The transmission end of a bounded mpsc channel.
123///
124/// This value is created by the [`channel`] function.
125pub struct Sender<T>(Option<BoundedSenderInner<T>>);
126
127/// The transmission end of an unbounded mpsc channel.
128///
129/// This value is created by the [`unbounded`] function.
130pub struct UnboundedSender<T>(Option<UnboundedSenderInner<T>>);
131
132#[allow(dead_code)]
133trait AssertKinds: Send + Sync + Clone {}
134impl AssertKinds for UnboundedSender<u32> {}
135
136/// The receiving end of a bounded mpsc channel.
137///
138/// This value is created by the [`channel`] function.
139pub struct Receiver<T> {
140    inner: Option<Arc<BoundedInner<T>>>,
141}
142
143/// The receiving end of an unbounded mpsc channel.
144///
145/// This value is created by the [`unbounded`] function.
146pub struct UnboundedReceiver<T> {
147    inner: Option<Arc<UnboundedInner<T>>>,
148}
149
150// `Pin<&mut UnboundedReceiver<T>>` is never projected to `Pin<&mut T>`
151impl<T> Unpin for UnboundedReceiver<T> {}
152
153/// The error type for [`Sender`s](Sender) used as `Sink`s.
154#[derive(Clone, Debug, PartialEq, Eq)]
155pub struct SendError {
156    kind: SendErrorKind,
157}
158
159/// The error type returned from [`try_send`](Sender::try_send).
160#[derive(Clone, PartialEq, Eq)]
161pub struct TrySendError<T> {
162    err: SendError,
163    val: T,
164}
165
166#[derive(Clone, Debug, PartialEq, Eq)]
167enum SendErrorKind {
168    Full,
169    Disconnected,
170}
171
172/// Error returned by [`Receiver::try_recv`] or [`UnboundedReceiver::try_recv`].
173#[derive(PartialEq, Eq, Clone, Copy, Debug)]
174pub enum TryRecvError {
175    /// The channel is empty but not closed.
176    Empty,
177
178    /// The channel is empty and closed.
179    Closed,
180}
181
182/// Error returned by the future returned by [`Receiver::recv()`] or [`UnboundedReceiver::recv()`].
183/// Received when the channel is empty and closed.
184#[derive(PartialEq, Eq, Clone, Copy, Debug)]
185pub struct RecvError;
186
187/// Future returned by [`Receiver::recv()`] or [`UnboundedReceiver::recv()`].
188#[derive(Debug)]
189#[must_use = "futures do nothing unless you `.await` or poll them"]
190pub struct Recv<'a, St: ?Sized> {
191    stream: &'a mut St,
192}
193
194impl fmt::Display for SendError {
195    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
196        if self.is_full() {
197            write!(f, "send failed because channel is full")
198        } else {
199            write!(f, "send failed because receiver is gone")
200        }
201    }
202}
203
204impl std::error::Error for SendError {}
205
206impl fmt::Display for RecvError {
207    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
208        write!(f, "receive failed because channel is empty and closed")
209    }
210}
211
212impl std::error::Error for RecvError {}
213
214impl SendError {
215    /// Returns `true` if this error is a result of the channel being full.
216    pub fn is_full(&self) -> bool {
217        match self.kind {
218            SendErrorKind::Full => true,
219            _ => false,
220        }
221    }
222
223    /// Returns `true` if this error is a result of the receiver being dropped.
224    pub fn is_disconnected(&self) -> bool {
225        match self.kind {
226            SendErrorKind::Disconnected => true,
227            _ => false,
228        }
229    }
230}
231
232impl TryRecvError {
233    /// Returns `true` if the channel is empty but not closed.
234    pub fn is_empty(&self) -> bool {
235        matches!(self, TryRecvError::Empty)
236    }
237
238    /// Returns `true` if the channel is empty and closed.
239    pub fn is_closed(&self) -> bool {
240        matches!(self, TryRecvError::Closed)
241    }
242}
243
244impl<T> fmt::Debug for TrySendError<T> {
245    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
246        f.debug_struct("TrySendError").field("kind", &self.err.kind).finish()
247    }
248}
249
250impl<T> fmt::Display for TrySendError<T> {
251    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
252        if self.is_full() {
253            write!(f, "send failed because channel is full")
254        } else {
255            write!(f, "send failed because receiver is gone")
256        }
257    }
258}
259
260impl<T: core::any::Any> std::error::Error for TrySendError<T> {}
261
262impl<T> TrySendError<T> {
263    /// Returns `true` if this error is a result of the channel being full.
264    pub fn is_full(&self) -> bool {
265        self.err.is_full()
266    }
267
268    /// Returns `true` if this error is a result of the receiver being dropped.
269    pub fn is_disconnected(&self) -> bool {
270        self.err.is_disconnected()
271    }
272
273    /// Returns the message that was attempted to be sent but failed.
274    pub fn into_inner(self) -> T {
275        self.val
276    }
277
278    /// Drops the message and converts into a `SendError`.
279    pub fn into_send_error(self) -> SendError {
280        self.err
281    }
282}
283
284impl fmt::Display for TryRecvError {
285    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
286        match self {
287            TryRecvError::Empty => write!(f, "receive failed because channel is empty"),
288            TryRecvError::Closed => write!(f, "receive failed because channel is closed"),
289        }
290    }
291}
292
293impl std::error::Error for TryRecvError {}
294
295struct UnboundedInner<T> {
296    // Internal channel state. Consists of the number of messages stored in the
297    // channel as well as a flag signalling that the channel is closed.
298    state: AtomicUsize,
299
300    // Atomic, FIFO queue used to send messages to the receiver
301    message_queue: Queue<T>,
302
303    // Number of senders in existence
304    num_senders: AtomicUsize,
305
306    // Handle to the receiver's task.
307    recv_task: AtomicWaker,
308}
309
310struct BoundedInner<T> {
311    // Max buffer size of the channel. If `None` then the channel is unbounded.
312    buffer: usize,
313
314    // Internal channel state. Consists of the number of messages stored in the
315    // channel as well as a flag signalling that the channel is closed.
316    state: AtomicUsize,
317
318    // Atomic, FIFO queue used to send messages to the receiver
319    message_queue: Queue<T>,
320
321    // Atomic, FIFO queue used to send parked task handles to the receiver.
322    parked_queue: Queue<Arc<Mutex<SenderTask>>>,
323
324    // Number of senders in existence
325    num_senders: AtomicUsize,
326
327    // Handle to the receiver's task.
328    recv_task: AtomicWaker,
329}
330
331// Struct representation of `Inner::state`.
332#[derive(Clone, Copy)]
333struct State {
334    // `true` when the channel is open
335    is_open: bool,
336
337    // Number of messages in the channel
338    num_messages: usize,
339}
340
341// The `is_open` flag is stored in the left-most bit of `Inner::state`
342const OPEN_MASK: usize = usize::MAX - (usize::MAX >> 1);
343
344// When a new channel is created, it is created in the open state with no
345// pending messages.
346const INIT_STATE: usize = OPEN_MASK;
347
348// The maximum number of messages that a channel can track is `usize::MAX >> 1`
349const MAX_CAPACITY: usize = !(OPEN_MASK);
350
351// The maximum requested buffer size must be less than the maximum capacity of
352// a channel. This is because each sender gets a guaranteed slot.
353const MAX_BUFFER: usize = MAX_CAPACITY >> 1;
354
355// Sent to the consumer to wake up blocked producers
356struct SenderTask {
357    task: Option<Waker>,
358    is_parked: bool,
359}
360
361impl SenderTask {
362    fn new() -> Self {
363        Self { task: None, is_parked: false }
364    }
365
366    fn notify(&mut self) {
367        self.is_parked = false;
368
369        if let Some(task) = self.task.take() {
370            task.wake();
371        }
372    }
373}
374
375/// Creates a bounded mpsc channel for communicating between asynchronous tasks.
376///
377/// Being bounded, this channel provides backpressure to ensure that the sender
378/// outpaces the receiver by only a limited amount. The channel's capacity is
379/// equal to `buffer + num-senders`. In other words, each sender gets a
380/// guaranteed slot in the channel capacity, and on top of that there are
381/// `buffer` "first come, first serve" slots available to all senders.
382///
383/// The [`Receiver`] returned implements the [`Stream`] trait, while [`Sender`]
384/// implements `Sink`.
385pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
386    // Check that the requested buffer size does not exceed the maximum buffer
387    // size permitted by the system.
388    assert!(buffer < MAX_BUFFER, "requested buffer size too large");
389
390    let inner = Arc::new(BoundedInner {
391        buffer,
392        state: AtomicUsize::new(INIT_STATE),
393        message_queue: Queue::new(),
394        parked_queue: Queue::new(),
395        num_senders: AtomicUsize::new(1),
396        recv_task: AtomicWaker::new(),
397    });
398
399    let tx = BoundedSenderInner {
400        inner: inner.clone(),
401        sender_task: Arc::new(Mutex::new(SenderTask::new())),
402        maybe_parked: false,
403    };
404
405    let rx = Receiver { inner: Some(inner) };
406
407    (Sender(Some(tx)), rx)
408}
409
410/// Creates an unbounded mpsc channel for communicating between asynchronous
411/// tasks.
412///
413/// A `send` on this channel will always succeed as long as the receive half has
414/// not been closed. If the receiver falls behind, messages will be arbitrarily
415/// buffered.
416///
417/// **Note** that the amount of available system memory is an implicit bound to
418/// the channel. Using an `unbounded` channel has the ability of causing the
419/// process to run out of memory. In this case, the process will be aborted.
420pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
421    let inner = Arc::new(UnboundedInner {
422        state: AtomicUsize::new(INIT_STATE),
423        message_queue: Queue::new(),
424        num_senders: AtomicUsize::new(1),
425        recv_task: AtomicWaker::new(),
426    });
427
428    let tx = UnboundedSenderInner { inner: inner.clone() };
429
430    let rx = UnboundedReceiver { inner: Some(inner) };
431
432    (UnboundedSender(Some(tx)), rx)
433}
434
435/*
436 *
437 * ===== impl Sender =====
438 *
439 */
440
441impl<T> UnboundedSenderInner<T> {
442    fn poll_ready_nb(&self) -> Poll<Result<(), SendError>> {
443        let state = decode_state(self.inner.state.load(SeqCst));
444        if state.is_open {
445            Poll::Ready(Ok(()))
446        } else {
447            Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected }))
448        }
449    }
450
451    // Push message to the queue and signal to the receiver
452    fn queue_push_and_signal(&self, msg: T) {
453        // Push the message onto the message queue
454        self.inner.message_queue.push(msg);
455
456        // Signal to the receiver that a message has been enqueued. If the
457        // receiver is parked, this will unpark the task.
458        self.inner.recv_task.wake();
459    }
460
461    // Increment the number of queued messages. Returns the resulting number.
462    fn inc_num_messages(&self) -> Option<usize> {
463        let mut curr = self.inner.state.load(SeqCst);
464
465        loop {
466            let mut state = decode_state(curr);
467
468            // The receiver end closed the channel.
469            if !state.is_open {
470                return None;
471            }
472
473            // This probably is never hit? Odds are the process will run out of
474            // memory first. It may be worth to return something else in this
475            // case?
476            assert!(
477                state.num_messages < MAX_CAPACITY,
478                "buffer space \
479                    exhausted; sending this messages would overflow the state"
480            );
481
482            state.num_messages += 1;
483
484            let next = encode_state(&state);
485            match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
486                Ok(_) => return Some(state.num_messages),
487                Err(actual) => curr = actual,
488            }
489        }
490    }
491
492    /// Returns whether the senders send to the same receiver.
493    fn same_receiver(&self, other: &Self) -> bool {
494        Arc::ptr_eq(&self.inner, &other.inner)
495    }
496
497    /// Returns whether the sender send to this receiver.
498    fn is_connected_to(&self, inner: &Arc<UnboundedInner<T>>) -> bool {
499        Arc::ptr_eq(&self.inner, inner)
500    }
501
502    /// Returns pointer to the Arc containing sender
503    ///
504    /// The returned pointer is not referenced and should be only used for hashing!
505    fn ptr(&self) -> *const UnboundedInner<T> {
506        &*self.inner
507    }
508
509    /// Returns whether this channel is closed without needing a context.
510    fn is_closed(&self) -> bool {
511        !decode_state(self.inner.state.load(SeqCst)).is_open
512    }
513
514    /// Closes this channel from the sender side, preventing any new messages.
515    fn close_channel(&self) {
516        // There's no need to park this sender, its dropping,
517        // and we don't want to check for capacity, so skip
518        // that stuff from `do_send`.
519
520        self.inner.set_closed();
521        self.inner.recv_task.wake();
522    }
523}
524
525impl<T> BoundedSenderInner<T> {
526    /// Attempts to send a message on this `Sender`, returning the message
527    /// if there was an error.
528    fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
529        // If the sender is currently blocked, reject the message
530        if !self.poll_unparked(None).is_ready() {
531            return Err(TrySendError { err: SendError { kind: SendErrorKind::Full }, val: msg });
532        }
533
534        // The channel has capacity to accept the message, so send it
535        self.do_send_b(msg)
536    }
537
538    // Do the send without failing.
539    // Can be called only by bounded sender.
540    fn do_send_b(&mut self, msg: T) -> Result<(), TrySendError<T>> {
541        // Anyone calling do_send *should* make sure there is room first,
542        // but assert here for tests as a sanity check.
543        debug_assert!(self.poll_unparked(None).is_ready());
544
545        // First, increment the number of messages contained by the channel.
546        // This operation will also atomically determine if the sender task
547        // should be parked.
548        //
549        // `None` is returned in the case that the channel has been closed by the
550        // receiver. This happens when `Receiver::close` is called or the
551        // receiver is dropped.
552        let park_self = match self.inc_num_messages() {
553            Some(num_messages) => {
554                // Block if the current number of pending messages has exceeded
555                // the configured buffer size
556                num_messages > self.inner.buffer
557            }
558            None => {
559                return Err(TrySendError {
560                    err: SendError { kind: SendErrorKind::Disconnected },
561                    val: msg,
562                })
563            }
564        };
565
566        // If the channel has reached capacity, then the sender task needs to
567        // be parked. This will send the task handle on the parked task queue.
568        //
569        // However, when `do_send` is called while dropping the `Sender`,
570        // `task::current()` can't be called safely. In this case, in order to
571        // maintain internal consistency, a blank message is pushed onto the
572        // parked task queue.
573        if park_self {
574            self.park();
575        }
576
577        self.queue_push_and_signal(msg);
578
579        Ok(())
580    }
581
582    // Push message to the queue and signal to the receiver
583    fn queue_push_and_signal(&self, msg: T) {
584        // Push the message onto the message queue
585        self.inner.message_queue.push(msg);
586
587        // Signal to the receiver that a message has been enqueued. If the
588        // receiver is parked, this will unpark the task.
589        self.inner.recv_task.wake();
590    }
591
592    // Increment the number of queued messages. Returns the resulting number.
593    fn inc_num_messages(&self) -> Option<usize> {
594        let mut curr = self.inner.state.load(SeqCst);
595
596        loop {
597            let mut state = decode_state(curr);
598
599            // The receiver end closed the channel.
600            if !state.is_open {
601                return None;
602            }
603
604            // This probably is never hit? Odds are the process will run out of
605            // memory first. It may be worth to return something else in this
606            // case?
607            assert!(
608                state.num_messages < MAX_CAPACITY,
609                "buffer space \
610                    exhausted; sending this messages would overflow the state"
611            );
612
613            state.num_messages += 1;
614
615            let next = encode_state(&state);
616            match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
617                Ok(_) => return Some(state.num_messages),
618                Err(actual) => curr = actual,
619            }
620        }
621    }
622
623    fn park(&mut self) {
624        {
625            let mut sender = self.sender_task.lock().unwrap();
626            sender.task = None;
627            sender.is_parked = true;
628        }
629
630        // Send handle over queue
631        let t = self.sender_task.clone();
632        self.inner.parked_queue.push(t);
633
634        // Check to make sure we weren't closed after we sent our task on the
635        // queue
636        let state = decode_state(self.inner.state.load(SeqCst));
637        self.maybe_parked = state.is_open;
638    }
639
640    /// Polls the channel to determine if there is guaranteed capacity to send
641    /// at least one item without waiting.
642    ///
643    /// # Return value
644    ///
645    /// This method returns:
646    ///
647    /// - `Poll::Ready(Ok(_))` if there is sufficient capacity;
648    /// - `Poll::Pending` if the channel may not have
649    ///   capacity, in which case the current task is queued to be notified once
650    ///   capacity is available;
651    /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped.
652    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
653        let state = decode_state(self.inner.state.load(SeqCst));
654        if !state.is_open {
655            return Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected }));
656        }
657
658        self.poll_unparked(Some(cx)).map(Ok)
659    }
660
661    /// Returns whether the senders send to the same receiver.
662    fn same_receiver(&self, other: &Self) -> bool {
663        Arc::ptr_eq(&self.inner, &other.inner)
664    }
665
666    /// Returns whether the sender send to this receiver.
667    fn is_connected_to(&self, receiver: &Arc<BoundedInner<T>>) -> bool {
668        Arc::ptr_eq(&self.inner, receiver)
669    }
670
671    /// Returns pointer to the Arc containing sender
672    ///
673    /// The returned pointer is not referenced and should be only used for hashing!
674    fn ptr(&self) -> *const BoundedInner<T> {
675        &*self.inner
676    }
677
678    /// Returns whether this channel is closed without needing a context.
679    fn is_closed(&self) -> bool {
680        !decode_state(self.inner.state.load(SeqCst)).is_open
681    }
682
683    /// Closes this channel from the sender side, preventing any new messages.
684    fn close_channel(&self) {
685        // There's no need to park this sender, its dropping,
686        // and we don't want to check for capacity, so skip
687        // that stuff from `do_send`.
688
689        self.inner.set_closed();
690        self.inner.recv_task.wake();
691    }
692
693    fn poll_unparked(&mut self, cx: Option<&mut Context<'_>>) -> Poll<()> {
694        // First check the `maybe_parked` variable. This avoids acquiring the
695        // lock in most cases
696        if self.maybe_parked {
697            // Get a lock on the task handle
698            let mut task = self.sender_task.lock().unwrap();
699
700            if !task.is_parked {
701                self.maybe_parked = false;
702                return Poll::Ready(());
703            }
704
705            // At this point, an unpark request is pending, so there will be an
706            // unpark sometime in the future. We just need to make sure that
707            // the correct task will be notified.
708            //
709            // Update the task in case the `Sender` has been moved to another
710            // task
711            task.task = cx.map(|cx| cx.waker().clone());
712
713            Poll::Pending
714        } else {
715            Poll::Ready(())
716        }
717    }
718}
719
720impl<T> Sender<T> {
721    /// Attempts to send a message on this `Sender`, returning the message
722    /// if there was an error.
723    pub fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
724        if let Some(inner) = &mut self.0 {
725            inner.try_send(msg)
726        } else {
727            Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg })
728        }
729    }
730
731    /// Send a message on the channel.
732    ///
733    /// This function should only be called after
734    /// [`poll_ready`](Sender::poll_ready) has reported that the channel is
735    /// ready to receive a message.
736    pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
737        self.try_send(msg).map_err(|e| e.err)
738    }
739
740    /// Polls the channel to determine if there is guaranteed capacity to send
741    /// at least one item without waiting.
742    ///
743    /// # Return value
744    ///
745    /// This method returns:
746    ///
747    /// - `Poll::Ready(Ok(_))` if there is sufficient capacity;
748    /// - `Poll::Pending` if the channel may not have
749    ///   capacity, in which case the current task is queued to be notified once
750    ///   capacity is available;
751    /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped.
752    pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
753        let inner = self.0.as_mut().ok_or(SendError { kind: SendErrorKind::Disconnected })?;
754        inner.poll_ready(cx)
755    }
756
757    /// Returns whether this channel is closed without needing a context.
758    pub fn is_closed(&self) -> bool {
759        self.0.as_ref().map(BoundedSenderInner::is_closed).unwrap_or(true)
760    }
761
762    /// Closes this channel from the sender side, preventing any new messages.
763    pub fn close_channel(&mut self) {
764        if let Some(inner) = &mut self.0 {
765            inner.close_channel();
766        }
767    }
768
769    /// Disconnects this sender from the channel, closing it if there are no more senders left.
770    pub fn disconnect(&mut self) {
771        self.0 = None;
772    }
773
774    /// Returns whether the senders send to the same receiver.
775    pub fn same_receiver(&self, other: &Self) -> bool {
776        match (&self.0, &other.0) {
777            (Some(inner), Some(other)) => inner.same_receiver(other),
778            _ => false,
779        }
780    }
781
782    /// Returns whether the sender send to this receiver.
783    pub fn is_connected_to(&self, receiver: &Receiver<T>) -> bool {
784        match (&self.0, &receiver.inner) {
785            (Some(inner), Some(receiver)) => inner.is_connected_to(receiver),
786            _ => false,
787        }
788    }
789
790    /// Hashes the receiver into the provided hasher
791    pub fn hash_receiver<H>(&self, hasher: &mut H)
792    where
793        H: std::hash::Hasher,
794    {
795        use std::hash::Hash;
796
797        let ptr = self.0.as_ref().map(|inner| inner.ptr());
798        ptr.hash(hasher);
799    }
800}
801
802impl<T> UnboundedSender<T> {
803    /// Check if the channel is ready to receive a message.
804    pub fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), SendError>> {
805        let inner = self.0.as_ref().ok_or(SendError { kind: SendErrorKind::Disconnected })?;
806        inner.poll_ready_nb()
807    }
808
809    /// Returns whether this channel is closed without needing a context.
810    pub fn is_closed(&self) -> bool {
811        self.0.as_ref().map(UnboundedSenderInner::is_closed).unwrap_or(true)
812    }
813
814    /// Closes this channel from the sender side, preventing any new messages.
815    pub fn close_channel(&self) {
816        if let Some(inner) = &self.0 {
817            inner.close_channel();
818        }
819    }
820
821    /// Disconnects this sender from the channel, closing it if there are no more senders left.
822    pub fn disconnect(&mut self) {
823        self.0 = None;
824    }
825
826    // Do the send without parking current task.
827    fn do_send_nb(&self, msg: T) -> Result<(), TrySendError<T>> {
828        if let Some(inner) = &self.0 {
829            if inner.inc_num_messages().is_some() {
830                inner.queue_push_and_signal(msg);
831                return Ok(());
832            }
833        }
834
835        Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg })
836    }
837
838    /// Send a message on the channel.
839    ///
840    /// This method should only be called after `poll_ready` has been used to
841    /// verify that the channel is ready to receive a message.
842    pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
843        self.do_send_nb(msg).map_err(|e| e.err)
844    }
845
846    /// Sends a message along this channel.
847    ///
848    /// This is an unbounded sender, so this function differs from `Sink::send`
849    /// by ensuring the return type reflects that the channel is always ready to
850    /// receive messages.
851    pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError<T>> {
852        self.do_send_nb(msg)
853    }
854
855    /// Returns whether the senders send to the same receiver.
856    pub fn same_receiver(&self, other: &Self) -> bool {
857        match (&self.0, &other.0) {
858            (Some(inner), Some(other)) => inner.same_receiver(other),
859            _ => false,
860        }
861    }
862
863    /// Returns whether the sender send to this receiver.
864    pub fn is_connected_to(&self, receiver: &UnboundedReceiver<T>) -> bool {
865        match (&self.0, &receiver.inner) {
866            (Some(inner), Some(receiver)) => inner.is_connected_to(receiver),
867            _ => false,
868        }
869    }
870
871    /// Hashes the receiver into the provided hasher
872    pub fn hash_receiver<H>(&self, hasher: &mut H)
873    where
874        H: std::hash::Hasher,
875    {
876        use std::hash::Hash;
877
878        let ptr = self.0.as_ref().map(|inner| inner.ptr());
879        ptr.hash(hasher);
880    }
881
882    /// Return the number of messages in the queue or 0 if channel is disconnected.
883    pub fn len(&self) -> usize {
884        if let Some(sender) = &self.0 {
885            decode_state(sender.inner.state.load(SeqCst)).num_messages
886        } else {
887            0
888        }
889    }
890
891    /// Return false is channel has no queued messages, true otherwise.
892    pub fn is_empty(&self) -> bool {
893        self.len() == 0
894    }
895}
896
897impl<T> Clone for Sender<T> {
898    fn clone(&self) -> Self {
899        Self(self.0.clone())
900    }
901}
902
903impl<T> Clone for UnboundedSender<T> {
904    fn clone(&self) -> Self {
905        Self(self.0.clone())
906    }
907}
908
909impl<T> Clone for UnboundedSenderInner<T> {
910    fn clone(&self) -> Self {
911        // Since this atomic op isn't actually guarding any memory and we don't
912        // care about any orderings besides the ordering on the single atomic
913        // variable, a relaxed ordering is acceptable.
914        let mut curr = self.inner.num_senders.load(SeqCst);
915
916        loop {
917            // If the maximum number of senders has been reached, then fail
918            if curr == MAX_BUFFER {
919                panic!("cannot clone `Sender` -- too many outstanding senders");
920            }
921
922            debug_assert!(curr < MAX_BUFFER);
923
924            let next = curr + 1;
925            match self.inner.num_senders.compare_exchange(curr, next, SeqCst, SeqCst) {
926                Ok(_) => {
927                    // The ABA problem doesn't matter here. We only care that the
928                    // number of senders never exceeds the maximum.
929                    return Self { inner: self.inner.clone() };
930                }
931                Err(actual) => curr = actual,
932            }
933        }
934    }
935}
936
937impl<T> Clone for BoundedSenderInner<T> {
938    fn clone(&self) -> Self {
939        // Since this atomic op isn't actually guarding any memory and we don't
940        // care about any orderings besides the ordering on the single atomic
941        // variable, a relaxed ordering is acceptable.
942        let mut curr = self.inner.num_senders.load(SeqCst);
943
944        loop {
945            // If the maximum number of senders has been reached, then fail
946            if curr == self.inner.max_senders() {
947                panic!("cannot clone `Sender` -- too many outstanding senders");
948            }
949
950            debug_assert!(curr < self.inner.max_senders());
951
952            let next = curr + 1;
953            match self.inner.num_senders.compare_exchange(curr, next, SeqCst, SeqCst) {
954                Ok(_) => {
955                    // The ABA problem doesn't matter here. We only care that the
956                    // number of senders never exceeds the maximum.
957                    return Self {
958                        inner: self.inner.clone(),
959                        sender_task: Arc::new(Mutex::new(SenderTask::new())),
960                        maybe_parked: false,
961                    };
962                }
963                Err(actual) => curr = actual,
964            }
965        }
966    }
967}
968
969impl<T> Drop for UnboundedSenderInner<T> {
970    fn drop(&mut self) {
971        // Ordering between variables don't matter here
972        let prev = self.inner.num_senders.fetch_sub(1, SeqCst);
973
974        if prev == 1 {
975            self.close_channel();
976        }
977    }
978}
979
980impl<T> Drop for BoundedSenderInner<T> {
981    fn drop(&mut self) {
982        // Ordering between variables don't matter here
983        let prev = self.inner.num_senders.fetch_sub(1, SeqCst);
984
985        if prev == 1 {
986            self.close_channel();
987        }
988    }
989}
990
991impl<T> fmt::Debug for Sender<T> {
992    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
993        f.debug_struct("Sender").field("closed", &self.is_closed()).finish()
994    }
995}
996
997impl<T> fmt::Debug for UnboundedSender<T> {
998    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
999        f.debug_struct("UnboundedSender").field("closed", &self.is_closed()).finish()
1000    }
1001}
1002
1003/*
1004 *
1005 * ===== impl Receiver =====
1006 *
1007 */
1008
1009impl<T> Receiver<T> {
1010    /// Waits for a message from the channel.
1011    /// If the channel is empty and closed, returns [`RecvError`].
1012    pub fn recv(&mut self) -> Recv<'_, Self> {
1013        Recv::new(self)
1014    }
1015
1016    /// Closes the receiving half of a channel, without dropping it.
1017    ///
1018    /// This prevents any further messages from being sent on the channel while
1019    /// still enabling the receiver to drain messages that are buffered.
1020    pub fn close(&mut self) {
1021        if let Some(inner) = &mut self.inner {
1022            inner.set_closed();
1023
1024            // Wake up any threads waiting as they'll see that we've closed the
1025            // channel and will continue on their merry way.
1026            while let Some(task) = unsafe { inner.parked_queue.pop_spin() } {
1027                task.lock().unwrap().notify();
1028            }
1029        }
1030    }
1031
1032    /// Tries to receive the next message without notifying a context if empty.
1033    ///
1034    /// It is not recommended to call this function from inside of a future,
1035    /// only when you've otherwise arranged to be notified when the channel is
1036    /// no longer empty.
1037    ///
1038    /// This function returns:
1039    /// * `Ok(Some(t))` when message is fetched
1040    /// * `Ok(None)` when channel is closed and no messages left in the queue
1041    /// * `Err(e)` when there are no messages available, but channel is not yet closed
1042    #[deprecated(note = "please use `try_recv` instead")]
1043    pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
1044        match self.next_message() {
1045            Poll::Ready(msg) => Ok(msg),
1046            Poll::Pending => Err(TryRecvError::Empty),
1047        }
1048    }
1049
1050    /// Tries to receive a message from the channel without blocking.
1051    /// If the channel is empty, or empty and closed, this method returns an error.
1052    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
1053        match self.next_message() {
1054            Poll::Ready(Some(msg)) => Ok(msg),
1055            Poll::Ready(None) => Err(TryRecvError::Closed),
1056            Poll::Pending => Err(TryRecvError::Empty),
1057        }
1058    }
1059
1060    fn next_message(&mut self) -> Poll<Option<T>> {
1061        let inner = match self.inner.as_mut() {
1062            None => return Poll::Ready(None),
1063            Some(inner) => inner,
1064        };
1065        // Pop off a message
1066        match unsafe { inner.message_queue.pop_spin() } {
1067            Some(msg) => {
1068                // If there are any parked task handles in the parked queue,
1069                // pop one and unpark it.
1070                self.unpark_one();
1071
1072                // Decrement number of messages
1073                self.dec_num_messages();
1074
1075                Poll::Ready(Some(msg))
1076            }
1077            None => {
1078                let state = decode_state(inner.state.load(SeqCst));
1079                if state.is_closed() {
1080                    // If closed flag is set AND there are no pending messages
1081                    // it means end of stream
1082                    self.inner = None;
1083                    Poll::Ready(None)
1084                } else {
1085                    // If queue is open, we need to return Pending
1086                    // to be woken up when new messages arrive.
1087                    // If queue is closed but num_messages is non-zero,
1088                    // it means that senders updated the state,
1089                    // but didn't put message to queue yet,
1090                    // so we need to park until sender unparks the task
1091                    // after queueing the message.
1092                    Poll::Pending
1093                }
1094            }
1095        }
1096    }
1097
1098    // Unpark a single task handle if there is one pending in the parked queue
1099    fn unpark_one(&mut self) {
1100        if let Some(inner) = &mut self.inner {
1101            if let Some(task) = unsafe { inner.parked_queue.pop_spin() } {
1102                task.lock().unwrap().notify();
1103            }
1104        }
1105    }
1106
1107    fn dec_num_messages(&self) {
1108        if let Some(inner) = &self.inner {
1109            // OPEN_MASK is highest bit, so it's unaffected by subtraction
1110            // unless there's underflow, and we know there's no underflow
1111            // because number of messages at this point is always > 0.
1112            inner.state.fetch_sub(1, SeqCst);
1113        }
1114    }
1115}
1116
1117// The receiver does not ever take a Pin to the inner T
1118impl<T> Unpin for Receiver<T> {}
1119
1120impl<T> FusedStream for Receiver<T> {
1121    fn is_terminated(&self) -> bool {
1122        self.inner.is_none()
1123    }
1124}
1125
1126impl<T> Stream for Receiver<T> {
1127    type Item = T;
1128
1129    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
1130        // Try to read a message off of the message queue.
1131        match self.next_message() {
1132            Poll::Ready(msg) => {
1133                if msg.is_none() {
1134                    self.inner = None;
1135                }
1136                Poll::Ready(msg)
1137            }
1138            Poll::Pending => {
1139                // There are no messages to read, in this case, park.
1140                self.inner.as_ref().unwrap().recv_task.register(cx.waker());
1141                // Check queue again after parking to prevent race condition:
1142                // a message could be added to the queue after previous `next_message`
1143                // before `register` call.
1144                self.next_message()
1145            }
1146        }
1147    }
1148
1149    fn size_hint(&self) -> (usize, Option<usize>) {
1150        if let Some(inner) = &self.inner {
1151            decode_state(inner.state.load(SeqCst)).size_hint()
1152        } else {
1153            (0, Some(0))
1154        }
1155    }
1156}
1157
1158impl<St: ?Sized + Unpin> Unpin for Recv<'_, St> {}
1159impl<'a, St: ?Sized + Stream + Unpin> Recv<'a, St> {
1160    fn new(stream: &'a mut St) -> Self {
1161        Self { stream }
1162    }
1163}
1164
1165impl<St: ?Sized + FusedStream + Unpin> FusedFuture for Recv<'_, St> {
1166    fn is_terminated(&self) -> bool {
1167        self.stream.is_terminated()
1168    }
1169}
1170
1171impl<St: ?Sized + Stream + Unpin> Future for Recv<'_, St> {
1172    type Output = Result<St::Item, RecvError>;
1173
1174    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1175        match Pin::new(&mut self.stream).poll_next(cx) {
1176            Poll::Ready(Some(msg)) => Poll::Ready(Ok(msg)),
1177            Poll::Ready(None) => Poll::Ready(Err(RecvError)),
1178            Poll::Pending => Poll::Pending,
1179        }
1180    }
1181}
1182
1183impl<T> Drop for Receiver<T> {
1184    fn drop(&mut self) {
1185        // Drain the channel of all pending messages
1186        self.close();
1187        if self.inner.is_some() {
1188            loop {
1189                match self.next_message() {
1190                    Poll::Ready(Some(_)) => {}
1191                    Poll::Ready(None) => break,
1192                    Poll::Pending => {
1193                        let state = decode_state(self.inner.as_ref().unwrap().state.load(SeqCst));
1194
1195                        // If the channel is closed, then there is no need to park.
1196                        if state.is_closed() {
1197                            break;
1198                        }
1199
1200                        // TODO: Spinning isn't ideal, it might be worth
1201                        // investigating using a condvar or some other strategy
1202                        // here. That said, if this case is hit, then another thread
1203                        // is about to push the value into the queue and this isn't
1204                        // the only spinlock in the impl right now.
1205                        thread::yield_now();
1206                    }
1207                }
1208            }
1209        }
1210    }
1211}
1212
1213impl<T> fmt::Debug for Receiver<T> {
1214    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1215        let closed = if let Some(ref inner) = self.inner {
1216            decode_state(inner.state.load(SeqCst)).is_closed()
1217        } else {
1218            false
1219        };
1220
1221        f.debug_struct("Receiver").field("closed", &closed).finish()
1222    }
1223}
1224
1225impl<T> UnboundedReceiver<T> {
1226    /// Waits for a message from the channel.
1227    /// If the channel is empty and closed, returns [`RecvError`].
1228    pub fn recv(&mut self) -> Recv<'_, Self> {
1229        Recv::new(self)
1230    }
1231
1232    /// Closes the receiving half of a channel, without dropping it.
1233    ///
1234    /// This prevents any further messages from being sent on the channel while
1235    /// still enabling the receiver to drain messages that are buffered.
1236    pub fn close(&mut self) {
1237        if let Some(inner) = &mut self.inner {
1238            inner.set_closed();
1239        }
1240    }
1241
1242    /// Tries to receive the next message without notifying a context if empty.
1243    ///
1244    /// It is not recommended to call this function from inside of a future,
1245    /// only when you've otherwise arranged to be notified when the channel is
1246    /// no longer empty.
1247    ///
1248    /// This function returns:
1249    /// * `Ok(Some(t))` when message is fetched
1250    /// * `Ok(None)` when channel is closed and no messages left in the queue
1251    /// * `Err(e)` when there are no messages available, but channel is not yet closed
1252    #[deprecated(note = "please use `try_recv` instead")]
1253    pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
1254        match self.next_message() {
1255            Poll::Ready(msg) => Ok(msg),
1256            Poll::Pending => Err(TryRecvError::Empty),
1257        }
1258    }
1259
1260    /// Tries to receive a message from the channel without blocking.
1261    /// If the channel is empty, or empty and closed, this method returns an error.
1262    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
1263        match self.next_message() {
1264            Poll::Ready(Some(msg)) => Ok(msg),
1265            Poll::Ready(None) => Err(TryRecvError::Closed),
1266            Poll::Pending => Err(TryRecvError::Empty),
1267        }
1268    }
1269
1270    fn next_message(&mut self) -> Poll<Option<T>> {
1271        let inner = match self.inner.as_mut() {
1272            None => return Poll::Ready(None),
1273            Some(inner) => inner,
1274        };
1275        // Pop off a message
1276        match unsafe { inner.message_queue.pop_spin() } {
1277            Some(msg) => {
1278                // Decrement number of messages
1279                self.dec_num_messages();
1280
1281                Poll::Ready(Some(msg))
1282            }
1283            None => {
1284                let state = decode_state(inner.state.load(SeqCst));
1285                if state.is_closed() {
1286                    // If closed flag is set AND there are no pending messages
1287                    // it means end of stream
1288                    self.inner = None;
1289                    Poll::Ready(None)
1290                } else {
1291                    // If queue is open, we need to return Pending
1292                    // to be woken up when new messages arrive.
1293                    // If queue is closed but num_messages is non-zero,
1294                    // it means that senders updated the state,
1295                    // but didn't put message to queue yet,
1296                    // so we need to park until sender unparks the task
1297                    // after queueing the message.
1298                    Poll::Pending
1299                }
1300            }
1301        }
1302    }
1303
1304    fn dec_num_messages(&self) {
1305        if let Some(inner) = &self.inner {
1306            // OPEN_MASK is highest bit, so it's unaffected by subtraction
1307            // unless there's underflow, and we know there's no underflow
1308            // because number of messages at this point is always > 0.
1309            inner.state.fetch_sub(1, SeqCst);
1310        }
1311    }
1312}
1313
1314impl<T> FusedStream for UnboundedReceiver<T> {
1315    fn is_terminated(&self) -> bool {
1316        self.inner.is_none()
1317    }
1318}
1319
1320impl<T> Stream for UnboundedReceiver<T> {
1321    type Item = T;
1322
1323    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
1324        // Try to read a message off of the message queue.
1325        match self.next_message() {
1326            Poll::Ready(msg) => {
1327                if msg.is_none() {
1328                    self.inner = None;
1329                }
1330                Poll::Ready(msg)
1331            }
1332            Poll::Pending => {
1333                // There are no messages to read, in this case, park.
1334                self.inner.as_ref().unwrap().recv_task.register(cx.waker());
1335                // Check queue again after parking to prevent race condition:
1336                // a message could be added to the queue after previous `next_message`
1337                // before `register` call.
1338                self.next_message()
1339            }
1340        }
1341    }
1342
1343    fn size_hint(&self) -> (usize, Option<usize>) {
1344        if let Some(inner) = &self.inner {
1345            decode_state(inner.state.load(SeqCst)).size_hint()
1346        } else {
1347            (0, Some(0))
1348        }
1349    }
1350}
1351
1352impl<T> Drop for UnboundedReceiver<T> {
1353    fn drop(&mut self) {
1354        // Drain the channel of all pending messages
1355        self.close();
1356        if self.inner.is_some() {
1357            loop {
1358                match self.next_message() {
1359                    Poll::Ready(Some(_)) => {}
1360                    Poll::Ready(None) => break,
1361                    Poll::Pending => {
1362                        let state = decode_state(self.inner.as_ref().unwrap().state.load(SeqCst));
1363
1364                        // If the channel is closed, then there is no need to park.
1365                        if state.is_closed() {
1366                            break;
1367                        }
1368
1369                        // TODO: Spinning isn't ideal, it might be worth
1370                        // investigating using a condvar or some other strategy
1371                        // here. That said, if this case is hit, then another thread
1372                        // is about to push the value into the queue and this isn't
1373                        // the only spinlock in the impl right now.
1374                        thread::yield_now();
1375                    }
1376                }
1377            }
1378        }
1379    }
1380}
1381
1382impl<T> fmt::Debug for UnboundedReceiver<T> {
1383    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1384        let closed = if let Some(ref inner) = self.inner {
1385            decode_state(inner.state.load(SeqCst)).is_closed()
1386        } else {
1387            false
1388        };
1389
1390        f.debug_struct("Receiver").field("closed", &closed).finish()
1391    }
1392}
1393
1394/*
1395 *
1396 * ===== impl Inner =====
1397 *
1398 */
1399
1400impl<T> UnboundedInner<T> {
1401    // Clear `open` flag in the state, keep `num_messages` intact.
1402    fn set_closed(&self) {
1403        let curr = self.state.load(SeqCst);
1404        if !decode_state(curr).is_open {
1405            return;
1406        }
1407
1408        self.state.fetch_and(!OPEN_MASK, SeqCst);
1409    }
1410}
1411
1412impl<T> BoundedInner<T> {
1413    // The return value is such that the total number of messages that can be
1414    // enqueued into the channel will never exceed MAX_CAPACITY
1415    fn max_senders(&self) -> usize {
1416        MAX_CAPACITY - self.buffer
1417    }
1418
1419    // Clear `open` flag in the state, keep `num_messages` intact.
1420    fn set_closed(&self) {
1421        let curr = self.state.load(SeqCst);
1422        if !decode_state(curr).is_open {
1423            return;
1424        }
1425
1426        self.state.fetch_and(!OPEN_MASK, SeqCst);
1427    }
1428}
1429
1430unsafe impl<T: Send> Send for UnboundedInner<T> {}
1431unsafe impl<T: Send> Sync for UnboundedInner<T> {}
1432
1433unsafe impl<T: Send> Send for BoundedInner<T> {}
1434unsafe impl<T: Send> Sync for BoundedInner<T> {}
1435
1436impl State {
1437    fn is_closed(&self) -> bool {
1438        !self.is_open && self.num_messages == 0
1439    }
1440
1441    fn size_hint(&self) -> (usize, Option<usize>) {
1442        if self.is_open {
1443            (self.num_messages, None)
1444        } else {
1445            (self.num_messages, Some(self.num_messages))
1446        }
1447    }
1448}
1449
1450/*
1451 *
1452 * ===== Helpers =====
1453 *
1454 */
1455
1456fn decode_state(num: usize) -> State {
1457    State { is_open: num & OPEN_MASK == OPEN_MASK, num_messages: num & MAX_CAPACITY }
1458}
1459
1460fn encode_state(state: &State) -> usize {
1461    let mut num = state.num_messages;
1462
1463    if state.is_open {
1464        num |= OPEN_MASK;
1465    }
1466
1467    num
1468}