async_std/sync/
channel.rs

1use std::cell::UnsafeCell;
2use std::fmt;
3use std::future::Future;
4use std::isize;
5use std::marker::PhantomData;
6use std::mem;
7use std::pin::Pin;
8use std::process;
9use std::ptr;
10use std::sync::atomic::{self, AtomicUsize, Ordering};
11use std::sync::Arc;
12use std::task::{Context, Poll};
13
14use crossbeam_utils::Backoff;
15
16use crate::stream::Stream;
17use crate::sync::WakerSet;
18
19/// Creates a bounded multi-producer multi-consumer channel.
20///
21/// This channel has a buffer that can hold at most `cap` messages at a time.
22///
23/// Senders and receivers can be cloned. When all senders associated with a channel get dropped, it
24/// becomes closed. Receive operations on a closed and empty channel return `None` instead of
25/// trying to await a message.
26///
27/// # Panics
28///
29/// If `cap` is zero, this function will panic.
30///
31/// # Examples
32///
33/// ```
34/// # async_std::task::block_on(async {
35/// #
36/// use std::time::Duration;
37///
38/// use async_std::sync::channel;
39/// use async_std::task;
40///
41/// let (s, r) = channel(1);
42///
43/// // This call returns immediately because there is enough space in the channel.
44/// s.send(1).await;
45///
46/// task::spawn(async move {
47///     // This call will have to wait because the channel is full.
48///     // It will be able to complete only after the first message is received.
49///     s.send(2).await;
50/// });
51///
52/// task::sleep(Duration::from_secs(1)).await;
53/// assert_eq!(r.recv().await, Some(1));
54/// assert_eq!(r.recv().await, Some(2));
55/// #
56/// # })
57/// ```
58#[cfg(feature = "unstable")]
59#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
60pub fn channel<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
61    let channel = Arc::new(Channel::with_capacity(cap));
62    let s = Sender {
63        channel: channel.clone(),
64    };
65    let r = Receiver {
66        channel,
67        opt_key: None,
68    };
69    (s, r)
70}
71
72/// The sending side of a channel.
73///
74/// This struct is created by the [`channel`] function. See its
75/// documentation for more.
76///
77/// [`channel`]: fn.channel.html
78///
79/// # Examples
80///
81/// ```
82/// # async_std::task::block_on(async {
83/// #
84/// use async_std::sync::channel;
85/// use async_std::task;
86///
87/// let (s1, r) = channel(100);
88/// let s2 = s1.clone();
89///
90/// task::spawn(async move { s1.send(1).await });
91/// task::spawn(async move { s2.send(2).await });
92///
93/// let msg1 = r.recv().await.unwrap();
94/// let msg2 = r.recv().await.unwrap();
95///
96/// assert_eq!(msg1 + msg2, 3);
97/// #
98/// # })
99/// ```
100#[cfg(feature = "unstable")]
101#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
102pub struct Sender<T> {
103    /// The inner channel.
104    channel: Arc<Channel<T>>,
105}
106
107impl<T> Sender<T> {
108    /// Sends a message into the channel.
109    ///
110    /// If the channel is full, this method will wait until there is space in the channel.
111    ///
112    /// # Examples
113    ///
114    /// ```
115    /// # async_std::task::block_on(async {
116    /// #
117    /// use async_std::sync::channel;
118    /// use async_std::task;
119    ///
120    /// let (s, r) = channel(1);
121    ///
122    /// task::spawn(async move {
123    ///     s.send(1).await;
124    ///     s.send(2).await;
125    /// });
126    ///
127    /// assert_eq!(r.recv().await, Some(1));
128    /// assert_eq!(r.recv().await, Some(2));
129    /// assert_eq!(r.recv().await, None);
130    /// #
131    /// # })
132    /// ```
133    pub async fn send(&self, msg: T) {
134        struct SendFuture<'a, T> {
135            channel: &'a Channel<T>,
136            msg: Option<T>,
137            opt_key: Option<usize>,
138        }
139
140        impl<T> Unpin for SendFuture<'_, T> {}
141
142        impl<T> Future for SendFuture<'_, T> {
143            type Output = ();
144
145            fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
146                loop {
147                    let msg = self.msg.take().unwrap();
148
149                    // If the current task is in the set, remove it.
150                    if let Some(key) = self.opt_key.take() {
151                        self.channel.send_wakers.remove(key);
152                    }
153
154                    // Try sending the message.
155                    match self.channel.try_send(msg) {
156                        Ok(()) => return Poll::Ready(()),
157                        Err(TrySendError::Disconnected(msg)) => {
158                            self.msg = Some(msg);
159                            return Poll::Pending;
160                        }
161                        Err(TrySendError::Full(msg)) => {
162                            self.msg = Some(msg);
163
164                            // Insert this send operation.
165                            self.opt_key = Some(self.channel.send_wakers.insert(cx));
166
167                            // If the channel is still full and not disconnected, return.
168                            if self.channel.is_full() && !self.channel.is_disconnected() {
169                                return Poll::Pending;
170                            }
171                        }
172                    }
173                }
174            }
175        }
176
177        impl<T> Drop for SendFuture<'_, T> {
178            fn drop(&mut self) {
179                // If the current task is still in the set, that means it is being cancelled now.
180                // Wake up another task instead.
181                if let Some(key) = self.opt_key {
182                    self.channel.send_wakers.cancel(key);
183                }
184            }
185        }
186
187        SendFuture {
188            channel: &self.channel,
189            msg: Some(msg),
190            opt_key: None,
191        }
192        .await
193    }
194
195    /// Returns the channel capacity.
196    ///
197    /// # Examples
198    ///
199    /// ```
200    /// use async_std::sync::channel;
201    ///
202    /// let (s, _) = channel::<i32>(5);
203    /// assert_eq!(s.capacity(), 5);
204    /// ```
205    pub fn capacity(&self) -> usize {
206        self.channel.cap
207    }
208
209    /// Returns `true` if the channel is empty.
210    ///
211    /// # Examples
212    ///
213    /// ```
214    /// # async_std::task::block_on(async {
215    /// #
216    /// use async_std::sync::channel;
217    ///
218    /// let (s, r) = channel(1);
219    ///
220    /// assert!(s.is_empty());
221    /// s.send(0).await;
222    /// assert!(!s.is_empty());
223    /// #
224    /// # })
225    /// ```
226    pub fn is_empty(&self) -> bool {
227        self.channel.is_empty()
228    }
229
230    /// Returns `true` if the channel is full.
231    ///
232    /// # Examples
233    ///
234    /// ```
235    /// # async_std::task::block_on(async {
236    /// #
237    /// use async_std::sync::channel;
238    ///
239    /// let (s, r) = channel(1);
240    ///
241    /// assert!(!s.is_full());
242    /// s.send(0).await;
243    /// assert!(s.is_full());
244    /// #
245    /// # })
246    /// ```
247    pub fn is_full(&self) -> bool {
248        self.channel.is_full()
249    }
250
251    /// Returns the number of messages in the channel.
252    ///
253    /// # Examples
254    ///
255    /// ```
256    /// # async_std::task::block_on(async {
257    /// #
258    /// use async_std::sync::channel;
259    ///
260    /// let (s, r) = channel(2);
261    /// assert_eq!(s.len(), 0);
262    ///
263    /// s.send(1).await;
264    /// s.send(2).await;
265    /// assert_eq!(s.len(), 2);
266    /// #
267    /// # })
268    /// ```
269    pub fn len(&self) -> usize {
270        self.channel.len()
271    }
272}
273
274impl<T> Drop for Sender<T> {
275    fn drop(&mut self) {
276        // Decrement the sender count and disconnect the channel if it drops down to zero.
277        if self.channel.sender_count.fetch_sub(1, Ordering::AcqRel) == 1 {
278            self.channel.disconnect();
279        }
280    }
281}
282
283impl<T> Clone for Sender<T> {
284    fn clone(&self) -> Sender<T> {
285        let count = self.channel.sender_count.fetch_add(1, Ordering::Relaxed);
286
287        // Make sure the count never overflows, even if lots of sender clones are leaked.
288        if count > isize::MAX as usize {
289            process::abort();
290        }
291
292        Sender {
293            channel: self.channel.clone(),
294        }
295    }
296}
297
298impl<T> fmt::Debug for Sender<T> {
299    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
300        f.pad("Sender { .. }")
301    }
302}
303
304/// The receiving side of a channel.
305///
306/// This type receives messages by calling `recv`. But it also implements the [`Stream`] trait,
307/// which means it can act as an asynchronous iterator. This struct is created by the [`channel`]
308/// function. See its documentation for more.
309///
310/// [`channel`]: fn.channel.html
311/// [`Stream`]: ../stream/trait.Stream.html
312///
313/// # Examples
314///
315/// ```
316/// # async_std::task::block_on(async {
317/// #
318/// use std::time::Duration;
319///
320/// use async_std::sync::channel;
321/// use async_std::task;
322///
323/// let (s, r) = channel(100);
324///
325/// task::spawn(async move {
326///     s.send(1).await;
327///     task::sleep(Duration::from_secs(1)).await;
328///     s.send(2).await;
329/// });
330///
331/// assert_eq!(r.recv().await, Some(1)); // Received immediately.
332/// assert_eq!(r.recv().await, Some(2)); // Received after 1 second.
333/// #
334/// # })
335/// ```
336#[cfg(feature = "unstable")]
337#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
338pub struct Receiver<T> {
339    /// The inner channel.
340    channel: Arc<Channel<T>>,
341
342    /// The key for this receiver in the `channel.stream_wakers` set.
343    opt_key: Option<usize>,
344}
345
346impl<T> Receiver<T> {
347    /// Receives a message from the channel.
348    ///
349    /// If the channel is empty and still has senders, this method will wait until a message is
350    /// sent into the channel or until all senders get dropped.
351    ///
352    /// # Examples
353    ///
354    /// ```
355    /// # async_std::task::block_on(async {
356    /// #
357    /// use async_std::sync::channel;
358    /// use async_std::task;
359    ///
360    /// let (s, r) = channel(1);
361    ///
362    /// task::spawn(async move {
363    ///     s.send(1).await;
364    ///     s.send(2).await;
365    /// });
366    ///
367    /// assert_eq!(r.recv().await, Some(1));
368    /// assert_eq!(r.recv().await, Some(2));
369    /// assert_eq!(r.recv().await, None);
370    /// #
371    /// # })
372    /// ```
373    pub async fn recv(&self) -> Option<T> {
374        struct RecvFuture<'a, T> {
375            channel: &'a Channel<T>,
376            opt_key: Option<usize>,
377        }
378
379        impl<T> Future for RecvFuture<'_, T> {
380            type Output = Option<T>;
381
382            fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
383                poll_recv(
384                    &self.channel,
385                    &self.channel.recv_wakers,
386                    &mut self.opt_key,
387                    cx,
388                )
389            }
390        }
391
392        impl<T> Drop for RecvFuture<'_, T> {
393            fn drop(&mut self) {
394                // If the current task is still in the set, that means it is being cancelled now.
395                if let Some(key) = self.opt_key {
396                    self.channel.recv_wakers.cancel(key);
397                }
398            }
399        }
400
401        RecvFuture {
402            channel: &self.channel,
403            opt_key: None,
404        }
405        .await
406    }
407
408    /// Returns the channel capacity.
409    ///
410    /// # Examples
411    ///
412    /// ```
413    /// use async_std::sync::channel;
414    ///
415    /// let (_, r) = channel::<i32>(5);
416    /// assert_eq!(r.capacity(), 5);
417    /// ```
418    pub fn capacity(&self) -> usize {
419        self.channel.cap
420    }
421
422    /// Returns `true` if the channel is empty.
423    ///
424    /// # Examples
425    ///
426    /// ```
427    /// # async_std::task::block_on(async {
428    /// #
429    /// use async_std::sync::channel;
430    ///
431    /// let (s, r) = channel(1);
432    ///
433    /// assert!(r.is_empty());
434    /// s.send(0).await;
435    /// assert!(!r.is_empty());
436    /// #
437    /// # })
438    /// ```
439    pub fn is_empty(&self) -> bool {
440        self.channel.is_empty()
441    }
442
443    /// Returns `true` if the channel is full.
444    ///
445    /// # Examples
446    ///
447    /// ```
448    /// # async_std::task::block_on(async {
449    /// #
450    /// use async_std::sync::channel;
451    ///
452    /// let (s, r) = channel(1);
453    ///
454    /// assert!(!r.is_full());
455    /// s.send(0).await;
456    /// assert!(r.is_full());
457    /// #
458    /// # })
459    /// ```
460    pub fn is_full(&self) -> bool {
461        self.channel.is_full()
462    }
463
464    /// Returns the number of messages in the channel.
465    ///
466    /// # Examples
467    ///
468    /// ```
469    /// # async_std::task::block_on(async {
470    /// #
471    /// use async_std::sync::channel;
472    ///
473    /// let (s, r) = channel(2);
474    /// assert_eq!(r.len(), 0);
475    ///
476    /// s.send(1).await;
477    /// s.send(2).await;
478    /// assert_eq!(r.len(), 2);
479    /// #
480    /// # })
481    /// ```
482    pub fn len(&self) -> usize {
483        self.channel.len()
484    }
485}
486
487impl<T> Drop for Receiver<T> {
488    fn drop(&mut self) {
489        // If the current task is still in the stream set, that means it is being cancelled now.
490        if let Some(key) = self.opt_key {
491            self.channel.stream_wakers.cancel(key);
492        }
493
494        // Decrement the receiver count and disconnect the channel if it drops down to zero.
495        if self.channel.receiver_count.fetch_sub(1, Ordering::AcqRel) == 1 {
496            self.channel.disconnect();
497        }
498    }
499}
500
501impl<T> Clone for Receiver<T> {
502    fn clone(&self) -> Receiver<T> {
503        let count = self.channel.receiver_count.fetch_add(1, Ordering::Relaxed);
504
505        // Make sure the count never overflows, even if lots of receiver clones are leaked.
506        if count > isize::MAX as usize {
507            process::abort();
508        }
509
510        Receiver {
511            channel: self.channel.clone(),
512            opt_key: None,
513        }
514    }
515}
516
517impl<T> Stream for Receiver<T> {
518    type Item = T;
519
520    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
521        let this = &mut *self;
522        poll_recv(
523            &this.channel,
524            &this.channel.stream_wakers,
525            &mut this.opt_key,
526            cx,
527        )
528    }
529}
530
531impl<T> fmt::Debug for Receiver<T> {
532    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
533        f.pad("Receiver { .. }")
534    }
535}
536
537/// Polls a receive operation on a channel.
538///
539/// If the receive operation is blocked, the current task will be inserted into `wakers` and its
540/// associated key will then be stored in `opt_key`.
541fn poll_recv<T>(
542    channel: &Channel<T>,
543    wakers: &WakerSet,
544    opt_key: &mut Option<usize>,
545    cx: &mut Context<'_>,
546) -> Poll<Option<T>> {
547    loop {
548        // If the current task is in the set, remove it.
549        if let Some(key) = opt_key.take() {
550            wakers.remove(key);
551        }
552
553        // Try receiving a message.
554        match channel.try_recv() {
555            Ok(msg) => return Poll::Ready(Some(msg)),
556            Err(TryRecvError::Disconnected) => return Poll::Ready(None),
557            Err(TryRecvError::Empty) => {
558                // Insert this receive operation.
559                *opt_key = Some(wakers.insert(cx));
560
561                // If the channel is still empty and not disconnected, return.
562                if channel.is_empty() && !channel.is_disconnected() {
563                    return Poll::Pending;
564                }
565            }
566        }
567    }
568}
569
570/// A slot in a channel.
571struct Slot<T> {
572    /// The current stamp.
573    stamp: AtomicUsize,
574
575    /// The message in this slot.
576    msg: UnsafeCell<T>,
577}
578
579/// Bounded channel based on a preallocated array.
580struct Channel<T> {
581    /// The head of the channel.
582    ///
583    /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
584    /// packed into a single `usize`. The lower bits represent the index, while the upper bits
585    /// represent the lap. The mark bit in the head is always zero.
586    ///
587    /// Messages are popped from the head of the channel.
588    head: AtomicUsize,
589
590    /// The tail of the channel.
591    ///
592    /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
593    /// packed into a single `usize`. The lower bits represent the index, while the upper bits
594    /// represent the lap. The mark bit indicates that the channel is disconnected.
595    ///
596    /// Messages are pushed into the tail of the channel.
597    tail: AtomicUsize,
598
599    /// The buffer holding slots.
600    buffer: *mut Slot<T>,
601
602    /// The channel capacity.
603    cap: usize,
604
605    /// A stamp with the value of `{ lap: 1, mark: 0, index: 0 }`.
606    one_lap: usize,
607
608    /// If this bit is set in the tail, that means either all senders were dropped or all receivers
609    /// were dropped.
610    mark_bit: usize,
611
612    /// Send operations waiting while the channel is full.
613    send_wakers: WakerSet,
614
615    /// Receive operations waiting while the channel is empty and not disconnected.
616    recv_wakers: WakerSet,
617
618    /// Streams waiting while the channel is empty and not disconnected.
619    stream_wakers: WakerSet,
620
621    /// The number of currently active `Sender`s.
622    sender_count: AtomicUsize,
623
624    /// The number of currently active `Receivers`s.
625    receiver_count: AtomicUsize,
626
627    /// Indicates that dropping a `Channel<T>` may drop values of type `T`.
628    _marker: PhantomData<T>,
629}
630
631unsafe impl<T: Send> Send for Channel<T> {}
632unsafe impl<T: Send> Sync for Channel<T> {}
633impl<T> Unpin for Channel<T> {}
634
635impl<T> Channel<T> {
636    /// Creates a bounded channel of capacity `cap`.
637    fn with_capacity(cap: usize) -> Self {
638        assert!(cap > 0, "capacity must be positive");
639
640        // Compute constants `mark_bit` and `one_lap`.
641        let mark_bit = (cap + 1).next_power_of_two();
642        let one_lap = mark_bit * 2;
643
644        // Head is initialized to `{ lap: 0, mark: 0, index: 0 }`.
645        let head = 0;
646        // Tail is initialized to `{ lap: 0, mark: 0, index: 0 }`.
647        let tail = 0;
648
649        // Allocate a buffer of `cap` slots.
650        let buffer = {
651            let mut v = Vec::<Slot<T>>::with_capacity(cap);
652            let ptr = v.as_mut_ptr();
653            mem::forget(v);
654            ptr
655        };
656
657        // Initialize stamps in the slots.
658        for i in 0..cap {
659            unsafe {
660                // Set the stamp to `{ lap: 0, mark: 0, index: i }`.
661                let slot = buffer.add(i);
662                ptr::write(&mut (*slot).stamp, AtomicUsize::new(i));
663            }
664        }
665
666        Channel {
667            buffer,
668            cap,
669            one_lap,
670            mark_bit,
671            head: AtomicUsize::new(head),
672            tail: AtomicUsize::new(tail),
673            send_wakers: WakerSet::new(),
674            recv_wakers: WakerSet::new(),
675            stream_wakers: WakerSet::new(),
676            sender_count: AtomicUsize::new(1),
677            receiver_count: AtomicUsize::new(1),
678            _marker: PhantomData,
679        }
680    }
681
682    /// Attempts to send a message.
683    fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
684        let backoff = Backoff::new();
685        let mut tail = self.tail.load(Ordering::Relaxed);
686
687        loop {
688            // Extract mark bit from the tail and unset it.
689            //
690            // If the mark bit was set (which means all receivers have been dropped), we will still
691            // send the message into the channel if there is enough capacity. The message will get
692            // dropped when the channel is dropped (which means when all senders are also dropped).
693            let mark_bit = tail & self.mark_bit;
694            tail ^= mark_bit;
695
696            // Deconstruct the tail.
697            let index = tail & (self.mark_bit - 1);
698            let lap = tail & !(self.one_lap - 1);
699
700            // Inspect the corresponding slot.
701            let slot = unsafe { &*self.buffer.add(index) };
702            let stamp = slot.stamp.load(Ordering::Acquire);
703
704            // If the tail and the stamp match, we may attempt to push.
705            if tail == stamp {
706                let new_tail = if index + 1 < self.cap {
707                    // Same lap, incremented index.
708                    // Set to `{ lap: lap, mark: 0, index: index + 1 }`.
709                    tail + 1
710                } else {
711                    // One lap forward, index wraps around to zero.
712                    // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
713                    lap.wrapping_add(self.one_lap)
714                };
715
716                // Try moving the tail.
717                match self.tail.compare_exchange_weak(
718                    tail | mark_bit,
719                    new_tail | mark_bit,
720                    Ordering::SeqCst,
721                    Ordering::Relaxed,
722                ) {
723                    Ok(_) => {
724                        // Write the message into the slot and update the stamp.
725                        unsafe { slot.msg.get().write(msg) };
726                        let stamp = tail + 1;
727                        slot.stamp.store(stamp, Ordering::Release);
728
729                        // Wake a blocked receive operation.
730                        self.recv_wakers.notify_one();
731
732                        // Wake all blocked streams.
733                        self.stream_wakers.notify_all();
734
735                        return Ok(());
736                    }
737                    Err(t) => {
738                        tail = t;
739                        backoff.spin();
740                    }
741                }
742            } else if stamp.wrapping_add(self.one_lap) == tail + 1 {
743                atomic::fence(Ordering::SeqCst);
744                let head = self.head.load(Ordering::Relaxed);
745
746                // If the head lags one lap behind the tail as well...
747                if head.wrapping_add(self.one_lap) == tail {
748                    // ...then the channel is full.
749
750                    // Check if the channel is disconnected.
751                    if mark_bit != 0 {
752                        return Err(TrySendError::Disconnected(msg));
753                    } else {
754                        return Err(TrySendError::Full(msg));
755                    }
756                }
757
758                backoff.spin();
759                tail = self.tail.load(Ordering::Relaxed);
760            } else {
761                // Snooze because we need to wait for the stamp to get updated.
762                backoff.snooze();
763                tail = self.tail.load(Ordering::Relaxed);
764            }
765        }
766    }
767
768    /// Attempts to receive a message.
769    fn try_recv(&self) -> Result<T, TryRecvError> {
770        let backoff = Backoff::new();
771        let mut head = self.head.load(Ordering::Relaxed);
772
773        loop {
774            // Deconstruct the head.
775            let index = head & (self.mark_bit - 1);
776            let lap = head & !(self.one_lap - 1);
777
778            // Inspect the corresponding slot.
779            let slot = unsafe { &*self.buffer.add(index) };
780            let stamp = slot.stamp.load(Ordering::Acquire);
781
782            // If the the stamp is ahead of the head by 1, we may attempt to pop.
783            if head + 1 == stamp {
784                let new = if index + 1 < self.cap {
785                    // Same lap, incremented index.
786                    // Set to `{ lap: lap, mark: 0, index: index + 1 }`.
787                    head + 1
788                } else {
789                    // One lap forward, index wraps around to zero.
790                    // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
791                    lap.wrapping_add(self.one_lap)
792                };
793
794                // Try moving the head.
795                match self.head.compare_exchange_weak(
796                    head,
797                    new,
798                    Ordering::SeqCst,
799                    Ordering::Relaxed,
800                ) {
801                    Ok(_) => {
802                        // Read the message from the slot and update the stamp.
803                        let msg = unsafe { slot.msg.get().read() };
804                        let stamp = head.wrapping_add(self.one_lap);
805                        slot.stamp.store(stamp, Ordering::Release);
806
807                        // Wake a blocked send operation.
808                        self.send_wakers.notify_one();
809
810                        return Ok(msg);
811                    }
812                    Err(h) => {
813                        head = h;
814                        backoff.spin();
815                    }
816                }
817            } else if stamp == head {
818                atomic::fence(Ordering::SeqCst);
819                let tail = self.tail.load(Ordering::Relaxed);
820
821                // If the tail equals the head, that means the channel is empty.
822                if (tail & !self.mark_bit) == head {
823                    // If the channel is disconnected...
824                    if tail & self.mark_bit != 0 {
825                        return Err(TryRecvError::Disconnected);
826                    } else {
827                        // Otherwise, the receive operation is not ready.
828                        return Err(TryRecvError::Empty);
829                    }
830                }
831
832                backoff.spin();
833                head = self.head.load(Ordering::Relaxed);
834            } else {
835                // Snooze because we need to wait for the stamp to get updated.
836                backoff.snooze();
837                head = self.head.load(Ordering::Relaxed);
838            }
839        }
840    }
841
842    /// Returns the current number of messages inside the channel.
843    fn len(&self) -> usize {
844        loop {
845            // Load the tail, then load the head.
846            let tail = self.tail.load(Ordering::SeqCst);
847            let head = self.head.load(Ordering::SeqCst);
848
849            // If the tail didn't change, we've got consistent values to work with.
850            if self.tail.load(Ordering::SeqCst) == tail {
851                let hix = head & (self.mark_bit - 1);
852                let tix = tail & (self.mark_bit - 1);
853
854                return if hix < tix {
855                    tix - hix
856                } else if hix > tix {
857                    self.cap - hix + tix
858                } else if (tail & !self.mark_bit) == head {
859                    0
860                } else {
861                    self.cap
862                };
863            }
864        }
865    }
866
867    /// Returns `true` if the channel is disconnected.
868    pub fn is_disconnected(&self) -> bool {
869        self.tail.load(Ordering::SeqCst) & self.mark_bit != 0
870    }
871
872    /// Returns `true` if the channel is empty.
873    fn is_empty(&self) -> bool {
874        let head = self.head.load(Ordering::SeqCst);
875        let tail = self.tail.load(Ordering::SeqCst);
876
877        // Is the tail equal to the head?
878        //
879        // Note: If the head changes just before we load the tail, that means there was a moment
880        // when the channel was not empty, so it is safe to just return `false`.
881        (tail & !self.mark_bit) == head
882    }
883
884    /// Returns `true` if the channel is full.
885    fn is_full(&self) -> bool {
886        let tail = self.tail.load(Ordering::SeqCst);
887        let head = self.head.load(Ordering::SeqCst);
888
889        // Is the head lagging one lap behind tail?
890        //
891        // Note: If the tail changes just before we load the head, that means there was a moment
892        // when the channel was not full, so it is safe to just return `false`.
893        head.wrapping_add(self.one_lap) == tail & !self.mark_bit
894    }
895
896    /// Disconnects the channel and wakes up all blocked operations.
897    fn disconnect(&self) {
898        let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst);
899
900        if tail & self.mark_bit == 0 {
901            // Notify everyone blocked on this channel.
902            self.send_wakers.notify_all();
903            self.recv_wakers.notify_all();
904            self.stream_wakers.notify_all();
905        }
906    }
907}
908
909impl<T> Drop for Channel<T> {
910    fn drop(&mut self) {
911        // Get the index of the head.
912        let hix = self.head.load(Ordering::Relaxed) & (self.mark_bit - 1);
913
914        // Loop over all slots that hold a message and drop them.
915        for i in 0..self.len() {
916            // Compute the index of the next slot holding a message.
917            let index = if hix + i < self.cap {
918                hix + i
919            } else {
920                hix + i - self.cap
921            };
922
923            unsafe {
924                self.buffer.add(index).drop_in_place();
925            }
926        }
927
928        // Finally, deallocate the buffer, but don't run any destructors.
929        unsafe {
930            Vec::from_raw_parts(self.buffer, 0, self.cap);
931        }
932    }
933}
934
935/// An error returned from the `try_send()` method.
936enum TrySendError<T> {
937    /// The channel is full but not disconnected.
938    Full(T),
939
940    /// The channel is full and disconnected.
941    Disconnected(T),
942}
943
944/// An error returned from the `try_recv()` method.
945enum TryRecvError {
946    /// The channel is empty but not disconnected.
947    Empty,
948
949    /// The channel is empty and disconnected.
950    Disconnected,
951}