async_unsync/bounded.rs
1//! A **bounded** MPSC channel implementation.
2
3use core::{
4 fmt, mem,
5 task::{Context, Poll},
6};
7
8use crate::{
9 alloc::{collections::VecDeque, rc::Rc},
10 error::{SendError, TryRecvError, TrySendError},
11 mask::{COUNTED, UNCOUNTED},
12 queue::BoundedQueue,
13};
14
15/// Creates a new bounded channel with the given `capacity`.
16///
17/// # Panics
18///
19/// Panics, if `capacity` is zero.
20pub const fn channel<T>(capacity: usize) -> Channel<T> {
21 assert!(capacity > 0, "channel capacity must be at least 1");
22 Channel { queue: BoundedQueue::new(capacity) }
23}
24
25/// Returns a new bounded channel with pre-queued items.
26///
27/// The channel's (total) capacity will be the maximum of `minimum_capacity` and
28/// the number of items returned by `iter`.
29/// Its initial available capacity will be the difference between its total
30/// capacity and the number of pre-queued items.
31pub fn channel_from_iter<T>(min_capacity: usize, iter: impl IntoIterator<Item = T>) -> Channel<T> {
32 Channel::from_iter(min_capacity, iter)
33}
34
35/// An unsynchronized (`!Sync`), asynchronous and bounded channel.
36///
37/// Unlike [unbounded](crate::unbounded) channels, these are created with a
38/// constant [maximum capacity](Channel::max_capacity), up to which values can
39/// be send to the channel.
40/// Any further sends will have to wait (block), until capacity is restored by
41/// [receiving](Channel::recv) already stored values.
42pub struct Channel<T> {
43 queue: BoundedQueue<T>,
44}
45
46impl<T> Channel<T> {
47 /// Returns a new bounded channel with pre-allocated initial capacity.
48 ///
49 /// # Panics
50 ///
51 /// Panics, if `capacity` is zero.
52 pub fn with_initial_capacity(capacity: usize, initial: usize) -> Self {
53 Self { queue: BoundedQueue::with_capacity(capacity, initial) }
54 }
55
56 /// Returns a new bounded channel with a given capacity and pre-queued
57 /// elements.
58 ///
59 /// The initial capacity will be the difference between `capacity` and the
60 /// number of elements returned by the [`Iterator`].
61 /// The total channel capacity will be the maximum of `capacity` and the
62 /// iterator's length.
63 ///
64 /// # Panics
65 ///
66 /// Panics, if `capacity` is zero.
67 pub fn from_iter(capacity: usize, iter: impl IntoIterator<Item = T>) -> Self {
68 Self { queue: BoundedQueue::from_iter(capacity, iter) }
69 }
70
71 /// Splits the channel into borrowing [`SenderRef`] and [`ReceiverRef`]
72 /// handles.
73 ///
74 /// # Examples
75 ///
76 /// ```
77 /// use async_unsync::bounded;
78 ///
79 /// # async fn example_bounded_split() {
80 /// // must use a non-temporary binding for the channel
81 /// let mut chan = bounded::channel(1);
82 /// let (tx, mut rx) = chan.split();
83 /// tx.send(1).await.unwrap();
84 ///
85 /// // dropping the handles will close the channel
86 /// drop((tx, rx));
87 /// assert!(chan.is_closed());
88 ///
89 /// // ...but the queued value can still be received
90 /// assert_eq!(chan.try_recv(), Ok(1));
91 /// assert!(chan.try_recv().is_err());
92 /// # }
93 /// ```
94 pub fn split(&mut self) -> (SenderRef<'_, T>, ReceiverRef<'_, T>) {
95 self.queue.0.get_mut().set_counted();
96 (SenderRef { queue: &self.queue }, ReceiverRef { queue: &self.queue })
97 }
98
99 /// Consumes and splits the channel into owning [`Sender`] and [`Receiver`]
100 /// handles.
101 ///
102 /// This requires one additional allocation over
103 /// [`split`](Channel::split), but avoids potential lifetime
104 /// restrictions, since the returned handles are valid for the `'static`
105 /// lifetime, meaning they can be used in spawned (local) tasks.
106 pub fn into_split(mut self) -> (Sender<T>, Receiver<T>) {
107 self.queue.0.get_mut().set_counted();
108 let queue = Rc::new(self.queue);
109 (Sender { queue: Rc::clone(&queue) }, Receiver { queue })
110 }
111
112 /// Converts into the underlying [`VecDeque`] container.
113 pub fn into_deque(self) -> VecDeque<T> {
114 self.queue.into_deque()
115 }
116
117 /// Returns the number of queued elements.
118 ///
119 /// This number *may* diverge from the channel's reported
120 /// [capacity](Channel::capacity).
121 /// This will occur, when capacity is decreased by [reserving](Channel::reserve)
122 /// it without using it right away.
123 pub fn len(&self) -> usize {
124 self.queue.len()
125 }
126
127 /// Returns the maximum buffer capacity of the channel.
128 ///
129 /// This is the capacity initially specified when [creating](channel) the
130 /// channel and remains constant.
131 pub fn max_capacity(&self) -> usize {
132 self.queue.max_capacity()
133 }
134
135 /// Returns the current capacity of the channel.
136 ///
137 /// The capacity goes down when sending a value and goes up when receiving a
138 /// value.
139 /// When the capacity is zero, any subsequent sends will only resolve once
140 /// sufficient capacity is available
141 pub fn capacity(&self) -> usize {
142 self.queue.capacity()
143 }
144
145 /// Closes the channel, ensuring that all subsequent sends will fail.
146 ///
147 /// # Examples
148 ///
149 /// ```
150 /// use async_unsync::{bounded, TrySendError};
151 ///
152 /// let chan = bounded::channel(1);
153 /// chan.close();
154 /// assert_eq!(chan.try_send(1), Err(TrySendError::Closed(1)));
155 /// ```
156 pub fn close(&self) {
157 self.queue.close::<UNCOUNTED>();
158 }
159
160 /// Returns `true` if the channel is closed.
161 pub fn is_closed(&self) -> bool {
162 self.queue.is_closed::<UNCOUNTED>()
163 }
164
165 /// Returns `true` if the channel is empty.
166 pub fn is_empty(&self) -> bool {
167 self.queue.len() == 0
168 }
169
170 /// Receives an element through the channel.
171 ///
172 /// # Errors
173 ///
174 /// Fails, if the channel is [empty](TryRecvError::Empty) or
175 /// [disconnected](TryRecvError::Disconnected).
176 pub fn try_recv(&self) -> Result<T, TryRecvError> {
177 self.queue.try_recv::<UNCOUNTED>()
178 }
179
180 /// Polls the channel, resolving if an element was received or the channel
181 /// is closed but ignoring whether there are any remaining **Sender**(s) or
182 /// not.
183 ///
184 /// # Panics
185 ///
186 /// This may panic, if there are is more than one concurrent poll for
187 /// receiving (i.e. either directly through `poll_recv` or by the future
188 /// returned by `recv`) an element.
189 /// In order to avoid this, there should be only one logical receiver per
190 /// each channel.
191 pub fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Option<T>> {
192 self.queue.poll_recv::<UNCOUNTED>(cx)
193 }
194
195 /// Receives an element through the channel.
196 ///
197 /// # Errors
198 ///
199 /// Fails, if the channel is closed (i.e., all senders have been dropped).
200 pub async fn recv(&self) -> Option<T> {
201 self.queue.recv::<UNCOUNTED>().await
202 }
203
204 /// Sends a value through the channel if there is sufficient capacity.
205 ///
206 /// # Errors
207 ///
208 /// Fails, if the queue is closed or there is no available capacity.
209 pub fn try_send(&self, elem: T) -> Result<(), TrySendError<T>> {
210 self.queue.try_send::<UNCOUNTED>(elem)
211 }
212
213 /// Sends a value, potentially waiting until there is capacity.
214 ///
215 /// # Errors
216 ///
217 /// Fails, if the queue is closed.
218 pub async fn send(&self, elem: T) -> Result<(), SendError<T>> {
219 self.queue.send::<UNCOUNTED>(elem).await
220 }
221
222 /// Attempts to reserve a slot in the channel without blocking, if none are
223 /// available.
224 ///
225 /// The returned [`Permit`] can be used to immediately send a value to the
226 /// channel at a later point.
227 /// Dropping the permit without sending a value will return the capacity to
228 /// the channel.
229 ///
230 /// # Errors
231 ///
232 /// Fails, if there are no available permits or the channel has been closed.
233 ///
234 /// # Examples
235 ///
236 /// ```
237 /// use async_unsync::bounded;
238 ///
239 /// # async fn example_try_reserve() {
240 /// let chan = bounded::channel(1);
241 ///
242 /// // reserve capacity, reducing available slots to 0
243 /// let permit = chan.try_reserve().unwrap();
244 /// assert!(chan.try_send(1).is_err());
245 /// assert!(chan.try_reserve().is_err());
246 ///
247 /// permit.send(1);
248 /// assert_eq!(chan.recv().await, Some(1));
249 /// # }
250 /// ```
251 pub fn try_reserve(&self) -> Result<Permit<'_, T>, TrySendError<()>> {
252 self.queue.try_reserve::<COUNTED>()?;
253 Ok(Permit { queue: &self.queue })
254 }
255
256 /// Attempts to reserve a slot in the channel without blocking.
257 ///
258 /// If no capacity is available in the channel, this will block until a slot
259 /// becomes available.
260 /// The returned [`Permit`] can be used to immediately send a value to the
261 /// channel at a later point.
262 /// Dropping the permit without sending a value will return the capacity to
263 /// the channel.
264 ///
265 /// # Errors
266 ///
267 /// Fails, if there are no available permits or the channel has been closed.
268 ///
269 /// # Examples
270 ///
271 /// ```
272 /// use async_unsync::bounded;
273 ///
274 /// # async fn example_try_reserve() {
275 /// let chan = bounded::channel(1);
276 ///
277 /// // reserve capacity, reducing available slots to 0
278 /// let permit = chan.reserve().await.unwrap();
279 /// assert!(chan.try_send(1).is_err());
280 /// assert!(chan.try_reserve().is_err());
281 ///
282 /// permit.send(1);
283 /// assert_eq!(chan.recv().await, Some(1));
284 /// # }
285 /// ```
286 pub async fn reserve(&self) -> Result<Permit<'_, T>, SendError<()>> {
287 self.queue.reserve::<UNCOUNTED>().await?;
288 Ok(Permit { queue: &self.queue })
289 }
290}
291
292impl<T> fmt::Debug for Channel<T> {
293 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
294 f.debug_struct("Channel")
295 .field("capacity", &self.capacity())
296 .field("max_capacity", &self.max_capacity())
297 .field("is_closed", &self.is_closed())
298 .finish()
299 }
300}
301
302/// An owned handle for sending elements through a bounded split [`Channel`].
303pub struct Sender<T> {
304 queue: Rc<BoundedQueue<T>>,
305}
306
307impl<T> Sender<T> {
308 /// Returns the number of currently queued elements.
309 pub fn len(&self) -> usize {
310 self.queue.len()
311 }
312
313 /// Returns the maximum buffer capacity of the channel.
314 ///
315 /// This is the capacity initially specified when [creating](channel) the
316 /// channel and remains constant.
317 pub fn max_capacity(&self) -> usize {
318 self.queue.max_capacity()
319 }
320
321 /// Returns the current capacity of the channel.
322 ///
323 /// The capacity goes down when sending a value and goes up when receiving a
324 /// value.
325 /// When the capacity is zero, any subsequent sends will only resolve once
326 /// sufficient capacity is available
327 pub fn capacity(&self) -> usize {
328 self.queue.capacity()
329 }
330
331 /// Returns `true` if the channel is closed.
332 pub fn is_closed(&self) -> bool {
333 self.queue.is_closed::<COUNTED>()
334 }
335
336 /// Returns `true` if the channel is empty.
337 pub fn is_empty(&self) -> bool {
338 self.queue.len() == 0
339 }
340
341 /// Returns `true` if `self` and `other` are handles for the same channel
342 /// instance.
343 pub fn same_channel(&self, other: &Self) -> bool {
344 core::ptr::eq(Rc::as_ptr(&self.queue), Rc::as_ptr(&other.queue))
345 }
346
347 /// Sends a value through the channel if there is sufficient capacity.
348 ///
349 /// # Errors
350 ///
351 /// Fails, if the queue is closed or there is no available capacity.
352 pub fn try_send(&self, elem: T) -> Result<(), TrySendError<T>> {
353 self.queue.try_send::<COUNTED>(elem)
354 }
355
356 /// Sends a value through the channel, potentially waiting until there is
357 /// sufficient capacity.
358 ///
359 /// # Errors
360 ///
361 /// Fails, if the queue is closed.
362 pub async fn send(&self, elem: T) -> Result<(), SendError<T>> {
363 self.queue.send::<COUNTED>(elem).await
364 }
365
366 /// Attempts to reserve a slot in the channel without blocking, if none are
367 /// available.
368 ///
369 /// The returned [`Permit`] can be used to immediately send a value to the
370 /// channel at a later point.
371 /// Dropping the permit without sending a value will return the capacity to
372 /// the channel.
373 ///
374 /// # Errors
375 ///
376 /// Fails, if there are no available permits or the channel has been closed.
377 ///
378 /// # Examples
379 ///
380 /// ```
381 /// use async_unsync::bounded;
382 ///
383 /// # async fn example_try_reserve() {
384 /// let (tx, mut rx) = bounded::channel(1).into_split();
385 ///
386 /// // reserve capacity, reducing available slots to 0
387 /// let permit = tx.try_reserve().unwrap();
388 /// assert!(tx.try_send(1).is_err());
389 /// assert!(tx.try_reserve().is_err());
390 ///
391 /// permit.send(1);
392 /// assert_eq!(rx.recv().await, Some(1));
393 /// # }
394 /// ```
395 pub fn try_reserve(&self) -> Result<Permit<'_, T>, TrySendError<()>> {
396 self.queue.try_reserve::<COUNTED>()?;
397 Ok(Permit { queue: &self.queue })
398 }
399
400 /// Attempts to reserve a slot in the channel without blocking, if none are
401 /// available.
402 ///
403 /// This moves the sender *by value* and returns an *owned* permit that can
404 /// be used to immediately send a value to the channel at a later point.
405 /// Dropping the permit without sending a value will return the capacity to
406 /// the channel.
407 ///
408 /// # Errors
409 ///
410 /// Fails, if there are no available permits or the channel has been closed.
411 ///
412 /// # Examples
413 ///
414 /// ```
415 /// use async_unsync::bounded;
416 ///
417 /// # async fn example_try_reserve() {
418 /// let (tx, mut rx) = bounded::channel(2).into_split();
419 ///
420 /// // cloning senders is cheap, so arbitrary numbers of owned permits are
421 /// // easily created
422 /// let p1 = tx.clone().try_reserve_owned().unwrap();
423 /// let p2 = tx.clone().try_reserve_owned().unwrap();
424 ///
425 /// assert!(tx.try_send(1).is_err());
426 /// assert!(tx.try_reserve().is_err());
427 /// drop(tx);
428 ///
429 /// let _ = p2.send(1);
430 /// let _ = p1.send(2);
431 ///
432 /// assert_eq!(rx.recv().await, Some(1));
433 /// assert_eq!(rx.recv().await, Some(2));
434 /// assert_eq!(rx.recv().await, None);
435 /// # }
436 /// ```
437 pub fn try_reserve_owned(self) -> Result<OwnedPermit<T>, TrySendError<Self>> {
438 if let Err(err) = self.queue.try_reserve::<COUNTED>() {
439 return Err(err.set(self));
440 }
441
442 Ok(OwnedPermit { sender: Some(self) })
443 }
444
445 /// Attempts to reserve a slot in the channel without blocking.
446 ///
447 /// If no capacity is available in the channel, this will block until a slot
448 /// becomes available.
449 /// The returned [`Permit`] can be used to immediately send a value to the
450 /// channel at a later point.
451 /// Dropping the permit without sending a value will return the capacity to
452 /// the channel.
453 ///
454 /// # Errors
455 ///
456 /// Fails, if there are no available permits or the channel has been closed.
457 ///
458 /// # Examples
459 ///
460 /// ```
461 /// use async_unsync::bounded;
462 ///
463 /// # async fn example_try_reserve() {
464 /// let (tx, mut rx) = bounded::channel(1).into_split();
465 ///
466 /// // reserve capacity, reducing available slots to 0
467 /// let permit = tx.reserve().await.unwrap();
468 /// assert!(tx.try_send(1).is_err());
469 /// assert!(tx.try_reserve().is_err());
470 ///
471 /// permit.send(1);
472 /// assert_eq!(rx.recv().await, Some(1));
473 /// # }
474 /// ```
475 pub async fn reserve(&self) -> Result<Permit<'_, T>, SendError<()>> {
476 self.queue.reserve::<COUNTED>().await?;
477 Ok(Permit { queue: &self.queue })
478 }
479
480 /// Attempts to reserve a slot in the channel without blocking.
481 ///
482 /// If no capacity is available in the channel, this will block until a slot
483 /// becomes available.
484 /// This moves the sender *by value* and returns an *owned* permit that can
485 /// be used to immediately send a value to the channel at a later point.
486 /// Dropping the permit without sending a value will return the capacity to
487 /// the channel.
488 ///
489 /// # Errors
490 ///
491 /// Fails, if there are no available permits or the channel has been closed.
492 ///
493 /// # Examples
494 ///
495 /// ```
496 /// use async_unsync::bounded;
497 ///
498 /// # async fn example_try_reserve() {
499 /// let (tx, mut rx) = bounded::channel(1).into_split();
500 ///
501 /// // reserve capacity, reducing available slots to 0
502 /// let permit = tx.clone().reserve_owned().await.unwrap();
503 /// assert!(tx.try_send(1).is_err());
504 /// assert!(tx.try_reserve().is_err());
505 ///
506 /// permit.send(1);
507 /// assert_eq!(rx.recv().await, Some(1));
508 /// # }
509 /// ```
510 pub async fn reserve_owned(self) -> Result<OwnedPermit<T>, SendError<Self>> {
511 if self.queue.reserve::<COUNTED>().await.is_err() {
512 return Err(SendError(self));
513 }
514
515 Ok(OwnedPermit { sender: Some(self) })
516 }
517}
518
519impl<T> Clone for Sender<T> {
520 fn clone(&self) -> Self {
521 // SAFETY: no mutable or aliased access to queue possible
522 unsafe { (*self.queue.0.get()).mask.increase_sender_count() };
523 Self { queue: Rc::clone(&self.queue) }
524 }
525}
526
527impl<T> Drop for Sender<T> {
528 fn drop(&mut self) {
529 // SAFETY: no mutable or aliased access to queue possible
530 unsafe { (*self.queue.0.get()).decrease_sender_count() };
531 }
532}
533
534impl<T> fmt::Debug for Sender<T> {
535 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
536 f.debug_struct("Sender")
537 .field("capacity", &self.capacity())
538 .field("max_capacity", &self.max_capacity())
539 .field("is_closed", &self.is_closed())
540 .finish()
541 }
542}
543
544/// A borrowing handle for sending elements through a bounded split [`Channel`].
545pub struct SenderRef<'a, T> {
546 queue: &'a BoundedQueue<T>,
547}
548
549impl<T> SenderRef<'_, T> {
550 /// Returns the number of queued elements.
551 pub fn len(&self) -> usize {
552 self.queue.len()
553 }
554
555 /// Returns the maximum buffer capacity of the channel.
556 ///
557 /// This is the capacity initially specified when [creating](channel) the
558 /// channel and remains constant.
559 pub fn max_capacity(&self) -> usize {
560 self.queue.max_capacity()
561 }
562
563 /// Returns the current capacity of the channel.
564 ///
565 /// The capacity goes down when sending a value and goes up when receiving a
566 /// value.
567 /// When the capacity is zero, any subsequent sends will only resolve once
568 /// sufficient capacity is available
569 pub fn capacity(&self) -> usize {
570 self.queue.capacity()
571 }
572
573 /// Returns `true` if the channel is closed.
574 pub fn is_closed(&self) -> bool {
575 self.queue.is_closed::<COUNTED>()
576 }
577
578 /// Returns `true` if the channel is empty.
579 pub fn is_empty(&self) -> bool {
580 self.queue.len() == 0
581 }
582
583 /// Returns `true` if `self` and `other` are handles for the same channel
584 /// instance.
585 pub fn same_channel(&self, other: &Self) -> bool {
586 core::ptr::eq(&self.queue, &other.queue)
587 }
588
589 /// Sends a value through the channel if there is sufficient capacity.
590 ///
591 /// # Errors
592 ///
593 /// Fails, if the queue is closed or there is no available capacity.
594 pub fn try_send(&self, elem: T) -> Result<(), TrySendError<T>> {
595 self.queue.try_send::<COUNTED>(elem)
596 }
597
598 /// Sends a value through the channel, potentially blocking until there is
599 /// sufficient capacity.
600 ///
601 /// # Errors
602 ///
603 /// Fails, if the queue is closed.
604 pub async fn send(&self, elem: T) -> Result<(), SendError<T>> {
605 self.queue.send::<COUNTED>(elem).await
606 }
607
608 /// Attempts to reserve a slot in the channel without blocking, if none are
609 /// available.
610 ///
611 /// The returned [`Permit`] can be used to immediately send a value to the
612 /// channel at a later point.
613 /// Dropping the permit without sending a value will return the capacity to
614 /// the channel.
615 ///
616 /// # Errors
617 ///
618 /// Fails, if there are no available permits or the channel has been closed.
619 ///
620 /// # Examples
621 ///
622 /// ```
623 /// use async_unsync::bounded;
624 ///
625 /// # async fn example_try_reserve() {
626 /// let mut chan = bounded::channel(1);
627 /// let (tx, mut rx) = chan.split();
628 ///
629 /// // reserve capacity, reducing available slots to 0
630 /// let permit = tx.try_reserve().unwrap();
631 /// assert!(tx.try_send(1).is_err());
632 /// assert!(tx.try_reserve().is_err());
633 ///
634 /// permit.send(1);
635 /// assert_eq!(rx.recv().await, Some(1));
636 /// # }
637 /// ```
638 pub fn try_reserve(&self) -> Result<Permit<'_, T>, TrySendError<()>> {
639 self.queue.try_reserve::<COUNTED>()?;
640 Ok(Permit { queue: self.queue })
641 }
642
643 /// Attempts to reserve a slot in the channel without blocking.
644 ///
645 /// If no capacity is available in the channel, this will block until a slot
646 /// becomes available.
647 /// The returned [`Permit`] can be used to immediately send a value to the
648 /// channel at a later point.
649 /// Dropping the permit without sending a value will return the capacity to
650 /// the channel.
651 ///
652 /// # Errors
653 ///
654 /// Fails, if there are no available permits or the channel has been closed.
655 ///
656 /// # Examples
657 ///
658 /// ```
659 /// use async_unsync::bounded;
660 ///
661 /// # async fn example_try_reserve() {
662 /// let mut chan = bounded::channel(1);
663 /// let (tx, mut rx) = chan.split();
664 ///
665 /// // reserve capacity, reducing available slots to 0
666 /// let permit = tx.reserve().await.unwrap();
667 /// assert!(tx.try_send(1).is_err());
668 /// assert!(tx.try_reserve().is_err());
669 ///
670 /// permit.send(1);
671 /// assert_eq!(rx.recv().await, Some(1));
672 /// # }
673 /// ```
674 pub async fn reserve(&self) -> Result<Permit<'_, T>, SendError<()>> {
675 self.queue.reserve::<COUNTED>().await?;
676 Ok(Permit { queue: self.queue })
677 }
678}
679
680impl<T> Clone for SenderRef<'_, T> {
681 fn clone(&self) -> Self {
682 // SAFETY: no mutable or aliased access to queue possible
683 unsafe { (*self.queue.0.get()).mask.increase_sender_count() };
684 Self { queue: self.queue }
685 }
686}
687
688impl<T> Drop for SenderRef<'_, T> {
689 fn drop(&mut self) {
690 // SAFETY: no mutable or aliased access to queue possible
691 unsafe { (*self.queue.0.get()).decrease_sender_count() };
692 }
693}
694
695impl<T> fmt::Debug for SenderRef<'_, T> {
696 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
697 f.debug_struct("SenderRef")
698 .field("capacity", &self.capacity())
699 .field("max_capacity", &self.max_capacity())
700 .field("is_closed", &self.is_closed())
701 .finish()
702 }
703}
704
705/// An owning handle for receiving elements through a split bounded [`Channel`].
706pub struct Receiver<T> {
707 queue: Rc<BoundedQueue<T>>,
708}
709
710impl<T> Receiver<T> {
711 /// Closes the channel, ensuring that all subsequent sends will fail.
712 pub fn close(&mut self) {
713 self.queue.close::<COUNTED>();
714 }
715
716 /// Returns the number of queued elements.
717 pub fn len(&self) -> usize {
718 self.queue.len()
719 }
720
721 /// Returns the maximum buffer capacity of the channel.
722 ///
723 /// This is the capacity initially specified when [creating](channel) the
724 /// channel and remains constant.
725 pub fn max_capacity(&self) -> usize {
726 self.queue.max_capacity()
727 }
728
729 /// Returns the current capacity of the channel.
730 ///
731 /// The capacity goes down when sending a value and goes up when receiving a
732 /// value.
733 /// When the capacity is zero, any subsequent sends will only resolve once
734 /// sufficient capacity is available
735 pub fn capacity(&self) -> usize {
736 self.queue.capacity()
737 }
738
739 /// Returns `true` if the channel is closed.
740 pub fn is_closed(&self) -> bool {
741 self.queue.is_closed::<COUNTED>()
742 }
743
744 /// Returns `true` if the channel is empty.
745 pub fn is_empty(&self) -> bool {
746 self.queue.len() == 0
747 }
748
749 /// Attempts to receive an element through the channel.
750 ///
751 /// # Errors
752 ///
753 /// Fails, if the channel is [empty](TryRecvError::Empty) or
754 /// [disconnected](TryRecvError::Disconnected).
755 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
756 self.queue.try_recv::<COUNTED>()
757 }
758
759 /// Polls the channel, resolving if an element was received or the channel
760 /// is closed but ignoring whether there are any remaining **Sender**(s) or
761 /// not.
762 pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
763 self.queue.poll_recv::<COUNTED>(cx)
764 }
765
766 /// Receives an element through the channel.
767 ///
768 /// # Errors
769 ///
770 /// Fails, if the channel is [empty](TryRecvError::Empty) or
771 /// [disconnected](TryRecvError::Disconnected).
772 pub async fn recv(&mut self) -> Option<T> {
773 self.queue.recv::<COUNTED>().await
774 }
775}
776
777impl<T> Drop for Receiver<T> {
778 fn drop(&mut self) {
779 self.queue.close::<COUNTED>();
780 }
781}
782
783impl<T> fmt::Debug for Receiver<T> {
784 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
785 f.debug_struct("Receiver")
786 .field("capacity", &self.capacity())
787 .field("max_capacity", &self.max_capacity())
788 .field("is_closed", &self.is_closed())
789 .finish()
790 }
791}
792
793/// A borrowing handle for receiving elements through a split bounded [`Channel`].
794pub struct ReceiverRef<'a, T> {
795 queue: &'a BoundedQueue<T>,
796}
797
798impl<T> ReceiverRef<'_, T> {
799 /// Closes the channel, ensuring that all subsequent sends will fail.
800 pub fn close(&mut self) {
801 self.queue.close::<COUNTED>();
802 }
803
804 /// Returns the number of queued elements.
805 pub fn len(&self) -> usize {
806 self.queue.len()
807 }
808
809 /// Returns the maximum buffer capacity of the channel.
810 ///
811 /// This is the capacity initially specified when [creating](channel) the
812 /// channel and remains constant.
813 pub fn max_capacity(&self) -> usize {
814 self.queue.max_capacity()
815 }
816
817 /// Returns the current capacity of the channel.
818 ///
819 /// The capacity goes down when sending a value and goes up when receiving a
820 /// value.
821 /// When the capacity is zero, any subsequent sends will only resolve once
822 /// sufficient capacity is available
823 pub fn capacity(&self) -> usize {
824 self.queue.capacity()
825 }
826
827 /// Returns `true` if the channel is closed.
828 pub fn is_closed(&self) -> bool {
829 self.queue.is_closed::<COUNTED>()
830 }
831
832 /// Returns `true` if the channel is empty.
833 pub fn is_empty(&self) -> bool {
834 self.queue.len() == 0
835 }
836
837 /// Receives an element through the channel.
838 ///
839 /// # Errors
840 ///
841 /// Fails, if the channel is [empty](TryRecvError::Empty) or
842 /// [disconnected](TryRecvError::Disconnected).
843 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
844 self.queue.try_recv::<COUNTED>()
845 }
846
847 /// Polls the channel, resolving if an element was received or the channel
848 /// is closed, ignoring whether there are any remaining **Sender**(s) or
849 /// not.
850 pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
851 self.queue.poll_recv::<COUNTED>(cx)
852 }
853
854 /// Receives an element through the channel.
855 ///
856 /// # Errors
857 ///
858 /// Fails, if the channel is closed (i.e., all senders have been dropped).
859 pub async fn recv(&mut self) -> Option<T> {
860 self.queue.recv::<COUNTED>().await
861 }
862}
863
864impl<T> Drop for ReceiverRef<'_, T> {
865 fn drop(&mut self) {
866 self.queue.close::<COUNTED>();
867 }
868}
869
870impl<T> fmt::Debug for ReceiverRef<'_, T> {
871 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
872 f.debug_struct("ReceiverRef")
873 .field("capacity", &self.capacity())
874 .field("max_capacity", &self.max_capacity())
875 .field("is_closed", &self.is_closed())
876 .finish()
877 }
878}
879
880/// A borrowing permit to send one value into the channel.
881pub struct Permit<'a, T> {
882 queue: &'a BoundedQueue<T>,
883}
884
885impl<T> Permit<'_, T> {
886 /// Sends a value using the reserved capacity.
887 ///
888 /// Since the capacity has been reserved beforehand, the value is sent
889 /// immediately and the permit is consumed.
890 /// This will succeed even if the channel has been closed.
891 pub fn send(self, elem: T) {
892 self.queue.unbounded_send(elem);
893 self.queue.unreserve(true);
894 mem::forget(self);
895 }
896}
897
898impl<T> Drop for Permit<'_, T> {
899 fn drop(&mut self) {
900 self.queue.unreserve(false);
901 }
902}
903
904impl<T> fmt::Debug for Permit<'_, T> {
905 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
906 f.debug_struct("Permit").finish_non_exhaustive()
907 }
908}
909
910/// An owned permit to send one value into the channel.
911pub struct OwnedPermit<T> {
912 sender: Option<Sender<T>>,
913}
914
915impl<T> OwnedPermit<T> {
916 /// Sends a value using the reserved capacity.
917 ///
918 /// Since the capacity has been reserved beforehand, the value is sent
919 /// immediately and the permit is consumed.
920 /// This will succeed even if the channel has been closed.
921 ///
922 /// Unlike [`Permit::send`], this method returns the [`Sender`] from which
923 /// the [`OwnedPermit`] was reserved.
924 pub fn send(mut self, elem: T) -> Sender<T> {
925 let sender = self.sender.take().unwrap_or_else(|| unreachable!());
926 sender.queue.unbounded_send(elem);
927 sender.queue.unreserve(true);
928 sender
929 }
930}
931
932impl<T> Drop for OwnedPermit<T> {
933 fn drop(&mut self) {
934 if let Some(sender) = self.sender.take() {
935 sender.queue.unreserve(false);
936 }
937 }
938}
939
940impl<T> fmt::Debug for OwnedPermit<T> {
941 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
942 f.debug_struct("OwnedPermit").finish_non_exhaustive()
943 }
944}
945
946#[cfg(test)]
947mod tests {
948 use core::{future::Future as _, task::Poll};
949
950 use futures_lite::future;
951
952 use crate::queue::RecvFuture;
953
954 #[test]
955 #[should_panic]
956 fn channel_panic() {
957 let _ = super::channel::<i32>(0);
958 }
959
960 #[test]
961 fn recv_split() {
962 future::block_on(async {
963 let mut chan = super::channel::<i32>(4);
964 let (tx, mut rx) = chan.split();
965 assert_eq!(tx.capacity(), 4);
966
967 for i in 0..4 {
968 assert!(tx.send(i).await.is_ok());
969 assert_eq!(tx.capacity(), 4 - i as usize - 1);
970 }
971
972 assert_eq!(rx.recv().await, Some(0));
973 assert_eq!(tx.capacity(), 1);
974 assert_eq!(rx.recv().await, Some(1));
975 assert_eq!(tx.capacity(), 2);
976 assert_eq!(rx.recv().await, Some(2));
977 assert_eq!(tx.capacity(), 3);
978 assert_eq!(rx.recv().await, Some(3));
979 assert_eq!(tx.capacity(), 4);
980
981 assert!(rx.try_recv().is_err());
982 drop(rx);
983
984 assert!(tx.send(0).await.is_err());
985 });
986 }
987
988 #[test]
989 fn poll_often() {
990 future::block_on(async {
991 let mut chan = super::channel::<i32>(4);
992 let (tx, rx) = chan.split();
993
994 for i in 0..4 {
995 assert!(tx.send(i).await.is_ok());
996 }
997
998 let queue = &rx.queue.0;
999 let fut = RecvFuture::<'_, _, _, true> { queue };
1000 futures_lite::pin!(fut);
1001
1002 assert_eq!((&mut fut).await, Some(0));
1003 assert_eq!((&mut fut).await, Some(1));
1004 assert_eq!((&mut fut).await, Some(2));
1005 assert_eq!((&mut fut).await, Some(3));
1006 });
1007 }
1008
1009 #[test]
1010 fn cancel_recv() {
1011 future::block_on(async {
1012 let mut chan = super::channel::<i32>(1);
1013 let (tx, mut rx) = chan.split();
1014 assert_eq!(tx.capacity(), 1);
1015
1016 let mut r1 = Box::pin(rx.recv());
1017 core::future::poll_fn(|cx| {
1018 assert!(r1.as_mut().poll(cx).is_pending());
1019 Poll::Ready(())
1020 })
1021 .await;
1022
1023 // allow the r1 future to resolve
1024 tx.send(0).await.unwrap();
1025 assert_eq!(tx.capacity(), 0);
1026 drop(r1);
1027 assert_eq!(tx.capacity(), 0);
1028 assert_eq!(rx.recv().await, Some(0));
1029 });
1030 }
1031
1032 #[test]
1033 fn cancel_send() {
1034 future::block_on(async {
1035 let mut chan = super::channel::<i32>(1);
1036 let (tx, mut rx) = chan.split();
1037 assert_eq!(tx.capacity(), 1);
1038
1039 // fill the queue to capacity
1040 tx.send(0).await.unwrap();
1041 assert_eq!(tx.capacity(), 0);
1042
1043 let mut s1 = Box::pin(tx.send(1));
1044 let mut s2 = Box::pin(tx.send(2));
1045 core::future::poll_fn(|cx| {
1046 assert!(s1.as_mut().poll(cx).is_pending());
1047 assert!(s2.as_mut().poll(cx).is_pending());
1048 Poll::Ready(())
1049 })
1050 .await;
1051
1052 // receive the value in the channel, then drop the s1 future
1053 assert_eq!(rx.recv().await, Some(0));
1054 drop(s1);
1055 assert_eq!(tx.capacity(), 0);
1056 core::future::poll_fn(|cx| {
1057 assert!(s2.as_mut().poll(cx).is_ready());
1058 assert_eq!(tx.capacity(), 0);
1059 Poll::Ready(())
1060 })
1061 .await;
1062
1063 assert_eq!(rx.recv().await, Some(2));
1064 assert_eq!(tx.capacity(), 1);
1065 tx.send(1).await.unwrap();
1066 assert_eq!(rx.recv().await, Some(1));
1067 });
1068 }
1069
1070 #[test]
1071 fn poll_out_of_order() {
1072 future::block_on(async {
1073 let mut chan = super::channel::<i32>(1);
1074 let (tx, mut rx) = chan.split();
1075 assert_eq!(tx.capacity(), 1);
1076
1077 // fill the queue to capacity
1078 tx.send(0).await.unwrap();
1079 assert_eq!(tx.capacity(), 0);
1080
1081 let s1 = tx.send(1);
1082 let s2 = tx.send(2);
1083 futures_lite::pin!(s1, s2);
1084
1085 // poll both send futures once in order to register them, both
1086 // should return pending
1087 core::future::poll_fn(|cx| {
1088 assert!(s1.as_mut().poll(cx).is_pending());
1089 assert!(s2.as_mut().poll(cx).is_pending());
1090 assert_eq!(tx.capacity(), 0);
1091
1092 Poll::Ready(())
1093 })
1094 .await;
1095
1096 // make room in the queue
1097 assert_eq!(rx.recv().await, Some(0));
1098 assert_eq!(tx.capacity(), 0);
1099
1100 // polling the second send first should still return pending, even
1101 // though there is room in the queue, because the order has been
1102 // established when the futures were first registered
1103 core::future::poll_fn(|cx| {
1104 assert!(s2.as_mut().poll(cx).is_pending());
1105 assert_eq!(s1.as_mut().poll(cx), Poll::Ready(Ok(())));
1106 assert!(s2.as_mut().poll(cx).is_pending());
1107
1108 Poll::Ready(())
1109 })
1110 .await;
1111
1112 // make room in the queue
1113 assert_eq!(rx.recv().await, Some(1));
1114 assert!(s2.await.is_ok());
1115 assert_eq!(rx.recv().await, Some(2));
1116 });
1117 }
1118
1119 #[test]
1120 fn poll_out_of_order_drop() {
1121 future::block_on(async {
1122 let mut chan = super::channel::<i32>(1);
1123 let (tx, mut rx) = chan.split();
1124
1125 // fill the queue to capacity
1126 tx.send(0).await.unwrap();
1127
1128 let mut s1 = Box::pin(tx.send(1));
1129 let mut s2 = Box::pin(tx.send(2));
1130
1131 // poll both send futures once in order to register them, both
1132 // should return pending
1133 core::future::poll_fn(|cx| {
1134 assert!(s1.as_mut().poll(cx).is_pending());
1135 assert!(s2.as_mut().poll(cx).is_pending());
1136
1137 Poll::Ready(())
1138 })
1139 .await;
1140
1141 // make room in the queue by recving once
1142 assert_eq!(rx.recv().await, Some(0));
1143 // drop the first send before polling again, the second one should
1144 // still be able to proceed
1145 drop(s1);
1146
1147 core::future::poll_fn(|cx| {
1148 assert_eq!(s2.as_mut().poll(cx), Poll::Ready(Ok(())));
1149 Poll::Ready(())
1150 })
1151 .await;
1152
1153 assert_eq!(rx.try_recv(), Ok(2));
1154 });
1155 }
1156
1157 #[test]
1158 fn full() {
1159 let chan = super::channel::<i32>(1);
1160
1161 assert!(chan.try_send(0).is_ok());
1162 assert!(chan.try_send(1).is_err());
1163
1164 assert_eq!(chan.try_recv(), Ok(0));
1165 assert!(chan.try_send(1).is_ok());
1166 assert_eq!(chan.try_recv(), Ok(1));
1167 }
1168
1169 #[test]
1170 fn reserve_and_close() {
1171 future::block_on(async {
1172 let mut chan = super::channel::<i32>(1);
1173 let (tx, mut rx) = chan.split();
1174
1175 let permit = tx.reserve().await.unwrap();
1176 assert_eq!(tx.capacity(), 0);
1177 assert_eq!(tx.max_capacity(), 1);
1178
1179 rx.close();
1180 assert!(tx.reserve().await.is_err());
1181
1182 core::future::poll_fn(|cx| {
1183 assert!(rx.poll_recv(cx).is_pending());
1184 Poll::Ready(())
1185 })
1186 .await;
1187
1188 assert!(tx.send(1).await.is_err());
1189 permit.send(1);
1190 assert_eq!(tx.capacity(), 0);
1191
1192 assert_eq!(rx.recv().await, Some(1));
1193 assert_eq!(tx.capacity(), 1);
1194 assert_eq!(rx.recv().await, None);
1195 });
1196 }
1197
1198 #[test]
1199 fn reserve_and_cancel() {
1200 future::block_on(async {
1201 let mut chan = super::channel::<i32>(1);
1202 let (tx, mut rx) = chan.split();
1203
1204 let permit = tx.reserve().await.unwrap();
1205 assert_eq!(tx.capacity(), 0);
1206 assert_eq!(tx.max_capacity(), 1);
1207
1208 let mut fut = Box::pin(tx.reserve());
1209 core::future::poll_fn(|cx| {
1210 assert!(fut.as_mut().poll(cx).is_pending());
1211 Poll::Ready(())
1212 })
1213 .await;
1214
1215 drop(permit);
1216 let permit = fut.await.unwrap();
1217
1218 rx.close();
1219 assert!(tx.reserve().await.is_err());
1220
1221 assert!(tx.send(1).await.is_err());
1222 permit.send(1);
1223
1224 assert_eq!(rx.recv().await, Some(1));
1225 assert_eq!(rx.recv().await, None);
1226 });
1227 }
1228
1229 #[test]
1230 fn reserve_and_drop_permit() {
1231 future::block_on(async {
1232 let mut chan = super::channel::<i32>(1);
1233 let (tx, mut rx) = chan.split();
1234
1235 let permit = tx.reserve().await.unwrap();
1236 assert_eq!(tx.capacity(), 0);
1237 assert_eq!(tx.max_capacity(), 1);
1238
1239 rx.close();
1240 core::future::poll_fn(|cx| {
1241 assert!(rx.poll_recv(cx).is_pending());
1242 Poll::Ready(())
1243 })
1244 .await;
1245
1246 drop(permit);
1247 assert_eq!(tx.capacity(), 1);
1248 assert_eq!(rx.recv().await, None);
1249 });
1250 }
1251
1252 #[test]
1253 fn diverting_len_and_capacity() {
1254 future::block_on(async {
1255 let mut chan = super::channel(5);
1256 let (tx, mut rx) = chan.split();
1257
1258 tx.send(1).await.unwrap();
1259 let permit1 = tx.reserve().await.unwrap();
1260 assert_eq!(tx.len() + tx.capacity(), 4);
1261
1262 let permit2 = tx.reserve().await.unwrap();
1263 assert_eq!(tx.len() + tx.capacity(), 3);
1264
1265 permit1.send(2);
1266 permit2.send(3);
1267
1268 // now it is equal again
1269 assert_eq!(tx.len() + tx.capacity(), 5);
1270
1271 assert_eq!(rx.recv().await, Some(1));
1272 assert_eq!(rx.recv().await, Some(2));
1273 assert_eq!(rx.recv().await, Some(3));
1274 });
1275 }
1276
1277 #[test]
1278 fn split_after_close() {
1279 let mut chan = super::channel::<i32>(1);
1280 chan.close();
1281
1282 let (tx, rx) = chan.split();
1283 assert!(tx.is_closed());
1284 assert!(rx.is_closed());
1285 }
1286
1287 #[test]
1288 fn from_iter_less() {
1289 let chan = super::channel_from_iter(0, &[0, 1, 2, 3]);
1290 assert_eq!(chan.capacity(), 0);
1291 }
1292
1293 #[test]
1294 fn from_iter_more() {
1295 future::block_on(async {
1296 let chan = super::Channel::from_iter(5, [0, 1, 2, 3]);
1297 assert_eq!(chan.recv().await, Some(0));
1298 assert_eq!(chan.recv().await, Some(1));
1299 assert_eq!(chan.recv().await, Some(2));
1300 assert_eq!(chan.recv().await, Some(3));
1301 assert_eq!(chan.capacity(), 5);
1302 });
1303 }
1304
1305 #[test]
1306 fn send_vs_reserve() {
1307 future::block_on(async {
1308 let mut chan = super::channel::<i32>(1);
1309 let (tx, mut rx) = chan.split();
1310
1311 assert!(tx.send(-1).await.is_ok());
1312
1313 let mut f1 = Box::pin(tx.send(-2));
1314 let mut f2 = Box::pin(tx.send(-3));
1315 let mut f3 = Box::pin(tx.reserve());
1316
1317 core::future::poll_fn(|cx| {
1318 assert!(f1.as_mut().poll(cx).is_pending());
1319 assert!(f2.as_mut().poll(cx).is_pending());
1320 assert!(f3.as_mut().poll(cx).is_pending());
1321 Poll::Ready(())
1322 })
1323 .await;
1324
1325 assert_eq!(rx.recv().await, Some(-1));
1326 assert_eq!(tx.capacity(), 0, "capacity goes to f1");
1327
1328 assert!(f1.await.is_ok());
1329 assert_eq!(tx.capacity(), 0);
1330
1331 assert_eq!(rx.recv().await, Some(-2));
1332 assert_eq!(tx.capacity(), 0, "capacity goes to f3");
1333
1334 drop(f2);
1335 assert_eq!(tx.capacity(), 0, "capacity goes to f3");
1336
1337 f3.await.unwrap().send(-4);
1338 assert_eq!(tx.capacity(), 0);
1339
1340 assert_eq!(rx.recv().await, Some(-4));
1341 assert_eq!(tx.capacity(), 1);
1342 });
1343 }
1344}