async_std/sync/channel.rs
1use std::cell::UnsafeCell;
2use std::fmt;
3use std::future::Future;
4use std::isize;
5use std::marker::PhantomData;
6use std::mem;
7use std::pin::Pin;
8use std::process;
9use std::ptr;
10use std::sync::atomic::{self, AtomicUsize, Ordering};
11use std::sync::Arc;
12use std::task::{Context, Poll};
13
14use crossbeam_utils::Backoff;
15
16use crate::stream::Stream;
17use crate::sync::WakerSet;
18
19/// Creates a bounded multi-producer multi-consumer channel.
20///
21/// This channel has a buffer that can hold at most `cap` messages at a time.
22///
23/// Senders and receivers can be cloned. When all senders associated with a channel get dropped, it
24/// becomes closed. Receive operations on a closed and empty channel return `None` instead of
25/// trying to await a message.
26///
27/// # Panics
28///
29/// If `cap` is zero, this function will panic.
30///
31/// # Examples
32///
33/// ```
34/// # async_std::task::block_on(async {
35/// #
36/// use std::time::Duration;
37///
38/// use async_std::sync::channel;
39/// use async_std::task;
40///
41/// let (s, r) = channel(1);
42///
43/// // This call returns immediately because there is enough space in the channel.
44/// s.send(1).await;
45///
46/// task::spawn(async move {
47/// // This call will have to wait because the channel is full.
48/// // It will be able to complete only after the first message is received.
49/// s.send(2).await;
50/// });
51///
52/// task::sleep(Duration::from_secs(1)).await;
53/// assert_eq!(r.recv().await, Some(1));
54/// assert_eq!(r.recv().await, Some(2));
55/// #
56/// # })
57/// ```
58#[cfg(feature = "unstable")]
59#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
60pub fn channel<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
61 let channel = Arc::new(Channel::with_capacity(cap));
62 let s = Sender {
63 channel: channel.clone(),
64 };
65 let r = Receiver {
66 channel,
67 opt_key: None,
68 };
69 (s, r)
70}
71
72/// The sending side of a channel.
73///
74/// This struct is created by the [`channel`] function. See its
75/// documentation for more.
76///
77/// [`channel`]: fn.channel.html
78///
79/// # Examples
80///
81/// ```
82/// # async_std::task::block_on(async {
83/// #
84/// use async_std::sync::channel;
85/// use async_std::task;
86///
87/// let (s1, r) = channel(100);
88/// let s2 = s1.clone();
89///
90/// task::spawn(async move { s1.send(1).await });
91/// task::spawn(async move { s2.send(2).await });
92///
93/// let msg1 = r.recv().await.unwrap();
94/// let msg2 = r.recv().await.unwrap();
95///
96/// assert_eq!(msg1 + msg2, 3);
97/// #
98/// # })
99/// ```
100#[cfg(feature = "unstable")]
101#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
102pub struct Sender<T> {
103 /// The inner channel.
104 channel: Arc<Channel<T>>,
105}
106
107impl<T> Sender<T> {
108 /// Sends a message into the channel.
109 ///
110 /// If the channel is full, this method will wait until there is space in the channel.
111 ///
112 /// # Examples
113 ///
114 /// ```
115 /// # async_std::task::block_on(async {
116 /// #
117 /// use async_std::sync::channel;
118 /// use async_std::task;
119 ///
120 /// let (s, r) = channel(1);
121 ///
122 /// task::spawn(async move {
123 /// s.send(1).await;
124 /// s.send(2).await;
125 /// });
126 ///
127 /// assert_eq!(r.recv().await, Some(1));
128 /// assert_eq!(r.recv().await, Some(2));
129 /// assert_eq!(r.recv().await, None);
130 /// #
131 /// # })
132 /// ```
133 pub async fn send(&self, msg: T) {
134 struct SendFuture<'a, T> {
135 channel: &'a Channel<T>,
136 msg: Option<T>,
137 opt_key: Option<usize>,
138 }
139
140 impl<T> Unpin for SendFuture<'_, T> {}
141
142 impl<T> Future for SendFuture<'_, T> {
143 type Output = ();
144
145 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
146 loop {
147 let msg = self.msg.take().unwrap();
148
149 // If the current task is in the set, remove it.
150 if let Some(key) = self.opt_key.take() {
151 self.channel.send_wakers.remove(key);
152 }
153
154 // Try sending the message.
155 match self.channel.try_send(msg) {
156 Ok(()) => return Poll::Ready(()),
157 Err(TrySendError::Disconnected(msg)) => {
158 self.msg = Some(msg);
159 return Poll::Pending;
160 }
161 Err(TrySendError::Full(msg)) => {
162 self.msg = Some(msg);
163
164 // Insert this send operation.
165 self.opt_key = Some(self.channel.send_wakers.insert(cx));
166
167 // If the channel is still full and not disconnected, return.
168 if self.channel.is_full() && !self.channel.is_disconnected() {
169 return Poll::Pending;
170 }
171 }
172 }
173 }
174 }
175 }
176
177 impl<T> Drop for SendFuture<'_, T> {
178 fn drop(&mut self) {
179 // If the current task is still in the set, that means it is being cancelled now.
180 // Wake up another task instead.
181 if let Some(key) = self.opt_key {
182 self.channel.send_wakers.cancel(key);
183 }
184 }
185 }
186
187 SendFuture {
188 channel: &self.channel,
189 msg: Some(msg),
190 opt_key: None,
191 }
192 .await
193 }
194
195 /// Returns the channel capacity.
196 ///
197 /// # Examples
198 ///
199 /// ```
200 /// use async_std::sync::channel;
201 ///
202 /// let (s, _) = channel::<i32>(5);
203 /// assert_eq!(s.capacity(), 5);
204 /// ```
205 pub fn capacity(&self) -> usize {
206 self.channel.cap
207 }
208
209 /// Returns `true` if the channel is empty.
210 ///
211 /// # Examples
212 ///
213 /// ```
214 /// # async_std::task::block_on(async {
215 /// #
216 /// use async_std::sync::channel;
217 ///
218 /// let (s, r) = channel(1);
219 ///
220 /// assert!(s.is_empty());
221 /// s.send(0).await;
222 /// assert!(!s.is_empty());
223 /// #
224 /// # })
225 /// ```
226 pub fn is_empty(&self) -> bool {
227 self.channel.is_empty()
228 }
229
230 /// Returns `true` if the channel is full.
231 ///
232 /// # Examples
233 ///
234 /// ```
235 /// # async_std::task::block_on(async {
236 /// #
237 /// use async_std::sync::channel;
238 ///
239 /// let (s, r) = channel(1);
240 ///
241 /// assert!(!s.is_full());
242 /// s.send(0).await;
243 /// assert!(s.is_full());
244 /// #
245 /// # })
246 /// ```
247 pub fn is_full(&self) -> bool {
248 self.channel.is_full()
249 }
250
251 /// Returns the number of messages in the channel.
252 ///
253 /// # Examples
254 ///
255 /// ```
256 /// # async_std::task::block_on(async {
257 /// #
258 /// use async_std::sync::channel;
259 ///
260 /// let (s, r) = channel(2);
261 /// assert_eq!(s.len(), 0);
262 ///
263 /// s.send(1).await;
264 /// s.send(2).await;
265 /// assert_eq!(s.len(), 2);
266 /// #
267 /// # })
268 /// ```
269 pub fn len(&self) -> usize {
270 self.channel.len()
271 }
272}
273
274impl<T> Drop for Sender<T> {
275 fn drop(&mut self) {
276 // Decrement the sender count and disconnect the channel if it drops down to zero.
277 if self.channel.sender_count.fetch_sub(1, Ordering::AcqRel) == 1 {
278 self.channel.disconnect();
279 }
280 }
281}
282
283impl<T> Clone for Sender<T> {
284 fn clone(&self) -> Sender<T> {
285 let count = self.channel.sender_count.fetch_add(1, Ordering::Relaxed);
286
287 // Make sure the count never overflows, even if lots of sender clones are leaked.
288 if count > isize::MAX as usize {
289 process::abort();
290 }
291
292 Sender {
293 channel: self.channel.clone(),
294 }
295 }
296}
297
298impl<T> fmt::Debug for Sender<T> {
299 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
300 f.pad("Sender { .. }")
301 }
302}
303
304/// The receiving side of a channel.
305///
306/// This type receives messages by calling `recv`. But it also implements the [`Stream`] trait,
307/// which means it can act as an asynchronous iterator. This struct is created by the [`channel`]
308/// function. See its documentation for more.
309///
310/// [`channel`]: fn.channel.html
311/// [`Stream`]: ../stream/trait.Stream.html
312///
313/// # Examples
314///
315/// ```
316/// # async_std::task::block_on(async {
317/// #
318/// use std::time::Duration;
319///
320/// use async_std::sync::channel;
321/// use async_std::task;
322///
323/// let (s, r) = channel(100);
324///
325/// task::spawn(async move {
326/// s.send(1).await;
327/// task::sleep(Duration::from_secs(1)).await;
328/// s.send(2).await;
329/// });
330///
331/// assert_eq!(r.recv().await, Some(1)); // Received immediately.
332/// assert_eq!(r.recv().await, Some(2)); // Received after 1 second.
333/// #
334/// # })
335/// ```
336#[cfg(feature = "unstable")]
337#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
338pub struct Receiver<T> {
339 /// The inner channel.
340 channel: Arc<Channel<T>>,
341
342 /// The key for this receiver in the `channel.stream_wakers` set.
343 opt_key: Option<usize>,
344}
345
346impl<T> Receiver<T> {
347 /// Receives a message from the channel.
348 ///
349 /// If the channel is empty and still has senders, this method will wait until a message is
350 /// sent into the channel or until all senders get dropped.
351 ///
352 /// # Examples
353 ///
354 /// ```
355 /// # async_std::task::block_on(async {
356 /// #
357 /// use async_std::sync::channel;
358 /// use async_std::task;
359 ///
360 /// let (s, r) = channel(1);
361 ///
362 /// task::spawn(async move {
363 /// s.send(1).await;
364 /// s.send(2).await;
365 /// });
366 ///
367 /// assert_eq!(r.recv().await, Some(1));
368 /// assert_eq!(r.recv().await, Some(2));
369 /// assert_eq!(r.recv().await, None);
370 /// #
371 /// # })
372 /// ```
373 pub async fn recv(&self) -> Option<T> {
374 struct RecvFuture<'a, T> {
375 channel: &'a Channel<T>,
376 opt_key: Option<usize>,
377 }
378
379 impl<T> Future for RecvFuture<'_, T> {
380 type Output = Option<T>;
381
382 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
383 poll_recv(
384 &self.channel,
385 &self.channel.recv_wakers,
386 &mut self.opt_key,
387 cx,
388 )
389 }
390 }
391
392 impl<T> Drop for RecvFuture<'_, T> {
393 fn drop(&mut self) {
394 // If the current task is still in the set, that means it is being cancelled now.
395 if let Some(key) = self.opt_key {
396 self.channel.recv_wakers.cancel(key);
397 }
398 }
399 }
400
401 RecvFuture {
402 channel: &self.channel,
403 opt_key: None,
404 }
405 .await
406 }
407
408 /// Returns the channel capacity.
409 ///
410 /// # Examples
411 ///
412 /// ```
413 /// use async_std::sync::channel;
414 ///
415 /// let (_, r) = channel::<i32>(5);
416 /// assert_eq!(r.capacity(), 5);
417 /// ```
418 pub fn capacity(&self) -> usize {
419 self.channel.cap
420 }
421
422 /// Returns `true` if the channel is empty.
423 ///
424 /// # Examples
425 ///
426 /// ```
427 /// # async_std::task::block_on(async {
428 /// #
429 /// use async_std::sync::channel;
430 ///
431 /// let (s, r) = channel(1);
432 ///
433 /// assert!(r.is_empty());
434 /// s.send(0).await;
435 /// assert!(!r.is_empty());
436 /// #
437 /// # })
438 /// ```
439 pub fn is_empty(&self) -> bool {
440 self.channel.is_empty()
441 }
442
443 /// Returns `true` if the channel is full.
444 ///
445 /// # Examples
446 ///
447 /// ```
448 /// # async_std::task::block_on(async {
449 /// #
450 /// use async_std::sync::channel;
451 ///
452 /// let (s, r) = channel(1);
453 ///
454 /// assert!(!r.is_full());
455 /// s.send(0).await;
456 /// assert!(r.is_full());
457 /// #
458 /// # })
459 /// ```
460 pub fn is_full(&self) -> bool {
461 self.channel.is_full()
462 }
463
464 /// Returns the number of messages in the channel.
465 ///
466 /// # Examples
467 ///
468 /// ```
469 /// # async_std::task::block_on(async {
470 /// #
471 /// use async_std::sync::channel;
472 ///
473 /// let (s, r) = channel(2);
474 /// assert_eq!(r.len(), 0);
475 ///
476 /// s.send(1).await;
477 /// s.send(2).await;
478 /// assert_eq!(r.len(), 2);
479 /// #
480 /// # })
481 /// ```
482 pub fn len(&self) -> usize {
483 self.channel.len()
484 }
485}
486
487impl<T> Drop for Receiver<T> {
488 fn drop(&mut self) {
489 // If the current task is still in the stream set, that means it is being cancelled now.
490 if let Some(key) = self.opt_key {
491 self.channel.stream_wakers.cancel(key);
492 }
493
494 // Decrement the receiver count and disconnect the channel if it drops down to zero.
495 if self.channel.receiver_count.fetch_sub(1, Ordering::AcqRel) == 1 {
496 self.channel.disconnect();
497 }
498 }
499}
500
501impl<T> Clone for Receiver<T> {
502 fn clone(&self) -> Receiver<T> {
503 let count = self.channel.receiver_count.fetch_add(1, Ordering::Relaxed);
504
505 // Make sure the count never overflows, even if lots of receiver clones are leaked.
506 if count > isize::MAX as usize {
507 process::abort();
508 }
509
510 Receiver {
511 channel: self.channel.clone(),
512 opt_key: None,
513 }
514 }
515}
516
517impl<T> Stream for Receiver<T> {
518 type Item = T;
519
520 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
521 let this = &mut *self;
522 poll_recv(
523 &this.channel,
524 &this.channel.stream_wakers,
525 &mut this.opt_key,
526 cx,
527 )
528 }
529}
530
531impl<T> fmt::Debug for Receiver<T> {
532 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
533 f.pad("Receiver { .. }")
534 }
535}
536
537/// Polls a receive operation on a channel.
538///
539/// If the receive operation is blocked, the current task will be inserted into `wakers` and its
540/// associated key will then be stored in `opt_key`.
541fn poll_recv<T>(
542 channel: &Channel<T>,
543 wakers: &WakerSet,
544 opt_key: &mut Option<usize>,
545 cx: &mut Context<'_>,
546) -> Poll<Option<T>> {
547 loop {
548 // If the current task is in the set, remove it.
549 if let Some(key) = opt_key.take() {
550 wakers.remove(key);
551 }
552
553 // Try receiving a message.
554 match channel.try_recv() {
555 Ok(msg) => return Poll::Ready(Some(msg)),
556 Err(TryRecvError::Disconnected) => return Poll::Ready(None),
557 Err(TryRecvError::Empty) => {
558 // Insert this receive operation.
559 *opt_key = Some(wakers.insert(cx));
560
561 // If the channel is still empty and not disconnected, return.
562 if channel.is_empty() && !channel.is_disconnected() {
563 return Poll::Pending;
564 }
565 }
566 }
567 }
568}
569
570/// A slot in a channel.
571struct Slot<T> {
572 /// The current stamp.
573 stamp: AtomicUsize,
574
575 /// The message in this slot.
576 msg: UnsafeCell<T>,
577}
578
579/// Bounded channel based on a preallocated array.
580struct Channel<T> {
581 /// The head of the channel.
582 ///
583 /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
584 /// packed into a single `usize`. The lower bits represent the index, while the upper bits
585 /// represent the lap. The mark bit in the head is always zero.
586 ///
587 /// Messages are popped from the head of the channel.
588 head: AtomicUsize,
589
590 /// The tail of the channel.
591 ///
592 /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
593 /// packed into a single `usize`. The lower bits represent the index, while the upper bits
594 /// represent the lap. The mark bit indicates that the channel is disconnected.
595 ///
596 /// Messages are pushed into the tail of the channel.
597 tail: AtomicUsize,
598
599 /// The buffer holding slots.
600 buffer: *mut Slot<T>,
601
602 /// The channel capacity.
603 cap: usize,
604
605 /// A stamp with the value of `{ lap: 1, mark: 0, index: 0 }`.
606 one_lap: usize,
607
608 /// If this bit is set in the tail, that means either all senders were dropped or all receivers
609 /// were dropped.
610 mark_bit: usize,
611
612 /// Send operations waiting while the channel is full.
613 send_wakers: WakerSet,
614
615 /// Receive operations waiting while the channel is empty and not disconnected.
616 recv_wakers: WakerSet,
617
618 /// Streams waiting while the channel is empty and not disconnected.
619 stream_wakers: WakerSet,
620
621 /// The number of currently active `Sender`s.
622 sender_count: AtomicUsize,
623
624 /// The number of currently active `Receivers`s.
625 receiver_count: AtomicUsize,
626
627 /// Indicates that dropping a `Channel<T>` may drop values of type `T`.
628 _marker: PhantomData<T>,
629}
630
631unsafe impl<T: Send> Send for Channel<T> {}
632unsafe impl<T: Send> Sync for Channel<T> {}
633impl<T> Unpin for Channel<T> {}
634
635impl<T> Channel<T> {
636 /// Creates a bounded channel of capacity `cap`.
637 fn with_capacity(cap: usize) -> Self {
638 assert!(cap > 0, "capacity must be positive");
639
640 // Compute constants `mark_bit` and `one_lap`.
641 let mark_bit = (cap + 1).next_power_of_two();
642 let one_lap = mark_bit * 2;
643
644 // Head is initialized to `{ lap: 0, mark: 0, index: 0 }`.
645 let head = 0;
646 // Tail is initialized to `{ lap: 0, mark: 0, index: 0 }`.
647 let tail = 0;
648
649 // Allocate a buffer of `cap` slots.
650 let buffer = {
651 let mut v = Vec::<Slot<T>>::with_capacity(cap);
652 let ptr = v.as_mut_ptr();
653 mem::forget(v);
654 ptr
655 };
656
657 // Initialize stamps in the slots.
658 for i in 0..cap {
659 unsafe {
660 // Set the stamp to `{ lap: 0, mark: 0, index: i }`.
661 let slot = buffer.add(i);
662 ptr::write(&mut (*slot).stamp, AtomicUsize::new(i));
663 }
664 }
665
666 Channel {
667 buffer,
668 cap,
669 one_lap,
670 mark_bit,
671 head: AtomicUsize::new(head),
672 tail: AtomicUsize::new(tail),
673 send_wakers: WakerSet::new(),
674 recv_wakers: WakerSet::new(),
675 stream_wakers: WakerSet::new(),
676 sender_count: AtomicUsize::new(1),
677 receiver_count: AtomicUsize::new(1),
678 _marker: PhantomData,
679 }
680 }
681
682 /// Attempts to send a message.
683 fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
684 let backoff = Backoff::new();
685 let mut tail = self.tail.load(Ordering::Relaxed);
686
687 loop {
688 // Extract mark bit from the tail and unset it.
689 //
690 // If the mark bit was set (which means all receivers have been dropped), we will still
691 // send the message into the channel if there is enough capacity. The message will get
692 // dropped when the channel is dropped (which means when all senders are also dropped).
693 let mark_bit = tail & self.mark_bit;
694 tail ^= mark_bit;
695
696 // Deconstruct the tail.
697 let index = tail & (self.mark_bit - 1);
698 let lap = tail & !(self.one_lap - 1);
699
700 // Inspect the corresponding slot.
701 let slot = unsafe { &*self.buffer.add(index) };
702 let stamp = slot.stamp.load(Ordering::Acquire);
703
704 // If the tail and the stamp match, we may attempt to push.
705 if tail == stamp {
706 let new_tail = if index + 1 < self.cap {
707 // Same lap, incremented index.
708 // Set to `{ lap: lap, mark: 0, index: index + 1 }`.
709 tail + 1
710 } else {
711 // One lap forward, index wraps around to zero.
712 // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
713 lap.wrapping_add(self.one_lap)
714 };
715
716 // Try moving the tail.
717 match self.tail.compare_exchange_weak(
718 tail | mark_bit,
719 new_tail | mark_bit,
720 Ordering::SeqCst,
721 Ordering::Relaxed,
722 ) {
723 Ok(_) => {
724 // Write the message into the slot and update the stamp.
725 unsafe { slot.msg.get().write(msg) };
726 let stamp = tail + 1;
727 slot.stamp.store(stamp, Ordering::Release);
728
729 // Wake a blocked receive operation.
730 self.recv_wakers.notify_one();
731
732 // Wake all blocked streams.
733 self.stream_wakers.notify_all();
734
735 return Ok(());
736 }
737 Err(t) => {
738 tail = t;
739 backoff.spin();
740 }
741 }
742 } else if stamp.wrapping_add(self.one_lap) == tail + 1 {
743 atomic::fence(Ordering::SeqCst);
744 let head = self.head.load(Ordering::Relaxed);
745
746 // If the head lags one lap behind the tail as well...
747 if head.wrapping_add(self.one_lap) == tail {
748 // ...then the channel is full.
749
750 // Check if the channel is disconnected.
751 if mark_bit != 0 {
752 return Err(TrySendError::Disconnected(msg));
753 } else {
754 return Err(TrySendError::Full(msg));
755 }
756 }
757
758 backoff.spin();
759 tail = self.tail.load(Ordering::Relaxed);
760 } else {
761 // Snooze because we need to wait for the stamp to get updated.
762 backoff.snooze();
763 tail = self.tail.load(Ordering::Relaxed);
764 }
765 }
766 }
767
768 /// Attempts to receive a message.
769 fn try_recv(&self) -> Result<T, TryRecvError> {
770 let backoff = Backoff::new();
771 let mut head = self.head.load(Ordering::Relaxed);
772
773 loop {
774 // Deconstruct the head.
775 let index = head & (self.mark_bit - 1);
776 let lap = head & !(self.one_lap - 1);
777
778 // Inspect the corresponding slot.
779 let slot = unsafe { &*self.buffer.add(index) };
780 let stamp = slot.stamp.load(Ordering::Acquire);
781
782 // If the the stamp is ahead of the head by 1, we may attempt to pop.
783 if head + 1 == stamp {
784 let new = if index + 1 < self.cap {
785 // Same lap, incremented index.
786 // Set to `{ lap: lap, mark: 0, index: index + 1 }`.
787 head + 1
788 } else {
789 // One lap forward, index wraps around to zero.
790 // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
791 lap.wrapping_add(self.one_lap)
792 };
793
794 // Try moving the head.
795 match self.head.compare_exchange_weak(
796 head,
797 new,
798 Ordering::SeqCst,
799 Ordering::Relaxed,
800 ) {
801 Ok(_) => {
802 // Read the message from the slot and update the stamp.
803 let msg = unsafe { slot.msg.get().read() };
804 let stamp = head.wrapping_add(self.one_lap);
805 slot.stamp.store(stamp, Ordering::Release);
806
807 // Wake a blocked send operation.
808 self.send_wakers.notify_one();
809
810 return Ok(msg);
811 }
812 Err(h) => {
813 head = h;
814 backoff.spin();
815 }
816 }
817 } else if stamp == head {
818 atomic::fence(Ordering::SeqCst);
819 let tail = self.tail.load(Ordering::Relaxed);
820
821 // If the tail equals the head, that means the channel is empty.
822 if (tail & !self.mark_bit) == head {
823 // If the channel is disconnected...
824 if tail & self.mark_bit != 0 {
825 return Err(TryRecvError::Disconnected);
826 } else {
827 // Otherwise, the receive operation is not ready.
828 return Err(TryRecvError::Empty);
829 }
830 }
831
832 backoff.spin();
833 head = self.head.load(Ordering::Relaxed);
834 } else {
835 // Snooze because we need to wait for the stamp to get updated.
836 backoff.snooze();
837 head = self.head.load(Ordering::Relaxed);
838 }
839 }
840 }
841
842 /// Returns the current number of messages inside the channel.
843 fn len(&self) -> usize {
844 loop {
845 // Load the tail, then load the head.
846 let tail = self.tail.load(Ordering::SeqCst);
847 let head = self.head.load(Ordering::SeqCst);
848
849 // If the tail didn't change, we've got consistent values to work with.
850 if self.tail.load(Ordering::SeqCst) == tail {
851 let hix = head & (self.mark_bit - 1);
852 let tix = tail & (self.mark_bit - 1);
853
854 return if hix < tix {
855 tix - hix
856 } else if hix > tix {
857 self.cap - hix + tix
858 } else if (tail & !self.mark_bit) == head {
859 0
860 } else {
861 self.cap
862 };
863 }
864 }
865 }
866
867 /// Returns `true` if the channel is disconnected.
868 pub fn is_disconnected(&self) -> bool {
869 self.tail.load(Ordering::SeqCst) & self.mark_bit != 0
870 }
871
872 /// Returns `true` if the channel is empty.
873 fn is_empty(&self) -> bool {
874 let head = self.head.load(Ordering::SeqCst);
875 let tail = self.tail.load(Ordering::SeqCst);
876
877 // Is the tail equal to the head?
878 //
879 // Note: If the head changes just before we load the tail, that means there was a moment
880 // when the channel was not empty, so it is safe to just return `false`.
881 (tail & !self.mark_bit) == head
882 }
883
884 /// Returns `true` if the channel is full.
885 fn is_full(&self) -> bool {
886 let tail = self.tail.load(Ordering::SeqCst);
887 let head = self.head.load(Ordering::SeqCst);
888
889 // Is the head lagging one lap behind tail?
890 //
891 // Note: If the tail changes just before we load the head, that means there was a moment
892 // when the channel was not full, so it is safe to just return `false`.
893 head.wrapping_add(self.one_lap) == tail & !self.mark_bit
894 }
895
896 /// Disconnects the channel and wakes up all blocked operations.
897 fn disconnect(&self) {
898 let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst);
899
900 if tail & self.mark_bit == 0 {
901 // Notify everyone blocked on this channel.
902 self.send_wakers.notify_all();
903 self.recv_wakers.notify_all();
904 self.stream_wakers.notify_all();
905 }
906 }
907}
908
909impl<T> Drop for Channel<T> {
910 fn drop(&mut self) {
911 // Get the index of the head.
912 let hix = self.head.load(Ordering::Relaxed) & (self.mark_bit - 1);
913
914 // Loop over all slots that hold a message and drop them.
915 for i in 0..self.len() {
916 // Compute the index of the next slot holding a message.
917 let index = if hix + i < self.cap {
918 hix + i
919 } else {
920 hix + i - self.cap
921 };
922
923 unsafe {
924 self.buffer.add(index).drop_in_place();
925 }
926 }
927
928 // Finally, deallocate the buffer, but don't run any destructors.
929 unsafe {
930 Vec::from_raw_parts(self.buffer, 0, self.cap);
931 }
932 }
933}
934
935/// An error returned from the `try_send()` method.
936enum TrySendError<T> {
937 /// The channel is full but not disconnected.
938 Full(T),
939
940 /// The channel is full and disconnected.
941 Disconnected(T),
942}
943
944/// An error returned from the `try_recv()` method.
945enum TryRecvError {
946 /// The channel is empty but not disconnected.
947 Empty,
948
949 /// The channel is empty and disconnected.
950 Disconnected,
951}