futures_channel/mpsc/mod.rs
1//! A multi-producer, single-consumer queue for sending values across
2//! asynchronous tasks.
3//!
4//! Similarly to `std::sync::mpsc`, channel creation provides [`Receiver`] and
5//! [`Sender`] handles. [`Receiver`] implements [`Stream`] and allows a task to
6//! read values out of the channel. If there is no message to read from the
7//! channel, the current task will be notified when a new value is sent.
8//! [`Sender`] implements the `Sink` trait and allows a task to send messages into
9//! the channel. If the channel is at capacity, the send will be rejected and
10//! the task will be notified when additional capacity is available. In other
11//! words, the channel provides backpressure.
12//!
13//! Unbounded channels are also available using the `unbounded` constructor.
14//!
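//! A minimal usage sketch (assuming this module is re-exported through the
//! `futures` facade as `futures::channel::mpsc`, and that the `futures`
//! executor is available):
//!
//! ```
//! use futures::channel::mpsc;
//! use futures::executor::block_on;
//!
//! block_on(async {
//!     let (mut tx, mut rx) = mpsc::channel::<u32>(8);
//!     tx.try_send(1).unwrap();
//!     assert_eq!(rx.recv().await, Ok(1));
//!     drop(tx); // all senders dropped: the stream terminates
//!     assert!(rx.recv().await.is_err());
//! });
//! ```
//!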
15//! # Disconnection
16//!
17//! When all [`Sender`] handles have been dropped, it is no longer
18//! possible to send values into the channel. This is considered the termination
19//! event of the stream. As such, [`Receiver::poll_next`]
20//! will return `Poll::Ready(None)`.
21//!
22//! If the [`Receiver`] handle is dropped, then messages can no longer
23//! be read out of the channel. In this case, all further attempts to send will
24//! result in an error.
25//!
26//! # Clean Shutdown
27//!
28//! If the [`Receiver`] is simply dropped, then it is possible for
29//! there to be messages still in the channel that will not be processed. As
30//! such, it is usually desirable to perform a "clean" shutdown. To do this, the
31//! receiver first calls `close`, which prevents any further messages from
32//! being sent into the channel. Then, the receiver consumes the channel to
33//! completion, at which point the receiver can be dropped.
34//!
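//! A minimal sketch of such a shutdown (paths assume the `futures` facade
//! re-export):
//!
//! ```
//! use futures::channel::mpsc;
//!
//! let (mut tx, mut rx) = mpsc::channel::<u32>(4);
//! tx.try_send(1).unwrap();
//!
//! // Prevent any further sends, then drain what is already buffered.
//! rx.close();
//! assert!(tx.try_send(2).unwrap_err().is_disconnected());
//! while let Ok(msg) = rx.try_recv() {
//!     println!("draining: {}", msg);
//! }
//! ```
//!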
35//! [`Sender`]: struct.Sender.html
36//! [`Receiver`]: struct.Receiver.html
37//! [`Stream`]: ../../futures_core/stream/trait.Stream.html
38//! [`Receiver::poll_next`]:
39//! ../../futures_core/stream/trait.Stream.html#tymethod.poll_next
40
41// At the core, the channel uses an atomic FIFO queue for message passing. This
42// queue is used as the primary coordination primitive. In order to enforce
43// capacity limits and handle back pressure, a secondary FIFO queue is used to
44// send parked task handles.
45//
46// The general idea is that the channel is created with a `buffer` size of `n`.
47// The channel capacity is `n + num-senders`. Each sender gets one "guaranteed"
48// slot to hold a message. This allows `Sender` to know for a fact that a send
49// will succeed *before* starting to do the actual work of sending the value.
50// Since most of this work is lock-free, once the work starts, it is impossible
51// to safely revert.
52//
53// If the sender is unable to process a send operation, then the current
54// task is parked and the handle is sent on the parked task queue.
55//
56// Note that the implementation guarantees that the channel capacity will never
57// exceed the configured limit; however, there is no *strict* guarantee that the
58// receiver will wake up a parked task *immediately* when a slot becomes
59// available. However, it will almost always unpark a task when a slot becomes
60// available and it is *guaranteed* that a sender will be unparked when the
61// message that caused the sender to become parked is read out of the channel.
62//
63// The steps for sending a message are roughly:
64//
65// 1) Increment the channel message count
66// 2) If the channel is at capacity, push the task handle onto the wait queue
67// 3) Push the message onto the message queue.
68//
69// The steps for receiving a message are roughly:
70//
71// 1) Pop a message from the message queue
72// 2) Pop a task handle from the wait queue
73// 3) Decrement the channel message count.
74//
75// It's important for the order of operations on lock-free structures to happen
76// in reverse order between the sender and receiver. This makes the message
77// queue the primary coordination structure and establishes the necessary
78// happens-before semantics required for the acquire / release semantics used
79// by the queue structure.
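//
// In the code below, the send steps map onto `inc_num_messages`, `park`, and
// `queue_push_and_signal` (see `BoundedSenderInner::do_send_b`), while the
// receive steps map onto popping the message queue, `unpark_one`, and
// `dec_num_messages` (see `Receiver::next_message`).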
80
81use core::future::Future;
82use futures_core::stream::{FusedStream, Stream};
83use futures_core::task::__internal::AtomicWaker;
84use futures_core::task::{Context, Poll, Waker};
85use futures_core::FusedFuture;
86use std::fmt;
87use std::pin::Pin;
88use std::sync::atomic::AtomicUsize;
89use std::sync::atomic::Ordering::SeqCst;
90use std::sync::{Arc, Mutex};
91use std::thread;
92
93use crate::mpsc::queue::Queue;
94
95mod queue;
96#[cfg(feature = "sink")]
97mod sink_impl;
98
99struct UnboundedSenderInner<T> {
100 // Channel state shared between the sender and receiver.
101 inner: Arc<UnboundedInner<T>>,
102}
103
104struct BoundedSenderInner<T> {
105 // Channel state shared between the sender and receiver.
106 inner: Arc<BoundedInner<T>>,
107
108 // Handle to the task that is blocked on this sender. This handle is sent
109 // to the receiver half in order to be notified when the sender becomes
110 // unblocked.
111 sender_task: Arc<Mutex<SenderTask>>,
112
113 // `true` if the sender might be blocked. This is an optimization to avoid
114 // having to lock the mutex most of the time.
115 maybe_parked: bool,
116}
117
118// We never project Pin<&mut SenderInner> to `Pin<&mut T>`
119impl<T> Unpin for UnboundedSenderInner<T> {}
120impl<T> Unpin for BoundedSenderInner<T> {}
121
122/// The transmission end of a bounded mpsc channel.
123///
124/// This value is created by the [`channel`] function.
125pub struct Sender<T>(Option<BoundedSenderInner<T>>);
126
127/// The transmission end of an unbounded mpsc channel.
128///
129/// This value is created by the [`unbounded`] function.
130pub struct UnboundedSender<T>(Option<UnboundedSenderInner<T>>);
131
132#[allow(dead_code)]
133trait AssertKinds: Send + Sync + Clone {}
134impl AssertKinds for UnboundedSender<u32> {}
135
136/// The receiving end of a bounded mpsc channel.
137///
138/// This value is created by the [`channel`] function.
139pub struct Receiver<T> {
140 inner: Option<Arc<BoundedInner<T>>>,
141}
142
143/// The receiving end of an unbounded mpsc channel.
144///
145/// This value is created by the [`unbounded`] function.
146pub struct UnboundedReceiver<T> {
147 inner: Option<Arc<UnboundedInner<T>>>,
148}
149
150// `Pin<&mut UnboundedReceiver<T>>` is never projected to `Pin<&mut T>`
151impl<T> Unpin for UnboundedReceiver<T> {}
152
153/// The error type for [`Sender`s](Sender) used as `Sink`s.
154#[derive(Clone, Debug, PartialEq, Eq)]
155pub struct SendError {
156 kind: SendErrorKind,
157}
158
159/// The error type returned from [`try_send`](Sender::try_send).
160#[derive(Clone, PartialEq, Eq)]
161pub struct TrySendError<T> {
162 err: SendError,
163 val: T,
164}
165
166#[derive(Clone, Debug, PartialEq, Eq)]
167enum SendErrorKind {
168 Full,
169 Disconnected,
170}
171
172/// Error returned by [`Receiver::try_recv`] or [`UnboundedReceiver::try_recv`].
173#[derive(PartialEq, Eq, Clone, Copy, Debug)]
174pub enum TryRecvError {
175 /// The channel is empty but not closed.
176 Empty,
177
178 /// The channel is empty and closed.
179 Closed,
180}
181
182/// Error returned by the future returned by [`Receiver::recv()`] or [`UnboundedReceiver::recv()`].
183/// Received when the channel is empty and closed.
184#[derive(PartialEq, Eq, Clone, Copy, Debug)]
185pub struct RecvError;
186
187/// Future returned by [`Receiver::recv()`] or [`UnboundedReceiver::recv()`].
188#[derive(Debug)]
189#[must_use = "futures do nothing unless you `.await` or poll them"]
190pub struct Recv<'a, St: ?Sized> {
191 stream: &'a mut St,
192}
193
194impl fmt::Display for SendError {
195 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
196 if self.is_full() {
197 write!(f, "send failed because channel is full")
198 } else {
199 write!(f, "send failed because receiver is gone")
200 }
201 }
202}
203
204impl std::error::Error for SendError {}
205
206impl fmt::Display for RecvError {
207 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
208 write!(f, "receive failed because channel is empty and closed")
209 }
210}
211
212impl std::error::Error for RecvError {}
213
214impl SendError {
215 /// Returns `true` if this error is a result of the channel being full.
216 pub fn is_full(&self) -> bool {
217 match self.kind {
218 SendErrorKind::Full => true,
219 _ => false,
220 }
221 }
222
223 /// Returns `true` if this error is a result of the receiver being dropped.
224 pub fn is_disconnected(&self) -> bool {
225 match self.kind {
226 SendErrorKind::Disconnected => true,
227 _ => false,
228 }
229 }
230}
231
232impl TryRecvError {
233 /// Returns `true` if the channel is empty but not closed.
234 pub fn is_empty(&self) -> bool {
235 matches!(self, TryRecvError::Empty)
236 }
237
238 /// Returns `true` if the channel is empty and closed.
239 pub fn is_closed(&self) -> bool {
240 matches!(self, TryRecvError::Closed)
241 }
242}
243
244impl<T> fmt::Debug for TrySendError<T> {
245 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
246 f.debug_struct("TrySendError").field("kind", &self.err.kind).finish()
247 }
248}
249
250impl<T> fmt::Display for TrySendError<T> {
251 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
252 if self.is_full() {
253 write!(f, "send failed because channel is full")
254 } else {
255 write!(f, "send failed because receiver is gone")
256 }
257 }
258}
259
260impl<T: core::any::Any> std::error::Error for TrySendError<T> {}
261
262impl<T> TrySendError<T> {
263 /// Returns `true` if this error is a result of the channel being full.
264 pub fn is_full(&self) -> bool {
265 self.err.is_full()
266 }
267
268 /// Returns `true` if this error is a result of the receiver being dropped.
269 pub fn is_disconnected(&self) -> bool {
270 self.err.is_disconnected()
271 }
272
273 /// Returns the message that was attempted to be sent but failed.
274 pub fn into_inner(self) -> T {
275 self.val
276 }
277
278 /// Drops the message and converts into a `SendError`.
279 pub fn into_send_error(self) -> SendError {
280 self.err
281 }
282}
283
284impl fmt::Display for TryRecvError {
285 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
286 match self {
287 TryRecvError::Empty => write!(f, "receive failed because channel is empty"),
288 TryRecvError::Closed => write!(f, "receive failed because channel is closed"),
289 }
290 }
291}
292
293impl std::error::Error for TryRecvError {}
294
295struct UnboundedInner<T> {
296 // Internal channel state. Consists of the number of messages stored in the
297 // channel as well as a flag signalling that the channel is closed.
298 state: AtomicUsize,
299
300 // Atomic, FIFO queue used to send messages to the receiver
301 message_queue: Queue<T>,
302
303 // Number of senders in existence
304 num_senders: AtomicUsize,
305
306 // Handle to the receiver's task.
307 recv_task: AtomicWaker,
308}
309
310struct BoundedInner<T> {
311 // Max buffer size of the channel. On top of this, each sender gets one guaranteed slot.
312 buffer: usize,
313
314 // Internal channel state. Consists of the number of messages stored in the
315 // channel as well as a flag signalling that the channel is closed.
316 state: AtomicUsize,
317
318 // Atomic, FIFO queue used to send messages to the receiver
319 message_queue: Queue<T>,
320
321 // Atomic, FIFO queue used to send parked task handles to the receiver.
322 parked_queue: Queue<Arc<Mutex<SenderTask>>>,
323
324 // Number of senders in existence
325 num_senders: AtomicUsize,
326
327 // Handle to the receiver's task.
328 recv_task: AtomicWaker,
329}
330
331// Struct representation of `Inner::state`.
332#[derive(Clone, Copy)]
333struct State {
334 // `true` when the channel is open
335 is_open: bool,
336
337 // Number of messages in the channel
338 num_messages: usize,
339}
340
341// The `is_open` flag is stored in the left-most bit of `Inner::state`
342const OPEN_MASK: usize = usize::MAX - (usize::MAX >> 1);
343
344// When a new channel is created, it is created in the open state with no
345// pending messages.
346const INIT_STATE: usize = OPEN_MASK;
347
348// The maximum number of messages that a channel can track is `usize::MAX >> 1`
349const MAX_CAPACITY: usize = !(OPEN_MASK);
350
351// The maximum requested buffer size must be less than the maximum capacity of
352// a channel. This is because each sender gets a guaranteed slot.
353const MAX_BUFFER: usize = MAX_CAPACITY >> 1;
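
// For illustration (on a 64-bit target): `OPEN_MASK == 1 << 63`, so an open
// channel holding 3 messages is encoded as `(1 << 63) | 3`, and `decode_state`
// on that value yields `State { is_open: true, num_messages: 3 }`.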
354
355// Sent to the consumer to wake up blocked producers
356struct SenderTask {
357 task: Option<Waker>,
358 is_parked: bool,
359}
360
361impl SenderTask {
362 fn new() -> Self {
363 Self { task: None, is_parked: false }
364 }
365
366 fn notify(&mut self) {
367 self.is_parked = false;
368
369 if let Some(task) = self.task.take() {
370 task.wake();
371 }
372 }
373}
374
375/// Creates a bounded mpsc channel for communicating between asynchronous tasks.
376///
377/// Being bounded, this channel provides backpressure to ensure that the sender
378/// outpaces the receiver by only a limited amount. The channel's capacity is
379/// equal to `buffer + num-senders`. In other words, each sender gets a
380/// guaranteed slot in the channel capacity, and on top of that there are
381/// `buffer` "first come, first serve" slots available to all senders.
382///
383/// The [`Receiver`] returned implements the [`Stream`] trait, while [`Sender`]
384/// implements `Sink`.
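///
/// A minimal sketch of how the capacity works, assuming a single sender and
/// the `futures` facade re-export of this module:
///
/// ```
/// use futures::channel::mpsc;
///
/// // buffer = 1 and one sender, so the total capacity is 2.
/// let (mut tx, _rx) = mpsc::channel::<u32>(1);
/// assert!(tx.try_send(1).is_ok()); // "first come, first serve" slot
/// assert!(tx.try_send(2).is_ok()); // this sender's guaranteed slot
/// assert!(tx.try_send(3).unwrap_err().is_full());
/// ```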
385pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
386 // Check that the requested buffer size does not exceed the maximum buffer
387 // size permitted by the system.
388 assert!(buffer < MAX_BUFFER, "requested buffer size too large");
389
390 let inner = Arc::new(BoundedInner {
391 buffer,
392 state: AtomicUsize::new(INIT_STATE),
393 message_queue: Queue::new(),
394 parked_queue: Queue::new(),
395 num_senders: AtomicUsize::new(1),
396 recv_task: AtomicWaker::new(),
397 });
398
399 let tx = BoundedSenderInner {
400 inner: inner.clone(),
401 sender_task: Arc::new(Mutex::new(SenderTask::new())),
402 maybe_parked: false,
403 };
404
405 let rx = Receiver { inner: Some(inner) };
406
407 (Sender(Some(tx)), rx)
408}
409
410/// Creates an unbounded mpsc channel for communicating between asynchronous
411/// tasks.
412///
413/// A `send` on this channel will always succeed as long as the receive half has
414/// not been closed. If the receiver falls behind, messages will be arbitrarily
415/// buffered.
416///
417/// **Note** that the amount of available system memory is an implicit bound on
418/// the channel. Using an `unbounded` channel can cause the
419/// process to run out of memory. In this case, the process will be aborted.
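///
/// A minimal sketch (paths assume the `futures` facade re-export):
///
/// ```
/// use futures::channel::mpsc;
///
/// let (tx, mut rx) = mpsc::unbounded::<u32>();
/// tx.unbounded_send(1).unwrap();
/// assert_eq!(rx.try_recv(), Ok(1));
///
/// drop(rx); // once the receiver is gone, sends fail
/// assert!(tx.unbounded_send(2).unwrap_err().is_disconnected());
/// ```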
420pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
421 let inner = Arc::new(UnboundedInner {
422 state: AtomicUsize::new(INIT_STATE),
423 message_queue: Queue::new(),
424 num_senders: AtomicUsize::new(1),
425 recv_task: AtomicWaker::new(),
426 });
427
428 let tx = UnboundedSenderInner { inner: inner.clone() };
429
430 let rx = UnboundedReceiver { inner: Some(inner) };
431
432 (UnboundedSender(Some(tx)), rx)
433}
434
435/*
436 *
437 * ===== impl Sender =====
438 *
439 */
440
441impl<T> UnboundedSenderInner<T> {
442 fn poll_ready_nb(&self) -> Poll<Result<(), SendError>> {
443 let state = decode_state(self.inner.state.load(SeqCst));
444 if state.is_open {
445 Poll::Ready(Ok(()))
446 } else {
447 Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected }))
448 }
449 }
450
451 // Push message to the queue and signal to the receiver
452 fn queue_push_and_signal(&self, msg: T) {
453 // Push the message onto the message queue
454 self.inner.message_queue.push(msg);
455
456 // Signal to the receiver that a message has been enqueued. If the
457 // receiver is parked, this will unpark the task.
458 self.inner.recv_task.wake();
459 }
460
461 // Increment the number of queued messages. Returns the resulting number.
462 fn inc_num_messages(&self) -> Option<usize> {
463 let mut curr = self.inner.state.load(SeqCst);
464
465 loop {
466 let mut state = decode_state(curr);
467
468 // The receiver end closed the channel.
469 if !state.is_open {
470 return None;
471 }
472
473 // This is probably never hit in practice; odds are the process will run
474 // out of memory first. It may be worth returning something else in this
475 // case.
476 assert!(
477 state.num_messages < MAX_CAPACITY,
478 "buffer space \
479 exhausted; sending this message would overflow the state"
480 );
481
482 state.num_messages += 1;
483
484 let next = encode_state(&state);
485 match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
486 Ok(_) => return Some(state.num_messages),
487 Err(actual) => curr = actual,
488 }
489 }
490 }
491
492 /// Returns whether the senders send to the same receiver.
493 fn same_receiver(&self, other: &Self) -> bool {
494 Arc::ptr_eq(&self.inner, &other.inner)
495 }
496
497 /// Returns whether the sender sends to this receiver.
498 fn is_connected_to(&self, inner: &Arc<UnboundedInner<T>>) -> bool {
499 Arc::ptr_eq(&self.inner, inner)
500 }
501
502 /// Returns a pointer to the channel state shared with the receiver.
503 ///
504 /// The returned pointer is never dereferenced and should only be used for hashing!
505 fn ptr(&self) -> *const UnboundedInner<T> {
506 &*self.inner
507 }
508
509 /// Returns whether this channel is closed without needing a context.
510 fn is_closed(&self) -> bool {
511 !decode_state(self.inner.state.load(SeqCst)).is_open
512 }
513
514 /// Closes this channel from the sender side, preventing any new messages.
515 fn close_channel(&self) {
516 // There's no need to park this sender, since it is closing the channel,
517 // and we don't want to check for capacity, so skip
518 // that logic from `do_send`.
519
520 self.inner.set_closed();
521 self.inner.recv_task.wake();
522 }
523}
524
525impl<T> BoundedSenderInner<T> {
526 /// Attempts to send a message on this `Sender`, returning the message
527 /// if there was an error.
528 fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
529 // If the sender is currently blocked, reject the message
530 if !self.poll_unparked(None).is_ready() {
531 return Err(TrySendError { err: SendError { kind: SendErrorKind::Full }, val: msg });
532 }
533
534 // The channel has capacity to accept the message, so send it
535 self.do_send_b(msg)
536 }
537
538 // Do the send without failing.
539 // Can be called only by bounded sender.
540 fn do_send_b(&mut self, msg: T) -> Result<(), TrySendError<T>> {
541 // Anyone calling do_send *should* make sure there is room first,
542 // but assert here for tests as a sanity check.
543 debug_assert!(self.poll_unparked(None).is_ready());
544
545 // First, increment the number of messages contained by the channel.
546 // This operation will also atomically determine if the sender task
547 // should be parked.
548 //
549 // `None` is returned in the case that the channel has been closed by the
550 // receiver. This happens when `Receiver::close` is called or the
551 // receiver is dropped.
552 let park_self = match self.inc_num_messages() {
553 Some(num_messages) => {
554 // Block if the current number of pending messages has exceeded
555 // the configured buffer size
556 num_messages > self.inner.buffer
557 }
558 None => {
559 return Err(TrySendError {
560 err: SendError { kind: SendErrorKind::Disconnected },
561 val: msg,
562 })
563 }
564 };
565
566 // If the channel has reached capacity, then the sender task needs to
567 // be parked. This will send the task handle on the parked task queue.
568 //
569 // However, no waker is available at this point. To maintain internal
570 // consistency, the handle is pushed onto the parked task queue with an
571 // empty waker slot; the actual waker is registered later by
572 // `poll_unparked` the next time the sender is polled.
573 if park_self {
574 self.park();
575 }
576
577 self.queue_push_and_signal(msg);
578
579 Ok(())
580 }
581
582 // Push message to the queue and signal to the receiver
583 fn queue_push_and_signal(&self, msg: T) {
584 // Push the message onto the message queue
585 self.inner.message_queue.push(msg);
586
587 // Signal to the receiver that a message has been enqueued. If the
588 // receiver is parked, this will unpark the task.
589 self.inner.recv_task.wake();
590 }
591
592 // Increment the number of queued messages. Returns the resulting number.
593 fn inc_num_messages(&self) -> Option<usize> {
594 let mut curr = self.inner.state.load(SeqCst);
595
596 loop {
597 let mut state = decode_state(curr);
598
599 // The receiver end closed the channel.
600 if !state.is_open {
601 return None;
602 }
603
604 // This is probably never hit in practice; odds are the process will run
605 // out of memory first. It may be worth returning something else in this
606 // case.
607 assert!(
608 state.num_messages < MAX_CAPACITY,
609 "buffer space \
610 exhausted; sending this message would overflow the state"
611 );
612
613 state.num_messages += 1;
614
615 let next = encode_state(&state);
616 match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
617 Ok(_) => return Some(state.num_messages),
618 Err(actual) => curr = actual,
619 }
620 }
621 }
622
623 fn park(&mut self) {
624 {
625 let mut sender = self.sender_task.lock().unwrap();
626 sender.task = None;
627 sender.is_parked = true;
628 }
629
630 // Send handle over queue
631 let t = self.sender_task.clone();
632 self.inner.parked_queue.push(t);
633
634 // Check to make sure we weren't closed after we sent our task on the
635 // queue
636 let state = decode_state(self.inner.state.load(SeqCst));
637 self.maybe_parked = state.is_open;
638 }
639
640 /// Polls the channel to determine if there is guaranteed capacity to send
641 /// at least one item without waiting.
642 ///
643 /// # Return value
644 ///
645 /// This method returns:
646 ///
647 /// - `Poll::Ready(Ok(_))` if there is sufficient capacity;
648 /// - `Poll::Pending` if the channel may not have
649 /// capacity, in which case the current task is queued to be notified once
650 /// capacity is available;
651 /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped.
652 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
653 let state = decode_state(self.inner.state.load(SeqCst));
654 if !state.is_open {
655 return Poll::Ready(Err(SendError { kind: SendErrorKind::Disconnected }));
656 }
657
658 self.poll_unparked(Some(cx)).map(Ok)
659 }
660
661 /// Returns whether the senders send to the same receiver.
662 fn same_receiver(&self, other: &Self) -> bool {
663 Arc::ptr_eq(&self.inner, &other.inner)
664 }
665
666 /// Returns whether the sender sends to this receiver.
667 fn is_connected_to(&self, receiver: &Arc<BoundedInner<T>>) -> bool {
668 Arc::ptr_eq(&self.inner, receiver)
669 }
670
671 /// Returns a pointer to the channel state shared with the receiver.
672 ///
673 /// The returned pointer is never dereferenced and should only be used for hashing!
674 fn ptr(&self) -> *const BoundedInner<T> {
675 &*self.inner
676 }
677
678 /// Returns whether this channel is closed without needing a context.
679 fn is_closed(&self) -> bool {
680 !decode_state(self.inner.state.load(SeqCst)).is_open
681 }
682
683 /// Closes this channel from the sender side, preventing any new messages.
684 fn close_channel(&self) {
685 // There's no need to park this sender, since it is closing the channel,
686 // and we don't want to check for capacity, so skip
687 // that logic from `do_send`.
688
689 self.inner.set_closed();
690 self.inner.recv_task.wake();
691 }
692
693 fn poll_unparked(&mut self, cx: Option<&mut Context<'_>>) -> Poll<()> {
694 // First check the `maybe_parked` variable. This avoids acquiring the
695 // lock in most cases
696 if self.maybe_parked {
697 // Get a lock on the task handle
698 let mut task = self.sender_task.lock().unwrap();
699
700 if !task.is_parked {
701 self.maybe_parked = false;
702 return Poll::Ready(());
703 }
704
705 // At this point, an unpark request is pending, so there will be an
706 // unpark sometime in the future. We just need to make sure that
707 // the correct task will be notified.
708 //
709 // Update the task in case the `Sender` has been moved to another
710 // task
711 task.task = cx.map(|cx| cx.waker().clone());
712
713 Poll::Pending
714 } else {
715 Poll::Ready(())
716 }
717 }
718}
719
720impl<T> Sender<T> {
721 /// Attempts to send a message on this `Sender`, returning the message
722 /// if there was an error.
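    ///
    /// A sketch of handling both failure cases (paths assume the `futures`
    /// facade re-export; the zero buffer size is only for illustration):
    ///
    /// ```
    /// use futures::channel::mpsc;
    ///
    /// let (mut tx, rx) = mpsc::channel::<u32>(0);
    /// tx.try_send(1).unwrap(); // uses this sender's guaranteed slot
    ///
    /// let err = tx.try_send(2).unwrap_err();
    /// assert!(err.is_full());
    /// let msg = err.into_inner(); // recover the rejected message
    ///
    /// drop(rx);
    /// assert!(tx.try_send(msg).unwrap_err().is_disconnected());
    /// ```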
723 pub fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
724 if let Some(inner) = &mut self.0 {
725 inner.try_send(msg)
726 } else {
727 Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg })
728 }
729 }
730
731 /// Send a message on the channel.
732 ///
733 /// This function should only be called after
734 /// [`poll_ready`](Sender::poll_ready) has reported that the channel is
735 /// ready to receive a message.
736 pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
737 self.try_send(msg).map_err(|e| e.err)
738 }
739
740 /// Polls the channel to determine if there is guaranteed capacity to send
741 /// at least one item without waiting.
742 ///
743 /// # Return value
744 ///
745 /// This method returns:
746 ///
747 /// - `Poll::Ready(Ok(_))` if there is sufficient capacity;
748 /// - `Poll::Pending` if the channel may not have
749 /// capacity, in which case the current task is queued to be notified once
750 /// capacity is available;
751 /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped.
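    ///
    /// A sketch of waiting for capacity with `futures::future::poll_fn`
    /// (assuming the `futures` facade crate is available):
    ///
    /// ```
    /// use futures::channel::mpsc;
    /// use futures::executor::block_on;
    /// use futures::future::poll_fn;
    ///
    /// let (mut tx, _rx) = mpsc::channel::<u32>(1);
    /// block_on(async {
    ///     // Wait until the channel reports capacity, then send.
    ///     poll_fn(|cx| tx.poll_ready(cx)).await.unwrap();
    ///     tx.start_send(7).unwrap();
    /// });
    /// ```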
752 pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
753 let inner = self.0.as_mut().ok_or(SendError { kind: SendErrorKind::Disconnected })?;
754 inner.poll_ready(cx)
755 }
756
757 /// Returns whether this channel is closed without needing a context.
758 pub fn is_closed(&self) -> bool {
759 self.0.as_ref().map(BoundedSenderInner::is_closed).unwrap_or(true)
760 }
761
762 /// Closes this channel from the sender side, preventing any new messages.
763 pub fn close_channel(&mut self) {
764 if let Some(inner) = &mut self.0 {
765 inner.close_channel();
766 }
767 }
768
769 /// Disconnects this sender from the channel, closing it if there are no more senders left.
770 pub fn disconnect(&mut self) {
771 self.0 = None;
772 }
773
774 /// Returns whether the senders send to the same receiver.
775 pub fn same_receiver(&self, other: &Self) -> bool {
776 match (&self.0, &other.0) {
777 (Some(inner), Some(other)) => inner.same_receiver(other),
778 _ => false,
779 }
780 }
781
782 /// Returns whether the sender sends to this receiver.
783 pub fn is_connected_to(&self, receiver: &Receiver<T>) -> bool {
784 match (&self.0, &receiver.inner) {
785 (Some(inner), Some(receiver)) => inner.is_connected_to(receiver),
786 _ => false,
787 }
788 }
789
790 /// Hashes the receiver into the provided hasher
791 pub fn hash_receiver<H>(&self, hasher: &mut H)
792 where
793 H: std::hash::Hasher,
794 {
795 use std::hash::Hash;
796
797 let ptr = self.0.as_ref().map(|inner| inner.ptr());
798 ptr.hash(hasher);
799 }
800}
801
802impl<T> UnboundedSender<T> {
803 /// Check if the channel is ready to receive a message.
804 pub fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), SendError>> {
805 let inner = self.0.as_ref().ok_or(SendError { kind: SendErrorKind::Disconnected })?;
806 inner.poll_ready_nb()
807 }
808
809 /// Returns whether this channel is closed without needing a context.
810 pub fn is_closed(&self) -> bool {
811 self.0.as_ref().map(UnboundedSenderInner::is_closed).unwrap_or(true)
812 }
813
814 /// Closes this channel from the sender side, preventing any new messages.
815 pub fn close_channel(&self) {
816 if let Some(inner) = &self.0 {
817 inner.close_channel();
818 }
819 }
820
821 /// Disconnects this sender from the channel, closing it if there are no more senders left.
822 pub fn disconnect(&mut self) {
823 self.0 = None;
824 }
825
826 // Do the send without parking current task.
827 fn do_send_nb(&self, msg: T) -> Result<(), TrySendError<T>> {
828 if let Some(inner) = &self.0 {
829 if inner.inc_num_messages().is_some() {
830 inner.queue_push_and_signal(msg);
831 return Ok(());
832 }
833 }
834
835 Err(TrySendError { err: SendError { kind: SendErrorKind::Disconnected }, val: msg })
836 }
837
838 /// Send a message on the channel.
839 ///
840 /// This method should only be called after `poll_ready` has been used to
841 /// verify that the channel is ready to receive a message.
842 pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
843 self.do_send_nb(msg).map_err(|e| e.err)
844 }
845
846 /// Sends a message along this channel.
847 ///
848 /// This is an unbounded sender, so this function differs from `Sink::send`
849 /// by ensuring the return type reflects that the channel is always ready to
850 /// receive messages.
851 pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError<T>> {
852 self.do_send_nb(msg)
853 }
854
855 /// Returns whether the senders send to the same receiver.
856 pub fn same_receiver(&self, other: &Self) -> bool {
857 match (&self.0, &other.0) {
858 (Some(inner), Some(other)) => inner.same_receiver(other),
859 _ => false,
860 }
861 }
862
863 /// Returns whether the sender sends to this receiver.
864 pub fn is_connected_to(&self, receiver: &UnboundedReceiver<T>) -> bool {
865 match (&self.0, &receiver.inner) {
866 (Some(inner), Some(receiver)) => inner.is_connected_to(receiver),
867 _ => false,
868 }
869 }
870
871 /// Hashes the receiver into the provided hasher
872 pub fn hash_receiver<H>(&self, hasher: &mut H)
873 where
874 H: std::hash::Hasher,
875 {
876 use std::hash::Hash;
877
878 let ptr = self.0.as_ref().map(|inner| inner.ptr());
879 ptr.hash(hasher);
880 }
881
882 /// Returns the number of messages in the queue, or 0 if this sender has been disconnected from the channel.
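    ///
    /// For illustration (paths assume the `futures` facade re-export):
    ///
    /// ```
    /// use futures::channel::mpsc;
    ///
    /// let (tx, _rx) = mpsc::unbounded::<u32>();
    /// assert!(tx.is_empty());
    /// tx.unbounded_send(1).unwrap();
    /// assert_eq!(tx.len(), 1);
    /// ```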
883 pub fn len(&self) -> usize {
884 if let Some(sender) = &self.0 {
885 decode_state(sender.inner.state.load(SeqCst)).num_messages
886 } else {
887 0
888 }
889 }
890
891 /// Returns `true` if the channel has no queued messages, `false` otherwise.
892 pub fn is_empty(&self) -> bool {
893 self.len() == 0
894 }
895}
896
897impl<T> Clone for Sender<T> {
898 fn clone(&self) -> Self {
899 Self(self.0.clone())
900 }
901}
902
903impl<T> Clone for UnboundedSender<T> {
904 fn clone(&self) -> Self {
905 Self(self.0.clone())
906 }
907}
908
909impl<T> Clone for UnboundedSenderInner<T> {
910 fn clone(&self) -> Self {
911 // Since this atomic op isn't actually guarding any memory and we don't
912 // care about any orderings besides the ordering on the single atomic
913 // variable, a relaxed ordering would be acceptable (the code currently uses `SeqCst`).
914 let mut curr = self.inner.num_senders.load(SeqCst);
915
916 loop {
917 // If the maximum number of senders has been reached, then fail
918 if curr == MAX_BUFFER {
919 panic!("cannot clone `Sender` -- too many outstanding senders");
920 }
921
922 debug_assert!(curr < MAX_BUFFER);
923
924 let next = curr + 1;
925 match self.inner.num_senders.compare_exchange(curr, next, SeqCst, SeqCst) {
926 Ok(_) => {
927 // The ABA problem doesn't matter here. We only care that the
928 // number of senders never exceeds the maximum.
929 return Self { inner: self.inner.clone() };
930 }
931 Err(actual) => curr = actual,
932 }
933 }
934 }
935}
936
937impl<T> Clone for BoundedSenderInner<T> {
938 fn clone(&self) -> Self {
939 // Since this atomic op isn't actually guarding any memory and we don't
940 // care about any orderings besides the ordering on the single atomic
941 // variable, a relaxed ordering would be acceptable (the code currently uses `SeqCst`).
942 let mut curr = self.inner.num_senders.load(SeqCst);
943
944 loop {
945 // If the maximum number of senders has been reached, then fail
946 if curr == self.inner.max_senders() {
947 panic!("cannot clone `Sender` -- too many outstanding senders");
948 }
949
950 debug_assert!(curr < self.inner.max_senders());
951
952 let next = curr + 1;
953 match self.inner.num_senders.compare_exchange(curr, next, SeqCst, SeqCst) {
954 Ok(_) => {
955 // The ABA problem doesn't matter here. We only care that the
956 // number of senders never exceeds the maximum.
957 return Self {
958 inner: self.inner.clone(),
959 sender_task: Arc::new(Mutex::new(SenderTask::new())),
960 maybe_parked: false,
961 };
962 }
963 Err(actual) => curr = actual,
964 }
965 }
966 }
967}
968
969impl<T> Drop for UnboundedSenderInner<T> {
970 fn drop(&mut self) {
971 // Ordering between variables doesn't matter here
972 let prev = self.inner.num_senders.fetch_sub(1, SeqCst);
973
974 if prev == 1 {
975 self.close_channel();
976 }
977 }
978}
979
980impl<T> Drop for BoundedSenderInner<T> {
981 fn drop(&mut self) {
982 // Ordering between variables doesn't matter here
983 let prev = self.inner.num_senders.fetch_sub(1, SeqCst);
984
985 if prev == 1 {
986 self.close_channel();
987 }
988 }
989}
990
991impl<T> fmt::Debug for Sender<T> {
992 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
993 f.debug_struct("Sender").field("closed", &self.is_closed()).finish()
994 }
995}
996
997impl<T> fmt::Debug for UnboundedSender<T> {
998 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
999 f.debug_struct("UnboundedSender").field("closed", &self.is_closed()).finish()
1000 }
1001}
1002
1003/*
1004 *
1005 * ===== impl Receiver =====
1006 *
1007 */
1008
1009impl<T> Receiver<T> {
1010 /// Waits for a message from the channel.
1011 /// If the channel is empty and closed, returns [`RecvError`].
1012 pub fn recv(&mut self) -> Recv<'_, Self> {
1013 Recv::new(self)
1014 }
1015
1016 /// Closes the receiving half of a channel, without dropping it.
1017 ///
1018 /// This prevents any further messages from being sent on the channel while
1019 /// still enabling the receiver to drain messages that are buffered.
1020 pub fn close(&mut self) {
1021 if let Some(inner) = &mut self.inner {
1022 inner.set_closed();
1023
1024 // Wake up any threads waiting as they'll see that we've closed the
1025 // channel and will continue on their merry way.
1026 while let Some(task) = unsafe { inner.parked_queue.pop_spin() } {
1027 task.lock().unwrap().notify();
1028 }
1029 }
1030 }
1031
1032 /// Tries to receive the next message without notifying a context if empty.
1033 ///
1034 /// It is not recommended to call this function from inside of a future,
1035 /// only when you've otherwise arranged to be notified when the channel is
1036 /// no longer empty.
1037 ///
1038 /// This function returns:
1039 /// * `Ok(Some(t))` when a message is fetched
1040 /// * `Ok(None)` when the channel is closed and no messages are left in the queue
1041 /// * `Err(e)` when there are no messages available, but the channel is not yet closed
1042 #[deprecated(note = "please use `try_recv` instead")]
1043 pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
1044 match self.next_message() {
1045 Poll::Ready(msg) => Ok(msg),
1046 Poll::Pending => Err(TryRecvError::Empty),
1047 }
1048 }
1049
1050 /// Tries to receive a message from the channel without blocking.
1051 /// If the channel is empty, or empty and closed, this method returns an error.
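    ///
    /// A minimal sketch of the possible outcomes (paths assume the `futures`
    /// facade re-export):
    ///
    /// ```
    /// use futures::channel::mpsc::{self, TryRecvError};
    ///
    /// let (mut tx, mut rx) = mpsc::channel::<u32>(1);
    /// assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
    ///
    /// tx.try_send(5).unwrap();
    /// assert_eq!(rx.try_recv(), Ok(5));
    ///
    /// drop(tx);
    /// assert_eq!(rx.try_recv(), Err(TryRecvError::Closed));
    /// ```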
1052 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
1053 match self.next_message() {
1054 Poll::Ready(Some(msg)) => Ok(msg),
1055 Poll::Ready(None) => Err(TryRecvError::Closed),
1056 Poll::Pending => Err(TryRecvError::Empty),
1057 }
1058 }
1059
1060 fn next_message(&mut self) -> Poll<Option<T>> {
1061 let inner = match self.inner.as_mut() {
1062 None => return Poll::Ready(None),
1063 Some(inner) => inner,
1064 };
1065 // Pop off a message
1066 match unsafe { inner.message_queue.pop_spin() } {
1067 Some(msg) => {
1068 // If there are any parked task handles in the parked queue,
1069 // pop one and unpark it.
1070 self.unpark_one();
1071
1072 // Decrement number of messages
1073 self.dec_num_messages();
1074
1075 Poll::Ready(Some(msg))
1076 }
1077 None => {
1078 let state = decode_state(inner.state.load(SeqCst));
1079 if state.is_closed() {
1080 // If closed flag is set AND there are no pending messages
1081 // it means end of stream
1082 self.inner = None;
1083 Poll::Ready(None)
1084 } else {
1085 // If queue is open, we need to return Pending
1086 // to be woken up when new messages arrive.
1087 // If queue is closed but num_messages is non-zero,
1088 // it means that senders updated the state,
1089 // but didn't put message to queue yet,
1090 // so we need to park until sender unparks the task
1091 // after queueing the message.
1092 Poll::Pending
1093 }
1094 }
1095 }
1096 }
1097
1098 // Unpark a single task handle if there is one pending in the parked queue
1099 fn unpark_one(&mut self) {
1100 if let Some(inner) = &mut self.inner {
1101 if let Some(task) = unsafe { inner.parked_queue.pop_spin() } {
1102 task.lock().unwrap().notify();
1103 }
1104 }
1105 }
1106
1107 fn dec_num_messages(&self) {
1108 if let Some(inner) = &self.inner {
1109 // OPEN_MASK is highest bit, so it's unaffected by subtraction
1110 // unless there's underflow, and we know there's no underflow
1111 // because number of messages at this point is always > 0.
1112 inner.state.fetch_sub(1, SeqCst);
1113 }
1114 }
1115}
1116
1117// The receiver does not ever take a Pin to the inner T
1118impl<T> Unpin for Receiver<T> {}
1119
1120impl<T> FusedStream for Receiver<T> {
1121 fn is_terminated(&self) -> bool {
1122 self.inner.is_none()
1123 }
1124}
1125
1126impl<T> Stream for Receiver<T> {
1127 type Item = T;
1128
1129 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
1130 // Try to read a message off of the message queue.
1131 match self.next_message() {
1132 Poll::Ready(msg) => {
1133 if msg.is_none() {
1134 self.inner = None;
1135 }
1136 Poll::Ready(msg)
1137 }
1138 Poll::Pending => {
1139 // There are no messages to read, in this case, park.
1140 self.inner.as_ref().unwrap().recv_task.register(cx.waker());
1141 // Check queue again after parking to prevent race condition:
1142 // a message could have been added to the queue after the previous
1143 // `next_message` call but before `register` was called.
1144 self.next_message()
1145 }
1146 }
1147 }
1148
1149 fn size_hint(&self) -> (usize, Option<usize>) {
1150 if let Some(inner) = &self.inner {
1151 decode_state(inner.state.load(SeqCst)).size_hint()
1152 } else {
1153 (0, Some(0))
1154 }
1155 }
1156}
1157
1158impl<St: ?Sized + Unpin> Unpin for Recv<'_, St> {}
1159impl<'a, St: ?Sized + Stream + Unpin> Recv<'a, St> {
1160 fn new(stream: &'a mut St) -> Self {
1161 Self { stream }
1162 }
1163}
1164
1165impl<St: ?Sized + FusedStream + Unpin> FusedFuture for Recv<'_, St> {
1166 fn is_terminated(&self) -> bool {
1167 self.stream.is_terminated()
1168 }
1169}
1170
1171impl<St: ?Sized + Stream + Unpin> Future for Recv<'_, St> {
1172 type Output = Result<St::Item, RecvError>;
1173
1174 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1175 match Pin::new(&mut self.stream).poll_next(cx) {
1176 Poll::Ready(Some(msg)) => Poll::Ready(Ok(msg)),
1177 Poll::Ready(None) => Poll::Ready(Err(RecvError)),
1178 Poll::Pending => Poll::Pending,
1179 }
1180 }
1181}
1182
1183impl<T> Drop for Receiver<T> {
1184 fn drop(&mut self) {
1185 // Drain the channel of all pending messages
1186 self.close();
1187 if self.inner.is_some() {
1188 loop {
1189 match self.next_message() {
1190 Poll::Ready(Some(_)) => {}
1191 Poll::Ready(None) => break,
1192 Poll::Pending => {
1193 let state = decode_state(self.inner.as_ref().unwrap().state.load(SeqCst));
1194
1195 // If the channel is closed, then there is no need to park.
1196 if state.is_closed() {
1197 break;
1198 }
1199
1200 // TODO: Spinning isn't ideal, it might be worth
1201 // investigating using a condvar or some other strategy
1202 // here. That said, if this case is hit, then another thread
1203 // is about to push the value into the queue and this isn't
1204 // the only spinlock in the impl right now.
1205 thread::yield_now();
1206 }
1207 }
1208 }
1209 }
1210 }
1211}
1212
1213impl<T> fmt::Debug for Receiver<T> {
1214 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1215 let closed = if let Some(ref inner) = self.inner {
1216 decode_state(inner.state.load(SeqCst)).is_closed()
1217 } else {
1218 false
1219 };
1220
1221 f.debug_struct("Receiver").field("closed", &closed).finish()
1222 }
1223}
1224
1225impl<T> UnboundedReceiver<T> {
1226 /// Waits for a message from the channel.
1227 /// If the channel is empty and closed, returns [`RecvError`].
1228 pub fn recv(&mut self) -> Recv<'_, Self> {
1229 Recv::new(self)
1230 }
1231
1232 /// Closes the receiving half of a channel, without dropping it.
1233 ///
1234 /// This prevents any further messages from being sent on the channel while
1235 /// still enabling the receiver to drain messages that are buffered.
1236 pub fn close(&mut self) {
1237 if let Some(inner) = &mut self.inner {
1238 inner.set_closed();
1239 }
1240 }
1241
1242 /// Tries to receive the next message without notifying a context if empty.
1243 ///
1244 /// It is not recommended to call this function from inside of a future,
1245 /// only when you've otherwise arranged to be notified when the channel is
1246 /// no longer empty.
1247 ///
1248 /// This function returns:
1249 /// * `Ok(Some(t))` when a message is fetched
1250 /// * `Ok(None)` when the channel is closed and no messages are left in the queue
1251 /// * `Err(e)` when there are no messages available, but the channel is not yet closed
1252 #[deprecated(note = "please use `try_recv` instead")]
1253 pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
1254 match self.next_message() {
1255 Poll::Ready(msg) => Ok(msg),
1256 Poll::Pending => Err(TryRecvError::Empty),
1257 }
1258 }
1259
1260 /// Tries to receive a message from the channel without blocking.
1261 /// If the channel is empty, or empty and closed, this method returns an error.
1262 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
1263 match self.next_message() {
1264 Poll::Ready(Some(msg)) => Ok(msg),
1265 Poll::Ready(None) => Err(TryRecvError::Closed),
1266 Poll::Pending => Err(TryRecvError::Empty),
1267 }
1268 }
1269
1270 fn next_message(&mut self) -> Poll<Option<T>> {
1271 let inner = match self.inner.as_mut() {
1272 None => return Poll::Ready(None),
1273 Some(inner) => inner,
1274 };
1275 // Pop off a message
1276 match unsafe { inner.message_queue.pop_spin() } {
1277 Some(msg) => {
1278 // Decrement number of messages
1279 self.dec_num_messages();
1280
1281 Poll::Ready(Some(msg))
1282 }
1283 None => {
1284 let state = decode_state(inner.state.load(SeqCst));
1285 if state.is_closed() {
1286 // If closed flag is set AND there are no pending messages
1287 // it means end of stream
1288 self.inner = None;
1289 Poll::Ready(None)
1290 } else {
1291 // If queue is open, we need to return Pending
1292 // to be woken up when new messages arrive.
1293 // If queue is closed but num_messages is non-zero,
1294 // it means that senders updated the state,
1295 // but didn't put message to queue yet,
1296 // so we need to park until sender unparks the task
1297 // after queueing the message.
1298 Poll::Pending
1299 }
1300 }
1301 }
1302 }
1303
1304 fn dec_num_messages(&self) {
1305 if let Some(inner) = &self.inner {
1306 // OPEN_MASK is highest bit, so it's unaffected by subtraction
1307 // unless there's underflow, and we know there's no underflow
1308 // because number of messages at this point is always > 0.
1309 inner.state.fetch_sub(1, SeqCst);
1310 }
1311 }
1312}
1313
1314impl<T> FusedStream for UnboundedReceiver<T> {
1315 fn is_terminated(&self) -> bool {
1316 self.inner.is_none()
1317 }
1318}
1319
1320impl<T> Stream for UnboundedReceiver<T> {
1321 type Item = T;
1322
1323 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
1324 // Try to read a message off of the message queue.
1325 match self.next_message() {
1326 Poll::Ready(msg) => {
1327 if msg.is_none() {
1328 self.inner = None;
1329 }
1330 Poll::Ready(msg)
1331 }
1332 Poll::Pending => {
1333 // There are no messages to read, in this case, park.
1334 self.inner.as_ref().unwrap().recv_task.register(cx.waker());
1335 // Check queue again after parking to prevent race condition:
1336 // a message could have been added to the queue after the previous
1337 // `next_message` call but before `register` was called.
1338 self.next_message()
1339 }
1340 }
1341 }
1342
1343 fn size_hint(&self) -> (usize, Option<usize>) {
1344 if let Some(inner) = &self.inner {
1345 decode_state(inner.state.load(SeqCst)).size_hint()
1346 } else {
1347 (0, Some(0))
1348 }
1349 }
1350}
1351
1352impl<T> Drop for UnboundedReceiver<T> {
1353 fn drop(&mut self) {
1354 // Drain the channel of all pending messages
1355 self.close();
1356 if self.inner.is_some() {
1357 loop {
1358 match self.next_message() {
1359 Poll::Ready(Some(_)) => {}
1360 Poll::Ready(None) => break,
1361 Poll::Pending => {
1362 let state = decode_state(self.inner.as_ref().unwrap().state.load(SeqCst));
1363
1364 // If the channel is closed, then there is no need to park.
1365 if state.is_closed() {
1366 break;
1367 }
1368
1369 // TODO: Spinning isn't ideal, it might be worth
1370 // investigating using a condvar or some other strategy
1371 // here. That said, if this case is hit, then another thread
1372 // is about to push the value into the queue and this isn't
1373 // the only spinlock in the impl right now.
1374 thread::yield_now();
1375 }
1376 }
1377 }
1378 }
1379 }
1380}
1381
1382impl<T> fmt::Debug for UnboundedReceiver<T> {
1383 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1384 let closed = if let Some(ref inner) = self.inner {
1385 decode_state(inner.state.load(SeqCst)).is_closed()
1386 } else {
1387 false
1388 };
1389
1390 f.debug_struct("Receiver").field("closed", &closed).finish()
1391 }
1392}
1393
1394/*
1395 *
1396 * ===== impl Inner =====
1397 *
1398 */
1399
1400impl<T> UnboundedInner<T> {
1401 // Clear `open` flag in the state, keep `num_messages` intact.
1402 fn set_closed(&self) {
1403 let curr = self.state.load(SeqCst);
1404 if !decode_state(curr).is_open {
1405 return;
1406 }
1407
1408 self.state.fetch_and(!OPEN_MASK, SeqCst);
1409 }
1410}
1411
1412impl<T> BoundedInner<T> {
1413 // The return value is such that the total number of messages that can be
1414 // enqueued into the channel will never exceed MAX_CAPACITY
1415 fn max_senders(&self) -> usize {
1416 MAX_CAPACITY - self.buffer
1417 }
1418
1419 // Clear `open` flag in the state, keep `num_messages` intact.
1420 fn set_closed(&self) {
1421 let curr = self.state.load(SeqCst);
1422 if !decode_state(curr).is_open {
1423 return;
1424 }
1425
1426 self.state.fetch_and(!OPEN_MASK, SeqCst);
1427 }
1428}
1429
1430unsafe impl<T: Send> Send for UnboundedInner<T> {}
1431unsafe impl<T: Send> Sync for UnboundedInner<T> {}
1432
1433unsafe impl<T: Send> Send for BoundedInner<T> {}
1434unsafe impl<T: Send> Sync for BoundedInner<T> {}
1435
1436impl State {
1437 fn is_closed(&self) -> bool {
1438 !self.is_open && self.num_messages == 0
1439 }
1440
1441 fn size_hint(&self) -> (usize, Option<usize>) {
1442 if self.is_open {
1443 (self.num_messages, None)
1444 } else {
1445 (self.num_messages, Some(self.num_messages))
1446 }
1447 }
1448}
1449
1450/*
1451 *
1452 * ===== Helpers =====
1453 *
1454 */
1455
1456fn decode_state(num: usize) -> State {
1457 State { is_open: num & OPEN_MASK == OPEN_MASK, num_messages: num & MAX_CAPACITY }
1458}
1459
1460fn encode_state(state: &State) -> usize {
1461 let mut num = state.num_messages;
1462
1463 if state.is_open {
1464 num |= OPEN_MASK;
1465 }
1466
1467 num
1468}
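
// A minimal round-trip check of the state-encoding helpers above (an
// illustrative sketch; the crate's real tests live elsewhere).
#[cfg(test)]
mod state_encoding_tests {
    use super::*;

    #[test]
    fn encode_decode_round_trip() {
        // An open channel holding three messages.
        let state = State { is_open: true, num_messages: 3 };
        let encoded = encode_state(&state);
        assert_eq!(encoded, OPEN_MASK | 3);

        let decoded = decode_state(encoded);
        assert!(decoded.is_open);
        assert_eq!(decoded.num_messages, 3);

        // Clearing the top bit marks the channel as closed but keeps the count.
        let closed = decode_state(encoded & !OPEN_MASK);
        assert!(!closed.is_open);
        assert_eq!(closed.num_messages, 3);
    }
}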