async_unsync/
bounded.rs

1//! A **bounded** MPSC channel implementation.
2
3use core::{
4    fmt, mem,
5    task::{Context, Poll},
6};
7
8use crate::{
9    alloc::{collections::VecDeque, rc::Rc},
10    error::{SendError, TryRecvError, TrySendError},
11    mask::{COUNTED, UNCOUNTED},
12    queue::BoundedQueue,
13};
14
15/// Creates a new bounded channel with the given `capacity`.
16///
17/// # Panics
18///
19/// Panics, if `capacity` is zero.
20pub const fn channel<T>(capacity: usize) -> Channel<T> {
21    assert!(capacity > 0, "channel capacity must be at least 1");
22    Channel { queue: BoundedQueue::new(capacity) }
23}
24
25/// Returns a new bounded channel with pre-queued items.
26///
27/// The channel's (total) capacity will be the maximum of `minimum_capacity` and
28/// the number of items returned by `iter`.
29/// Its initial available capacity will be the difference between its total
30/// capacity and the number of pre-queued items.
31pub fn channel_from_iter<T>(min_capacity: usize, iter: impl IntoIterator<Item = T>) -> Channel<T> {
32    Channel::from_iter(min_capacity, iter)
33}
34
35/// An unsynchronized (`!Sync`), asynchronous and bounded channel.
36///
37/// Unlike [unbounded](crate::unbounded) channels, these are created with a
38/// constant [maximum capacity](Channel::max_capacity), up to which values can
39/// be send to the channel.
40/// Any further sends will have to wait (block), until capacity is restored by
41/// [receiving](Channel::recv) already stored values.
42pub struct Channel<T> {
43    queue: BoundedQueue<T>,
44}
45
46impl<T> Channel<T> {
47    /// Returns a new bounded channel with pre-allocated initial capacity.
48    ///
49    /// # Panics
50    ///
51    /// Panics, if `capacity` is zero.
52    pub fn with_initial_capacity(capacity: usize, initial: usize) -> Self {
53        Self { queue: BoundedQueue::with_capacity(capacity, initial) }
54    }
55
56    /// Returns a new bounded channel with a given capacity and pre-queued
57    /// elements.
58    ///
59    /// The initial capacity will be the difference between `capacity` and the
60    /// number of elements returned by the [`Iterator`].
61    /// The total channel capacity will be the maximum of `capacity` and the
62    /// iterator's length.
63    ///
64    /// # Panics
65    ///
66    /// Panics, if `capacity` is zero.
67    pub fn from_iter(capacity: usize, iter: impl IntoIterator<Item = T>) -> Self {
68        Self { queue: BoundedQueue::from_iter(capacity, iter) }
69    }
70
71    /// Splits the channel into borrowing [`SenderRef`] and [`ReceiverRef`]
72    /// handles.
73    ///
74    /// # Examples
75    ///
76    /// ```
77    /// use async_unsync::bounded;
78    ///
79    /// # async fn example_bounded_split() {
80    /// // must use a non-temporary binding for the channel
81    /// let mut chan = bounded::channel(1);
82    /// let (tx, mut rx) = chan.split();
83    /// tx.send(1).await.unwrap();
84    ///
85    /// // dropping the handles will close the channel
86    /// drop((tx, rx));
87    /// assert!(chan.is_closed());
88    ///
89    /// // ...but the queued value can still be received
90    /// assert_eq!(chan.try_recv(), Ok(1));
91    /// assert!(chan.try_recv().is_err());
92    /// # }
93    /// ```
94    pub fn split(&mut self) -> (SenderRef<'_, T>, ReceiverRef<'_, T>) {
95        self.queue.0.get_mut().set_counted();
96        (SenderRef { queue: &self.queue }, ReceiverRef { queue: &self.queue })
97    }
98
99    /// Consumes and splits the channel into owning [`Sender`] and [`Receiver`]
100    /// handles.
101    ///
102    /// This requires one additional allocation over
103    /// [`split`](Channel::split), but avoids potential lifetime
104    /// restrictions, since the returned handles are valid for the `'static`
105    /// lifetime, meaning they can be used in spawned (local) tasks.
106    pub fn into_split(mut self) -> (Sender<T>, Receiver<T>) {
107        self.queue.0.get_mut().set_counted();
108        let queue = Rc::new(self.queue);
109        (Sender { queue: Rc::clone(&queue) }, Receiver { queue })
110    }
111
112    /// Converts into the underlying [`VecDeque`] container.
113    pub fn into_deque(self) -> VecDeque<T> {
114        self.queue.into_deque()
115    }
116
117    /// Returns the number of queued elements.
118    ///
119    /// This number *may* diverge from the channel's reported
120    /// [capacity](Channel::capacity).
121    /// This will occur, when capacity is decreased by [reserving](Channel::reserve)
122    /// it without using it right away.
123    pub fn len(&self) -> usize {
124        self.queue.len()
125    }
126
127    /// Returns the maximum buffer capacity of the channel.
128    ///
129    /// This is the capacity initially specified when [creating](channel) the
130    /// channel and remains constant.
131    pub fn max_capacity(&self) -> usize {
132        self.queue.max_capacity()
133    }
134
135    /// Returns the current capacity of the channel.
136    ///
137    /// The capacity goes down when sending a value and goes up when receiving a
138    /// value.
139    /// When the capacity is zero, any subsequent sends will only resolve once
140    /// sufficient capacity is available
141    pub fn capacity(&self) -> usize {
142        self.queue.capacity()
143    }
144
145    /// Closes the channel, ensuring that all subsequent sends will fail.
146    ///
147    /// # Examples
148    ///
149    /// ```
150    /// use async_unsync::{bounded, TrySendError};
151    ///
152    /// let chan = bounded::channel(1);
153    /// chan.close();
154    /// assert_eq!(chan.try_send(1), Err(TrySendError::Closed(1)));
155    /// ```
156    pub fn close(&self) {
157        self.queue.close::<UNCOUNTED>();
158    }
159
160    /// Returns `true` if the channel is closed.
161    pub fn is_closed(&self) -> bool {
162        self.queue.is_closed::<UNCOUNTED>()
163    }
164
165    /// Returns `true` if the channel is empty.
166    pub fn is_empty(&self) -> bool {
167        self.queue.len() == 0
168    }
169
170    /// Receives an element through the channel.
171    ///
172    /// # Errors
173    ///
174    /// Fails, if the channel is [empty](TryRecvError::Empty) or
175    /// [disconnected](TryRecvError::Disconnected).
176    pub fn try_recv(&self) -> Result<T, TryRecvError> {
177        self.queue.try_recv::<UNCOUNTED>()
178    }
179
180    /// Polls the channel, resolving if an element was received or the channel
181    /// is closed but ignoring whether there are any remaining **Sender**(s) or
182    /// not.
183    ///
184    /// # Panics
185    ///
186    /// This may panic, if there are is more than one concurrent poll for
187    /// receiving (i.e. either directly through `poll_recv` or by the future
188    /// returned by `recv`) an element.
189    /// In order to avoid this, there should be only one logical receiver per
190    /// each channel.
191    pub fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Option<T>> {
192        self.queue.poll_recv::<UNCOUNTED>(cx)
193    }
194
195    /// Receives an element through the channel.
196    ///
197    /// # Errors
198    ///
199    /// Fails, if the channel is closed (i.e., all senders have been dropped).
200    pub async fn recv(&self) -> Option<T> {
201        self.queue.recv::<UNCOUNTED>().await
202    }
203
204    /// Sends a value through the channel if there is sufficient capacity.
205    ///
206    /// # Errors
207    ///
208    /// Fails, if the queue is closed or there is no available capacity.
209    pub fn try_send(&self, elem: T) -> Result<(), TrySendError<T>> {
210        self.queue.try_send::<UNCOUNTED>(elem)
211    }
212
213    /// Sends a value, potentially waiting until there is capacity.
214    ///
215    /// # Errors
216    ///
217    /// Fails, if the queue is closed.
218    pub async fn send(&self, elem: T) -> Result<(), SendError<T>> {
219        self.queue.send::<UNCOUNTED>(elem).await
220    }
221
222    /// Attempts to reserve a slot in the channel without blocking, if none are
223    /// available.
224    ///
225    /// The returned [`Permit`] can be used to immediately send a value to the
226    /// channel at a later point.
227    /// Dropping the permit without sending a value will return the capacity to
228    /// the channel.
229    ///
230    /// # Errors
231    ///
232    /// Fails, if there are no available permits or the channel has been closed.
233    ///
234    /// # Examples
235    ///
236    /// ```
237    /// use async_unsync::bounded;
238    ///
239    /// # async fn example_try_reserve() {
240    /// let chan = bounded::channel(1);
241    ///
242    /// // reserve capacity, reducing available slots to 0
243    /// let permit = chan.try_reserve().unwrap();
244    /// assert!(chan.try_send(1).is_err());
245    /// assert!(chan.try_reserve().is_err());
246    ///
247    /// permit.send(1);
248    /// assert_eq!(chan.recv().await, Some(1));
249    /// # }
250    /// ```
251    pub fn try_reserve(&self) -> Result<Permit<'_, T>, TrySendError<()>> {
252        self.queue.try_reserve::<COUNTED>()?;
253        Ok(Permit { queue: &self.queue })
254    }
255
256    /// Attempts to reserve a slot in the channel without blocking.
257    ///
258    /// If no capacity is available in the channel, this will block until a slot
259    /// becomes available.
260    /// The returned [`Permit`] can be used to immediately send a value to the
261    /// channel at a later point.
262    /// Dropping the permit without sending a value will return the capacity to
263    /// the channel.
264    ///
265    /// # Errors
266    ///
267    /// Fails, if there are no available permits or the channel has been closed.
268    ///
269    /// # Examples
270    ///
271    /// ```
272    /// use async_unsync::bounded;
273    ///
274    /// # async fn example_try_reserve() {
275    /// let chan = bounded::channel(1);
276    ///
277    /// // reserve capacity, reducing available slots to 0
278    /// let permit = chan.reserve().await.unwrap();
279    /// assert!(chan.try_send(1).is_err());
280    /// assert!(chan.try_reserve().is_err());
281    ///
282    /// permit.send(1);
283    /// assert_eq!(chan.recv().await, Some(1));
284    /// # }
285    /// ```
286    pub async fn reserve(&self) -> Result<Permit<'_, T>, SendError<()>> {
287        self.queue.reserve::<UNCOUNTED>().await?;
288        Ok(Permit { queue: &self.queue })
289    }
290}
291
292impl<T> fmt::Debug for Channel<T> {
293    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
294        f.debug_struct("Channel")
295            .field("capacity", &self.capacity())
296            .field("max_capacity", &self.max_capacity())
297            .field("is_closed", &self.is_closed())
298            .finish()
299    }
300}
301
302/// An owned handle for sending elements through a bounded split [`Channel`].
303pub struct Sender<T> {
304    queue: Rc<BoundedQueue<T>>,
305}
306
307impl<T> Sender<T> {
308    /// Returns the number of currently queued elements.
309    pub fn len(&self) -> usize {
310        self.queue.len()
311    }
312
313    /// Returns the maximum buffer capacity of the channel.
314    ///
315    /// This is the capacity initially specified when [creating](channel) the
316    /// channel and remains constant.
317    pub fn max_capacity(&self) -> usize {
318        self.queue.max_capacity()
319    }
320
321    /// Returns the current capacity of the channel.
322    ///
323    /// The capacity goes down when sending a value and goes up when receiving a
324    /// value.
325    /// When the capacity is zero, any subsequent sends will only resolve once
326    /// sufficient capacity is available
327    pub fn capacity(&self) -> usize {
328        self.queue.capacity()
329    }
330
331    /// Returns `true` if the channel is closed.
332    pub fn is_closed(&self) -> bool {
333        self.queue.is_closed::<COUNTED>()
334    }
335
336    /// Returns `true` if the channel is empty.
337    pub fn is_empty(&self) -> bool {
338        self.queue.len() == 0
339    }
340
341    /// Returns `true` if `self` and `other` are handles for the same channel
342    /// instance.
343    pub fn same_channel(&self, other: &Self) -> bool {
344        core::ptr::eq(Rc::as_ptr(&self.queue), Rc::as_ptr(&other.queue))
345    }
346
347    /// Sends a value through the channel if there is sufficient capacity.
348    ///
349    /// # Errors
350    ///
351    /// Fails, if the queue is closed or there is no available capacity.
352    pub fn try_send(&self, elem: T) -> Result<(), TrySendError<T>> {
353        self.queue.try_send::<COUNTED>(elem)
354    }
355
356    /// Sends a value through the channel, potentially waiting until there is
357    /// sufficient capacity.
358    ///
359    /// # Errors
360    ///
361    /// Fails, if the queue is closed.
362    pub async fn send(&self, elem: T) -> Result<(), SendError<T>> {
363        self.queue.send::<COUNTED>(elem).await
364    }
365
366    /// Attempts to reserve a slot in the channel without blocking, if none are
367    /// available.
368    ///
369    /// The returned [`Permit`] can be used to immediately send a value to the
370    /// channel at a later point.
371    /// Dropping the permit without sending a value will return the capacity to
372    /// the channel.
373    ///
374    /// # Errors
375    ///
376    /// Fails, if there are no available permits or the channel has been closed.
377    ///
378    /// # Examples
379    ///
380    /// ```
381    /// use async_unsync::bounded;
382    ///
383    /// # async fn example_try_reserve() {
384    /// let (tx, mut rx) = bounded::channel(1).into_split();
385    ///
386    /// // reserve capacity, reducing available slots to 0
387    /// let permit = tx.try_reserve().unwrap();
388    /// assert!(tx.try_send(1).is_err());
389    /// assert!(tx.try_reserve().is_err());
390    ///
391    /// permit.send(1);
392    /// assert_eq!(rx.recv().await, Some(1));
393    /// # }
394    /// ```
395    pub fn try_reserve(&self) -> Result<Permit<'_, T>, TrySendError<()>> {
396        self.queue.try_reserve::<COUNTED>()?;
397        Ok(Permit { queue: &self.queue })
398    }
399
400    /// Attempts to reserve a slot in the channel without blocking, if none are
401    /// available.
402    ///
403    /// This moves the sender *by value* and returns an *owned* permit that can
404    /// be used to immediately send a value to the channel at a later point.
405    /// Dropping the permit without sending a value will return the capacity to
406    /// the channel.
407    ///
408    /// # Errors
409    ///
410    /// Fails, if there are no available permits or the channel has been closed.
411    ///
412    /// # Examples
413    ///
414    /// ```
415    /// use async_unsync::bounded;
416    ///
417    /// # async fn example_try_reserve() {
418    /// let (tx, mut rx) = bounded::channel(2).into_split();
419    ///
420    /// // cloning senders is cheap, so arbitrary numbers of owned permits are
421    /// // easily created
422    /// let p1 = tx.clone().try_reserve_owned().unwrap();
423    /// let p2 = tx.clone().try_reserve_owned().unwrap();
424    ///
425    /// assert!(tx.try_send(1).is_err());
426    /// assert!(tx.try_reserve().is_err());
427    /// drop(tx);
428    ///
429    /// let _ = p2.send(1);
430    /// let _ = p1.send(2);
431    ///
432    /// assert_eq!(rx.recv().await, Some(1));
433    /// assert_eq!(rx.recv().await, Some(2));
434    /// assert_eq!(rx.recv().await, None);
435    /// # }
436    /// ```
437    pub fn try_reserve_owned(self) -> Result<OwnedPermit<T>, TrySendError<Self>> {
438        if let Err(err) = self.queue.try_reserve::<COUNTED>() {
439            return Err(err.set(self));
440        }
441
442        Ok(OwnedPermit { sender: Some(self) })
443    }
444
445    /// Attempts to reserve a slot in the channel without blocking.
446    ///
447    /// If no capacity is available in the channel, this will block until a slot
448    /// becomes available.
449    /// The returned [`Permit`] can be used to immediately send a value to the
450    /// channel at a later point.
451    /// Dropping the permit without sending a value will return the capacity to
452    /// the channel.
453    ///
454    /// # Errors
455    ///
456    /// Fails, if there are no available permits or the channel has been closed.
457    ///
458    /// # Examples
459    ///
460    /// ```
461    /// use async_unsync::bounded;
462    ///
463    /// # async fn example_try_reserve() {
464    /// let (tx, mut rx) = bounded::channel(1).into_split();
465    ///
466    /// // reserve capacity, reducing available slots to 0
467    /// let permit = tx.reserve().await.unwrap();
468    /// assert!(tx.try_send(1).is_err());
469    /// assert!(tx.try_reserve().is_err());
470    ///
471    /// permit.send(1);
472    /// assert_eq!(rx.recv().await, Some(1));
473    /// # }
474    /// ```
475    pub async fn reserve(&self) -> Result<Permit<'_, T>, SendError<()>> {
476        self.queue.reserve::<COUNTED>().await?;
477        Ok(Permit { queue: &self.queue })
478    }
479
480    /// Attempts to reserve a slot in the channel without blocking.
481    ///
482    /// If no capacity is available in the channel, this will block until a slot
483    /// becomes available.
484    /// This moves the sender *by value* and returns an *owned* permit that can
485    /// be used to immediately send a value to the channel at a later point.
486    /// Dropping the permit without sending a value will return the capacity to
487    /// the channel.
488    ///
489    /// # Errors
490    ///
491    /// Fails, if there are no available permits or the channel has been closed.
492    ///
493    /// # Examples
494    ///
495    /// ```
496    /// use async_unsync::bounded;
497    ///
498    /// # async fn example_try_reserve() {
499    /// let (tx, mut rx) = bounded::channel(1).into_split();
500    ///
501    /// // reserve capacity, reducing available slots to 0
502    /// let permit = tx.clone().reserve_owned().await.unwrap();
503    /// assert!(tx.try_send(1).is_err());
504    /// assert!(tx.try_reserve().is_err());
505    ///
506    /// permit.send(1);
507    /// assert_eq!(rx.recv().await, Some(1));
508    /// # }
509    /// ```
510    pub async fn reserve_owned(self) -> Result<OwnedPermit<T>, SendError<Self>> {
511        if self.queue.reserve::<COUNTED>().await.is_err() {
512            return Err(SendError(self));
513        }
514
515        Ok(OwnedPermit { sender: Some(self) })
516    }
517}
518
519impl<T> Clone for Sender<T> {
520    fn clone(&self) -> Self {
521        // SAFETY: no mutable or aliased access to queue possible
522        unsafe { (*self.queue.0.get()).mask.increase_sender_count() };
523        Self { queue: Rc::clone(&self.queue) }
524    }
525}
526
527impl<T> Drop for Sender<T> {
528    fn drop(&mut self) {
529        // SAFETY: no mutable or aliased access to queue possible
530        unsafe { (*self.queue.0.get()).decrease_sender_count() };
531    }
532}
533
534impl<T> fmt::Debug for Sender<T> {
535    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
536        f.debug_struct("Sender")
537            .field("capacity", &self.capacity())
538            .field("max_capacity", &self.max_capacity())
539            .field("is_closed", &self.is_closed())
540            .finish()
541    }
542}
543
544/// A borrowing handle for sending elements through a bounded split [`Channel`].
545pub struct SenderRef<'a, T> {
546    queue: &'a BoundedQueue<T>,
547}
548
549impl<T> SenderRef<'_, T> {
550    /// Returns the number of queued elements.
551    pub fn len(&self) -> usize {
552        self.queue.len()
553    }
554
555    /// Returns the maximum buffer capacity of the channel.
556    ///
557    /// This is the capacity initially specified when [creating](channel) the
558    /// channel and remains constant.
559    pub fn max_capacity(&self) -> usize {
560        self.queue.max_capacity()
561    }
562
563    /// Returns the current capacity of the channel.
564    ///
565    /// The capacity goes down when sending a value and goes up when receiving a
566    /// value.
567    /// When the capacity is zero, any subsequent sends will only resolve once
568    /// sufficient capacity is available
569    pub fn capacity(&self) -> usize {
570        self.queue.capacity()
571    }
572
573    /// Returns `true` if the channel is closed.
574    pub fn is_closed(&self) -> bool {
575        self.queue.is_closed::<COUNTED>()
576    }
577
578    /// Returns `true` if the channel is empty.
579    pub fn is_empty(&self) -> bool {
580        self.queue.len() == 0
581    }
582
583    /// Returns `true` if `self` and `other` are handles for the same channel
584    /// instance.
585    pub fn same_channel(&self, other: &Self) -> bool {
586        core::ptr::eq(&self.queue, &other.queue)
587    }
588
589    /// Sends a value through the channel if there is sufficient capacity.
590    ///
591    /// # Errors
592    ///
593    /// Fails, if the queue is closed or there is no available capacity.
594    pub fn try_send(&self, elem: T) -> Result<(), TrySendError<T>> {
595        self.queue.try_send::<COUNTED>(elem)
596    }
597
598    /// Sends a value through the channel, potentially blocking until there is
599    /// sufficient capacity.
600    ///
601    /// # Errors
602    ///
603    /// Fails, if the queue is closed.
604    pub async fn send(&self, elem: T) -> Result<(), SendError<T>> {
605        self.queue.send::<COUNTED>(elem).await
606    }
607
608    /// Attempts to reserve a slot in the channel without blocking, if none are
609    /// available.
610    ///
611    /// The returned [`Permit`] can be used to immediately send a value to the
612    /// channel at a later point.
613    /// Dropping the permit without sending a value will return the capacity to
614    /// the channel.
615    ///
616    /// # Errors
617    ///
618    /// Fails, if there are no available permits or the channel has been closed.
619    ///
620    /// # Examples
621    ///
622    /// ```
623    /// use async_unsync::bounded;
624    ///
625    /// # async fn example_try_reserve() {
626    /// let mut chan = bounded::channel(1);
627    /// let (tx, mut rx) = chan.split();
628    ///
629    /// // reserve capacity, reducing available slots to 0
630    /// let permit = tx.try_reserve().unwrap();
631    /// assert!(tx.try_send(1).is_err());
632    /// assert!(tx.try_reserve().is_err());
633    ///
634    /// permit.send(1);
635    /// assert_eq!(rx.recv().await, Some(1));
636    /// # }
637    /// ```
638    pub fn try_reserve(&self) -> Result<Permit<'_, T>, TrySendError<()>> {
639        self.queue.try_reserve::<COUNTED>()?;
640        Ok(Permit { queue: self.queue })
641    }
642
643    /// Attempts to reserve a slot in the channel without blocking.
644    ///
645    /// If no capacity is available in the channel, this will block until a slot
646    /// becomes available.
647    /// The returned [`Permit`] can be used to immediately send a value to the
648    /// channel at a later point.
649    /// Dropping the permit without sending a value will return the capacity to
650    /// the channel.
651    ///
652    /// # Errors
653    ///
654    /// Fails, if there are no available permits or the channel has been closed.
655    ///
656    /// # Examples
657    ///
658    /// ```
659    /// use async_unsync::bounded;
660    ///
661    /// # async fn example_try_reserve() {
662    /// let mut chan = bounded::channel(1);
663    /// let (tx, mut rx) = chan.split();
664    ///
665    /// // reserve capacity, reducing available slots to 0
666    /// let permit = tx.reserve().await.unwrap();
667    /// assert!(tx.try_send(1).is_err());
668    /// assert!(tx.try_reserve().is_err());
669    ///
670    /// permit.send(1);
671    /// assert_eq!(rx.recv().await, Some(1));
672    /// # }
673    /// ```
674    pub async fn reserve(&self) -> Result<Permit<'_, T>, SendError<()>> {
675        self.queue.reserve::<COUNTED>().await?;
676        Ok(Permit { queue: self.queue })
677    }
678}
679
680impl<T> Clone for SenderRef<'_, T> {
681    fn clone(&self) -> Self {
682        // SAFETY: no mutable or aliased access to queue possible
683        unsafe { (*self.queue.0.get()).mask.increase_sender_count() };
684        Self { queue: self.queue }
685    }
686}
687
688impl<T> Drop for SenderRef<'_, T> {
689    fn drop(&mut self) {
690        // SAFETY: no mutable or aliased access to queue possible
691        unsafe { (*self.queue.0.get()).decrease_sender_count() };
692    }
693}
694
695impl<T> fmt::Debug for SenderRef<'_, T> {
696    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
697        f.debug_struct("SenderRef")
698            .field("capacity", &self.capacity())
699            .field("max_capacity", &self.max_capacity())
700            .field("is_closed", &self.is_closed())
701            .finish()
702    }
703}
704
705/// An owning handle for receiving elements through a split bounded [`Channel`].
706pub struct Receiver<T> {
707    queue: Rc<BoundedQueue<T>>,
708}
709
710impl<T> Receiver<T> {
711    /// Closes the channel, ensuring that all subsequent sends will fail.
712    pub fn close(&mut self) {
713        self.queue.close::<COUNTED>();
714    }
715
716    /// Returns the number of queued elements.
717    pub fn len(&self) -> usize {
718        self.queue.len()
719    }
720
721    /// Returns the maximum buffer capacity of the channel.
722    ///
723    /// This is the capacity initially specified when [creating](channel) the
724    /// channel and remains constant.
725    pub fn max_capacity(&self) -> usize {
726        self.queue.max_capacity()
727    }
728
729    /// Returns the current capacity of the channel.
730    ///
731    /// The capacity goes down when sending a value and goes up when receiving a
732    /// value.
733    /// When the capacity is zero, any subsequent sends will only resolve once
734    /// sufficient capacity is available
735    pub fn capacity(&self) -> usize {
736        self.queue.capacity()
737    }
738
739    /// Returns `true` if the channel is closed.
740    pub fn is_closed(&self) -> bool {
741        self.queue.is_closed::<COUNTED>()
742    }
743
744    /// Returns `true` if the channel is empty.
745    pub fn is_empty(&self) -> bool {
746        self.queue.len() == 0
747    }
748
749    /// Attempts to receive an element through the channel.
750    ///
751    /// # Errors
752    ///
753    /// Fails, if the channel is [empty](TryRecvError::Empty) or
754    /// [disconnected](TryRecvError::Disconnected).
755    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
756        self.queue.try_recv::<COUNTED>()
757    }
758
759    /// Polls the channel, resolving if an element was received or the channel
760    /// is closed but ignoring whether there are any remaining **Sender**(s) or
761    /// not.
762    pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
763        self.queue.poll_recv::<COUNTED>(cx)
764    }
765
766    /// Receives an element through the channel.
767    ///
768    /// # Errors
769    ///
770    /// Fails, if the channel is [empty](TryRecvError::Empty) or
771    /// [disconnected](TryRecvError::Disconnected).
772    pub async fn recv(&mut self) -> Option<T> {
773        self.queue.recv::<COUNTED>().await
774    }
775}
776
777impl<T> Drop for Receiver<T> {
778    fn drop(&mut self) {
779        self.queue.close::<COUNTED>();
780    }
781}
782
783impl<T> fmt::Debug for Receiver<T> {
784    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
785        f.debug_struct("Receiver")
786            .field("capacity", &self.capacity())
787            .field("max_capacity", &self.max_capacity())
788            .field("is_closed", &self.is_closed())
789            .finish()
790    }
791}
792
793/// A borrowing handle for receiving elements through a split bounded [`Channel`].
794pub struct ReceiverRef<'a, T> {
795    queue: &'a BoundedQueue<T>,
796}
797
798impl<T> ReceiverRef<'_, T> {
799    /// Closes the channel, ensuring that all subsequent sends will fail.
800    pub fn close(&mut self) {
801        self.queue.close::<COUNTED>();
802    }
803
804    /// Returns the number of queued elements.
805    pub fn len(&self) -> usize {
806        self.queue.len()
807    }
808
809    /// Returns the maximum buffer capacity of the channel.
810    ///
811    /// This is the capacity initially specified when [creating](channel) the
812    /// channel and remains constant.
813    pub fn max_capacity(&self) -> usize {
814        self.queue.max_capacity()
815    }
816
817    /// Returns the current capacity of the channel.
818    ///
819    /// The capacity goes down when sending a value and goes up when receiving a
820    /// value.
821    /// When the capacity is zero, any subsequent sends will only resolve once
822    /// sufficient capacity is available
823    pub fn capacity(&self) -> usize {
824        self.queue.capacity()
825    }
826
827    /// Returns `true` if the channel is closed.
828    pub fn is_closed(&self) -> bool {
829        self.queue.is_closed::<COUNTED>()
830    }
831
832    /// Returns `true` if the channel is empty.
833    pub fn is_empty(&self) -> bool {
834        self.queue.len() == 0
835    }
836
837    /// Receives an element through the channel.
838    ///
839    /// # Errors
840    ///
841    /// Fails, if the channel is [empty](TryRecvError::Empty) or
842    /// [disconnected](TryRecvError::Disconnected).
843    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
844        self.queue.try_recv::<COUNTED>()
845    }
846
847    /// Polls the channel, resolving if an element was received or the channel
848    /// is closed, ignoring whether there are any remaining **Sender**(s) or
849    /// not.
850    pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
851        self.queue.poll_recv::<COUNTED>(cx)
852    }
853
854    /// Receives an element through the channel.
855    ///
856    /// # Errors
857    ///
858    /// Fails, if the channel is closed (i.e., all senders have been dropped).
859    pub async fn recv(&mut self) -> Option<T> {
860        self.queue.recv::<COUNTED>().await
861    }
862}
863
864impl<T> Drop for ReceiverRef<'_, T> {
865    fn drop(&mut self) {
866        self.queue.close::<COUNTED>();
867    }
868}
869
870impl<T> fmt::Debug for ReceiverRef<'_, T> {
871    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
872        f.debug_struct("ReceiverRef")
873            .field("capacity", &self.capacity())
874            .field("max_capacity", &self.max_capacity())
875            .field("is_closed", &self.is_closed())
876            .finish()
877    }
878}
879
880/// A borrowing permit to send one value into the channel.
881pub struct Permit<'a, T> {
882    queue: &'a BoundedQueue<T>,
883}
884
885impl<T> Permit<'_, T> {
886    /// Sends a value using the reserved capacity.
887    ///
888    /// Since the capacity has been reserved beforehand, the value is sent
889    /// immediately and the permit is consumed.
890    /// This will succeed even if the channel has been closed.
891    pub fn send(self, elem: T) {
892        self.queue.unbounded_send(elem);
893        self.queue.unreserve(true);
894        mem::forget(self);
895    }
896}
897
898impl<T> Drop for Permit<'_, T> {
899    fn drop(&mut self) {
900        self.queue.unreserve(false);
901    }
902}
903
904impl<T> fmt::Debug for Permit<'_, T> {
905    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
906        f.debug_struct("Permit").finish_non_exhaustive()
907    }
908}
909
910/// An owned permit to send one value into the channel.
911pub struct OwnedPermit<T> {
912    sender: Option<Sender<T>>,
913}
914
915impl<T> OwnedPermit<T> {
916    /// Sends a value using the reserved capacity.
917    ///
918    /// Since the capacity has been reserved beforehand, the value is sent
919    /// immediately and the permit is consumed.
920    /// This will succeed even if the channel has been closed.
921    ///
922    /// Unlike [`Permit::send`], this method returns the [`Sender`] from which
923    /// the [`OwnedPermit`] was reserved.
924    pub fn send(mut self, elem: T) -> Sender<T> {
925        let sender = self.sender.take().unwrap_or_else(|| unreachable!());
926        sender.queue.unbounded_send(elem);
927        sender.queue.unreserve(true);
928        sender
929    }
930}
931
932impl<T> Drop for OwnedPermit<T> {
933    fn drop(&mut self) {
934        if let Some(sender) = self.sender.take() {
935            sender.queue.unreserve(false);
936        }
937    }
938}
939
940impl<T> fmt::Debug for OwnedPermit<T> {
941    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
942        f.debug_struct("OwnedPermit").finish_non_exhaustive()
943    }
944}
945
946#[cfg(test)]
947mod tests {
948    use core::{future::Future as _, task::Poll};
949
950    use futures_lite::future;
951
952    use crate::queue::RecvFuture;
953
954    #[test]
955    #[should_panic]
956    fn channel_panic() {
957        let _ = super::channel::<i32>(0);
958    }
959
960    #[test]
961    fn recv_split() {
962        future::block_on(async {
963            let mut chan = super::channel::<i32>(4);
964            let (tx, mut rx) = chan.split();
965            assert_eq!(tx.capacity(), 4);
966
967            for i in 0..4 {
968                assert!(tx.send(i).await.is_ok());
969                assert_eq!(tx.capacity(), 4 - i as usize - 1);
970            }
971
972            assert_eq!(rx.recv().await, Some(0));
973            assert_eq!(tx.capacity(), 1);
974            assert_eq!(rx.recv().await, Some(1));
975            assert_eq!(tx.capacity(), 2);
976            assert_eq!(rx.recv().await, Some(2));
977            assert_eq!(tx.capacity(), 3);
978            assert_eq!(rx.recv().await, Some(3));
979            assert_eq!(tx.capacity(), 4);
980
981            assert!(rx.try_recv().is_err());
982            drop(rx);
983
984            assert!(tx.send(0).await.is_err());
985        });
986    }
987
988    #[test]
989    fn poll_often() {
990        future::block_on(async {
991            let mut chan = super::channel::<i32>(4);
992            let (tx, rx) = chan.split();
993
994            for i in 0..4 {
995                assert!(tx.send(i).await.is_ok());
996            }
997
998            let queue = &rx.queue.0;
999            let fut = RecvFuture::<'_, _, _, true> { queue };
1000            futures_lite::pin!(fut);
1001
1002            assert_eq!((&mut fut).await, Some(0));
1003            assert_eq!((&mut fut).await, Some(1));
1004            assert_eq!((&mut fut).await, Some(2));
1005            assert_eq!((&mut fut).await, Some(3));
1006        });
1007    }
1008
    // Dropping a receive future that has been polled, but has not resolved,
    // must not lose the value that was sent in the meantime.
    #[test]
    fn cancel_recv() {
        future::block_on(async {
            let mut chan = super::channel::<i32>(1);
            let (tx, mut rx) = chan.split();
            assert_eq!(tx.capacity(), 1);

            // poll the receive future once while the channel is still empty,
            // it has to be pending
            let mut r1 = Box::pin(rx.recv());
            core::future::poll_fn(|cx| {
                assert!(r1.as_mut().poll(cx).is_pending());
                Poll::Ready(())
            })
            .await;

            // send a value that would allow the r1 future to resolve, but
            // drop r1 instead of polling it again
            tx.send(0).await.unwrap();
            assert_eq!(tx.capacity(), 0);
            drop(r1);
            // the value is still queued: the capacity is unchanged and a
            // fresh receive yields it
            assert_eq!(tx.capacity(), 0);
            assert_eq!(rx.recv().await, Some(0));
        });
    }
1031
    // Dropping a registered (pending) send future must let the send future
    // registered after it complete instead.
    #[test]
    fn cancel_send() {
        future::block_on(async {
            let mut chan = super::channel::<i32>(1);
            let (tx, mut rx) = chan.split();
            assert_eq!(tx.capacity(), 1);

            // fill the queue to capacity
            tx.send(0).await.unwrap();
            assert_eq!(tx.capacity(), 0);

            // poll two further sends once each, both have to be pending
            // because the queue is full
            let mut s1 = Box::pin(tx.send(1));
            let mut s2 = Box::pin(tx.send(2));
            core::future::poll_fn(|cx| {
                assert!(s1.as_mut().poll(cx).is_pending());
                assert!(s2.as_mut().poll(cx).is_pending());
                Poll::Ready(())
            })
            .await;

            // receive the value in the channel, then drop the s1 future
            assert_eq!(rx.recv().await, Some(0));
            drop(s1);
            // the capacity is still reported as 0 while s2 is waiting
            assert_eq!(tx.capacity(), 0);
            core::future::poll_fn(|cx| {
                // s2 completes on its next poll and occupies the queue again
                assert!(s2.as_mut().poll(cx).is_ready());
                assert_eq!(tx.capacity(), 0);
                Poll::Ready(())
            })
            .await;

            // only the value of s2 arrived, the value of s1 was never sent
            assert_eq!(rx.recv().await, Some(2));
            assert_eq!(tx.capacity(), 1);
            // the channel keeps working normally afterwards
            tx.send(1).await.unwrap();
            assert_eq!(rx.recv().await, Some(1));
        });
    }
1069
    // Pending sends have to complete in the order in which they were first
    // polled, no matter in which order they are polled later on.
    #[test]
    fn poll_out_of_order() {
        future::block_on(async {
            let mut chan = super::channel::<i32>(1);
            let (tx, mut rx) = chan.split();
            assert_eq!(tx.capacity(), 1);

            // fill the queue to capacity
            tx.send(0).await.unwrap();
            assert_eq!(tx.capacity(), 0);

            let s1 = tx.send(1);
            let s2 = tx.send(2);
            futures_lite::pin!(s1, s2);

            // poll both send futures once in order to register them, both
            // should return pending
            core::future::poll_fn(|cx| {
                assert!(s1.as_mut().poll(cx).is_pending());
                assert!(s2.as_mut().poll(cx).is_pending());
                assert_eq!(tx.capacity(), 0);

                Poll::Ready(())
            })
            .await;

            // make room in the queue; the capacity is still reported as 0
            // while the send futures are waiting
            assert_eq!(rx.recv().await, Some(0));
            assert_eq!(tx.capacity(), 0);

            // polling the second send first should still return pending, even
            // though there is room in the queue, because the order has been
            // established when the futures were first registered
            core::future::poll_fn(|cx| {
                assert!(s2.as_mut().poll(cx).is_pending());
                assert_eq!(s1.as_mut().poll(cx), Poll::Ready(Ok(())));
                // the queue is full again after s1 completed
                assert!(s2.as_mut().poll(cx).is_pending());

                Poll::Ready(())
            })
            .await;

            // make room in the queue once more, so that s2 can complete
            assert_eq!(rx.recv().await, Some(1));
            assert!(s2.await.is_ok());
            assert_eq!(rx.recv().await, Some(2));
        });
    }
1118
    // If the first of two registered sends is dropped, the second one has to
    // be able to complete as soon as there is room in the queue.
    #[test]
    fn poll_out_of_order_drop() {
        future::block_on(async {
            let mut chan = super::channel::<i32>(1);
            let (tx, mut rx) = chan.split();

            // fill the queue to capacity
            tx.send(0).await.unwrap();

            let mut s1 = Box::pin(tx.send(1));
            let mut s2 = Box::pin(tx.send(2));

            // poll both send futures once in order to register them, both
            // should return pending
            core::future::poll_fn(|cx| {
                assert!(s1.as_mut().poll(cx).is_pending());
                assert!(s2.as_mut().poll(cx).is_pending());

                Poll::Ready(())
            })
            .await;

            // make room in the queue by receiving once
            assert_eq!(rx.recv().await, Some(0));
            // drop the first send before polling again, the second one should
            // still be able to proceed
            drop(s1);

            core::future::poll_fn(|cx| {
                assert_eq!(s2.as_mut().poll(cx), Poll::Ready(Ok(())));
                Poll::Ready(())
            })
            .await;

            // the value of the second send is what ended up in the queue
            assert_eq!(rx.try_recv(), Ok(2));
        });
    }
1156
1157    #[test]
1158    fn full() {
1159        let chan = super::channel::<i32>(1);
1160
1161        assert!(chan.try_send(0).is_ok());
1162        assert!(chan.try_send(1).is_err());
1163
1164        assert_eq!(chan.try_recv(), Ok(0));
1165        assert!(chan.try_send(1).is_ok());
1166        assert_eq!(chan.try_recv(), Ok(1));
1167    }
1168
    // A permit that was reserved before the channel got closed can still be
    // used to send, and the receiver only sees the end of the channel after
    // that value has been received.
    #[test]
    fn reserve_and_close() {
        future::block_on(async {
            let mut chan = super::channel::<i32>(1);
            let (tx, mut rx) = chan.split();

            // the permit takes up the only available capacity
            let permit = tx.reserve().await.unwrap();
            assert_eq!(tx.capacity(), 0);
            assert_eq!(tx.max_capacity(), 1);

            // after closing, no new permits can be reserved
            rx.close();
            assert!(tx.reserve().await.is_err());

            // the receiver is pending while the permit is outstanding
            core::future::poll_fn(|cx| {
                assert!(rx.poll_recv(cx).is_pending());
                Poll::Ready(())
            })
            .await;

            // a regular send fails on the closed channel, sending through the
            // previously reserved permit does not
            assert!(tx.send(1).await.is_err());
            permit.send(1);
            assert_eq!(tx.capacity(), 0);

            // the permit's value is delivered, then the channel ends
            assert_eq!(rx.recv().await, Some(1));
            assert_eq!(tx.capacity(), 1);
            assert_eq!(rx.recv().await, None);
        });
    }
1197
    // Dropping a permit has to pass its capacity on to a pending `reserve`,
    // and the permit obtained that way has to outlive closing the channel.
    #[test]
    fn reserve_and_cancel() {
        future::block_on(async {
            let mut chan = super::channel::<i32>(1);
            let (tx, mut rx) = chan.split();

            // the permit takes up the only available capacity
            let permit = tx.reserve().await.unwrap();
            assert_eq!(tx.capacity(), 0);
            assert_eq!(tx.max_capacity(), 1);

            // a second reservation has to wait
            let mut fut = Box::pin(tx.reserve());
            core::future::poll_fn(|cx| {
                assert!(fut.as_mut().poll(cx).is_pending());
                Poll::Ready(())
            })
            .await;

            // dropping the first permit lets the pending reservation succeed
            drop(permit);
            let permit = fut.await.unwrap();

            // after closing, no new permits can be reserved
            rx.close();
            assert!(tx.reserve().await.is_err());

            // a regular send fails on the closed channel, sending through the
            // previously reserved permit does not
            assert!(tx.send(1).await.is_err());
            permit.send(1);

            // the permit's value is delivered, then the channel ends
            assert_eq!(rx.recv().await, Some(1));
            assert_eq!(rx.recv().await, None);
        });
    }
1228
    // Dropping the only outstanding permit of a closed channel has to give
    // back its capacity and let the receiver observe the end of the channel.
    #[test]
    fn reserve_and_drop_permit() {
        future::block_on(async {
            let mut chan = super::channel::<i32>(1);
            let (tx, mut rx) = chan.split();

            // the permit takes up the only available capacity
            let permit = tx.reserve().await.unwrap();
            assert_eq!(tx.capacity(), 0);
            assert_eq!(tx.max_capacity(), 1);

            // the receiver is pending while the permit is outstanding, even
            // though the channel has been closed
            rx.close();
            core::future::poll_fn(|cx| {
                assert!(rx.poll_recv(cx).is_pending());
                Poll::Ready(())
            })
            .await;

            // without the permit, the capacity is back and nothing is left to
            // receive
            drop(permit);
            assert_eq!(tx.capacity(), 1);
            assert_eq!(rx.recv().await, None);
        });
    }
1251
1252    #[test]
1253    fn diverting_len_and_capacity() {
1254        future::block_on(async {
1255            let mut chan = super::channel(5);
1256            let (tx, mut rx) = chan.split();
1257
1258            tx.send(1).await.unwrap();
1259            let permit1 = tx.reserve().await.unwrap();
1260            assert_eq!(tx.len() + tx.capacity(), 4);
1261
1262            let permit2 = tx.reserve().await.unwrap();
1263            assert_eq!(tx.len() + tx.capacity(), 3);
1264
1265            permit1.send(2);
1266            permit2.send(3);
1267
1268            // now it is equal again
1269            assert_eq!(tx.len() + tx.capacity(), 5);
1270
1271            assert_eq!(rx.recv().await, Some(1));
1272            assert_eq!(rx.recv().await, Some(2));
1273            assert_eq!(rx.recv().await, Some(3));
1274        });
1275    }
1276
1277    #[test]
1278    fn split_after_close() {
1279        let mut chan = super::channel::<i32>(1);
1280        chan.close();
1281
1282        let (tx, rx) = chan.split();
1283        assert!(tx.is_closed());
1284        assert!(rx.is_closed());
1285    }
1286
1287    #[test]
1288    fn from_iter_less() {
1289        let chan = super::channel_from_iter(0, &[0, 1, 2, 3]);
1290        assert_eq!(chan.capacity(), 0);
1291    }
1292
1293    #[test]
1294    fn from_iter_more() {
1295        future::block_on(async {
1296            let chan = super::Channel::from_iter(5, [0, 1, 2, 3]);
1297            assert_eq!(chan.recv().await, Some(0));
1298            assert_eq!(chan.recv().await, Some(1));
1299            assert_eq!(chan.recv().await, Some(2));
1300            assert_eq!(chan.recv().await, Some(3));
1301            assert_eq!(chan.capacity(), 5);
1302        });
1303    }
1304
    // Mixes pending `send` and `reserve` futures on a full channel and checks
    // how freed capacity is handed on between them.
    #[test]
    fn send_vs_reserve() {
        future::block_on(async {
            let mut chan = super::channel::<i32>(1);
            let (tx, mut rx) = chan.split();

            // fill the queue to capacity
            assert!(tx.send(-1).await.is_ok());

            let mut f1 = Box::pin(tx.send(-2));
            let mut f2 = Box::pin(tx.send(-3));
            let mut f3 = Box::pin(tx.reserve());

            // poll all three futures once in the order f1, f2, f3; all of
            // them have to be pending because the queue is full
            core::future::poll_fn(|cx| {
                assert!(f1.as_mut().poll(cx).is_pending());
                assert!(f2.as_mut().poll(cx).is_pending());
                assert!(f3.as_mut().poll(cx).is_pending());
                Poll::Ready(())
            })
            .await;

            // the freed capacity is not reported as available
            assert_eq!(rx.recv().await, Some(-1));
            assert_eq!(tx.capacity(), 0, "capacity goes to f1");

            assert!(f1.await.is_ok());
            assert_eq!(tx.capacity(), 0);

            // NOTE(review): f2 is still alive at this point and was polled
            // before f3, so the freed capacity presumably goes to f2 first
            // and the message below should name f2; confirm
            assert_eq!(rx.recv().await, Some(-2));
            assert_eq!(tx.capacity(), 0, "capacity goes to f3");

            // f2 is dropped without ever being polled again
            drop(f2);
            assert_eq!(tx.capacity(), 0, "capacity goes to f3");

            // the reservation completes and its permit is used to send
            f3.await.unwrap().send(-4);
            assert_eq!(tx.capacity(), 0);

            // only the permit's value arrives, -3 was never sent
            assert_eq!(rx.recv().await, Some(-4));
            assert_eq!(tx.capacity(), 1);
        });
    }
1344}