async_unsync/
unbounded.rs

1//! An **unbounded** MPSC channel implementation.
2
3use core::{
4    fmt,
5    task::{Context, Poll},
6};
7
8use crate::{
9    alloc::{collections::VecDeque, rc::Rc},
10    error::{SendError, TryRecvError},
11    mask::{COUNTED, UNCOUNTED},
12    queue::UnboundedQueue,
13};
14
15/// Returns a new unbounded channel.
16pub const fn channel<T>() -> UnboundedChannel<T> {
17    UnboundedChannel { queue: UnboundedQueue::new() }
18}
19
20/// Returns a new unbounded channel with pre-queued elements.
21pub fn channel_from_iter<T>(iter: impl IntoIterator<Item = T>) -> UnboundedChannel<T> {
22    UnboundedChannel::from_iter(iter)
23}
24
25/// An unsynchronized (`!Sync`), asynchronous and unbounded channel.
26pub struct UnboundedChannel<T> {
27    queue: UnboundedQueue<T>,
28}
29
30impl<T> FromIterator<T> for UnboundedChannel<T> {
31    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
32        Self { queue: UnboundedQueue::from_iter(iter) }
33    }
34}
35
36impl<T> UnboundedChannel<T> {
37    /// Returns a new unbounded channel with pre-allocated initial capacity.
38    pub fn with_initial_capacity(initial: usize) -> Self {
39        Self { queue: UnboundedQueue::with_capacity(initial) }
40    }
41
42    /// Splits the channel into borrowing [`UnboundedSenderRef`] and
43    /// [`UnboundedReceiverRef`] handles.
44    ///
45    /// # Examples
46    ///
47    /// ```
48    /// use async_unsync::unbounded;
49    ///
50    /// // must use a non-temporary binding for the channel
51    /// let mut chan = unbounded::channel();
52    /// let (tx, mut rx) = chan.split();
53    /// tx.send(1).unwrap();
54    ///
55    /// // dropping the handles will close the channel
56    /// drop((tx, rx));
57    /// assert!(chan.is_closed());
58    ///
59    /// // ...but the queued value can still be received
60    /// assert_eq!(chan.try_recv(), Ok(1));
61    /// assert!(chan.try_recv().is_err());
62    /// ```
63    pub fn split(&mut self) -> (UnboundedSenderRef<'_, T>, UnboundedReceiverRef<'_, T>) {
64        self.queue.0.get_mut().set_counted();
65        (UnboundedSenderRef { queue: &self.queue }, UnboundedReceiverRef { queue: &self.queue })
66    }
67
68    /// Splits the channel into owning [`UnboundedSender`] and
69    /// [`UnboundedReceiver`] handles.
70    ///
71    /// This requires one additional allocation over
72    /// [`split`](UnboundedChannel::split), but avoids potential lifetime
73    /// restrictions, since the returned handles are valid for the `'static`
74    /// lifetime, meaning they can be used in spawned (local) tasks.
75    pub fn into_split(mut self) -> (UnboundedSender<T>, UnboundedReceiver<T>) {
76        self.queue.0.get_mut().set_counted();
77        let queue = Rc::new(self.queue);
78        (UnboundedSender { queue: Rc::clone(&queue) }, UnboundedReceiver { queue })
79    }
80
81    /// Converts into the underlying [`VecDeque`] container.
82    pub fn into_deque(self) -> VecDeque<T> {
83        self.queue.into_deque()
84    }
85
86    /// Returns the number of queued elements.
87    pub fn len(&self) -> usize {
88        self.queue.len()
89    }
90
91    /// Closes the channel, ensuring that all subsequent sends will fail.
92    #[cold]
93    pub fn close(&self) {
94        self.queue.close::<UNCOUNTED>();
95    }
96
97    /// Returns `true` if the channel is closed.
98    pub fn is_closed(&self) -> bool {
99        self.queue.is_closed::<UNCOUNTED>()
100    }
101
102    /// Returns `true` if the channel is empty.
103    pub fn is_empty(&self) -> bool {
104        self.len() == 0
105    }
106
107    /// Receives an element through the channel.
108    ///
109    /// # Errors
110    ///
111    /// Fails, if the channel is [empty](TryRecvError::Empty) or
112    /// [disconnected](TryRecvError::Disconnected).
113    pub fn try_recv(&self) -> Result<T, TryRecvError> {
114        self.queue.try_recv::<UNCOUNTED>()
115    }
116
117    /// Polls the channel, resolving if an element was received or the channel
118    /// is closed, ignoring whether there are any remaining **Sender**(s) or
119    /// not.
120    pub fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Option<T>> {
121        self.queue.poll_recv::<UNCOUNTED>(cx)
122    }
123
124    /// Receives an element through the channel.
125    ///
126    /// # Errors
127    ///
128    /// Fails, if the channel is closed (i.e., all senders have been dropped).
129    pub async fn recv(&self) -> Option<T> {
130        self.queue.recv::<UNCOUNTED>().await
131    }
132
133    /// Sends a value through the channel.
134    ///
135    /// # Errors
136    ///
137    /// Fails, if the queue is closed.
138    pub fn send(&self, elem: T) -> Result<(), SendError<T>> {
139        self.queue.send::<UNCOUNTED>(elem)
140    }
141}
142
143impl<T> fmt::Debug for UnboundedChannel<T> {
144    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
145        f.debug_struct("UnboundedChannel")
146            .field("len", &self.len())
147            .field("is_closed", &self.is_closed())
148            .finish()
149    }
150}
151
152/// An owned handle for sending elements through an unbounded split
153/// [`UnboundedChannel`].
154pub struct UnboundedSender<T> {
155    queue: Rc<UnboundedQueue<T>>,
156}
157
158impl<T> UnboundedSender<T> {
159    /// Returns the number of currently queued elements.
160    pub fn len(&self) -> usize {
161        self.queue.len()
162    }
163
164    /// Returns `true` if the channel is closed.
165    pub fn is_closed(&self) -> bool {
166        self.queue.is_closed::<COUNTED>()
167    }
168
169    /// Returns `true` if the channel is empty.
170    pub fn is_empty(&self) -> bool {
171        self.queue.len() == 0
172    }
173
174    /// Returns `true` if `self` and `other` are handles for the same channel
175    /// instance.
176    pub fn same_channel(&self, other: &Self) -> bool {
177        core::ptr::eq(Rc::as_ptr(&self.queue), Rc::as_ptr(&other.queue))
178    }
179
180    /// Sends a value through the channel.
181    ///
182    /// # Errors
183    ///
184    /// Fails, if the queue is closed.
185    ///
186    /// # Examples
187    ///
188    /// ```
189    /// use async_unsync::unbounded;
190    ///
191    /// let (tx, mut rx) = unbounded::channel().into_split();
192    /// tx.send(1).unwrap();
193    /// assert_eq!(rx.try_recv(), Ok(1));
194    /// ```
195    pub fn send(&self, elem: T) -> Result<(), SendError<T>> {
196        self.queue.send::<COUNTED>(elem)
197    }
198}
199
200impl<T> Clone for UnboundedSender<T> {
201    fn clone(&self) -> Self {
202        // SAFETY: no mutable or aliased access to queue possible
203        unsafe { (*self.queue.0.get()).mask.increase_sender_count() };
204        Self { queue: Rc::clone(&self.queue) }
205    }
206}
207
208impl<T> Drop for UnboundedSender<T> {
209    fn drop(&mut self) {
210        // SAFETY: no mutable or aliased access to queue possible
211        unsafe { (*self.queue.0.get()).decrease_sender_count() };
212    }
213}
214
215impl<T> fmt::Debug for UnboundedSender<T> {
216    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
217        f.debug_struct("UnboundedSender")
218            .field("len", &self.len())
219            .field("is_closed", &self.is_closed())
220            .finish()
221    }
222}
223
224/// A borrowing handle for sending elements through an unbounded split
225/// [`UnboundedChannel`].
226pub struct UnboundedSenderRef<'a, T> {
227    queue: &'a UnboundedQueue<T>,
228}
229
230impl<T> UnboundedSenderRef<'_, T> {
231    /// Returns the number of queued elements.
232    pub fn len(&self) -> usize {
233        self.queue.len()
234    }
235
236    /// Returns `true` if the channel is closed.
237    pub fn is_closed(&self) -> bool {
238        self.queue.is_closed::<COUNTED>()
239    }
240
241    /// Returns `true` if the channel is empty.
242    pub fn is_empty(&self) -> bool {
243        self.queue.len() == 0
244    }
245
246    /// Returns `true` if `self` and `other` are handles for the same channel
247    /// instance.
248    pub fn same_channel(&self, other: &Self) -> bool {
249        core::ptr::eq(&self.queue, &other.queue)
250    }
251
252    /// Sends a value through the channel.
253    ///
254    /// # Errors
255    ///
256    /// Fails, if the queue is closed.
257    ///
258    /// # Examples
259    ///
260    /// ```
261    /// use async_unsync::unbounded;
262    ///
263    /// let mut chan = unbounded::channel();
264    /// let (tx, mut rx) = chan.split();
265    /// tx.send(1).unwrap();
266    /// assert_eq!(rx.try_recv(), Ok(1));
267    /// ```
268    pub fn send(&self, elem: T) -> Result<(), SendError<T>> {
269        self.queue.send::<COUNTED>(elem)
270    }
271}
272
273impl<T> Clone for UnboundedSenderRef<'_, T> {
274    fn clone(&self) -> Self {
275        // SAFETY: no mutable or aliased access to queue possible
276        unsafe { (*self.queue.0.get()).mask.increase_sender_count() };
277        Self { queue: self.queue }
278    }
279}
280
281impl<T> Drop for UnboundedSenderRef<'_, T> {
282    fn drop(&mut self) {
283        // SAFETY: no mutable or aliased access to queue possible
284        unsafe { (*self.queue.0.get()).decrease_sender_count() };
285    }
286}
287
288impl<T> fmt::Debug for UnboundedSenderRef<'_, T> {
289    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
290        f.debug_struct("UnboundedSenderRef")
291            .field("len", &self.len())
292            .field("is_closed", &self.is_closed())
293            .finish()
294    }
295}
296
297/// An owning handle for receiving elements through a split unbounded
298/// [`UnboundedChannel`].
299pub struct UnboundedReceiver<T> {
300    queue: Rc<UnboundedQueue<T>>,
301}
302
303impl<T> UnboundedReceiver<T> {
304    /// Closes the channel, ensuring that all subsequent sends will fail.
305    #[cold]
306    pub fn close(&mut self) {
307        self.queue.close::<COUNTED>();
308    }
309
310    /// Returns the number of queued elements.
311    pub fn len(&self) -> usize {
312        self.queue.len()
313    }
314
315    /// Returns `true` if the channel is closed.
316    pub fn is_closed(&self) -> bool {
317        self.queue.is_closed::<COUNTED>()
318    }
319
320    /// Returns `true` if the channel is empty.
321    pub fn is_empty(&self) -> bool {
322        self.queue.len() == 0
323    }
324
325    /// Attempts to receive an element through the channel.
326    ///
327    /// # Errors
328    ///
329    /// Fails, if the channel is [empty](TryRecvError::Empty) or
330    /// [disconnected](TryRecvError::Disconnected).
331    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
332        self.queue.try_recv::<COUNTED>()
333    }
334
335    /// Polls the channel, resolving if an element was received or the channel
336    /// is closed but ignoring whether there are any remaining **Sender**(s) or
337    /// not.
338    pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
339        self.queue.poll_recv::<COUNTED>(cx)
340    }
341
342    /// Receives an element through the channel.
343    ///
344    /// # Errors
345    ///
346    /// Fails, if the channel is [empty](TryRecvError::Empty) or
347    /// [disconnected](TryRecvError::Disconnected).
348    pub async fn recv(&mut self) -> Option<T> {
349        self.queue.recv::<COUNTED>().await
350    }
351}
352
353impl<T> Drop for UnboundedReceiver<T> {
354    fn drop(&mut self) {
355        self.queue.close::<COUNTED>();
356    }
357}
358
359impl<T> fmt::Debug for UnboundedReceiver<T> {
360    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
361        f.debug_struct("UnboundedReceiver")
362            .field("len", &self.len())
363            .field("is_closed", &self.is_closed())
364            .finish()
365    }
366}
367
368/// A borrowing handle for receiving elements through a split unbounded
369/// [`UnboundedChannel`].
370pub struct UnboundedReceiverRef<'a, T> {
371    queue: &'a UnboundedQueue<T>,
372}
373
374impl<T> UnboundedReceiverRef<'_, T> {
375    /// Closes the channel, ensuring that all subsequent sends will fail.
376    #[cold]
377    pub fn close(&mut self) {
378        self.queue.close::<COUNTED>();
379    }
380
381    /// Returns the number of queued elements.
382    pub fn len(&self) -> usize {
383        self.queue.len()
384    }
385
386    /// Returns `true` if the channel is closed.
387    pub fn is_closed(&self) -> bool {
388        self.queue.is_closed::<COUNTED>()
389    }
390
391    /// Returns `true` if the channel is empty.
392    pub fn is_empty(&self) -> bool {
393        self.queue.len() == 0
394    }
395
396    /// Receives an element through the channel.
397    ///
398    /// # Errors
399    ///
400    /// Fails, if the channel is empty or disconnected.
401    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
402        self.queue.try_recv::<COUNTED>()
403    }
404
405    /// Polls the channel, resolving if an element was received or the channel
406    /// is closed, ignoring whether there are any remaining **Sender**(s) or
407    /// not.
408    pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
409        self.queue.poll_recv::<COUNTED>(cx)
410    }
411
412    /// Receives an element through the channel.
413    ///
414    /// # Errors
415    ///
416    /// Fails, if the channel is closed (i.e., all senders have been dropped).
417    pub async fn recv(&mut self) -> Option<T> {
418        self.queue.recv::<COUNTED>().await
419    }
420}
421
422impl<T> Drop for UnboundedReceiverRef<'_, T> {
423    fn drop(&mut self) {
424        self.queue.close::<COUNTED>();
425    }
426}
427
428impl<T> fmt::Debug for UnboundedReceiverRef<'_, T> {
429    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
430        f.debug_struct("UnboundedReceiverRef")
431            .field("len", &self.len())
432            .field("is_closed", &self.is_closed())
433            .finish()
434    }
435}
436
#[cfg(test)]
mod tests {
    use core::{future::Future as _, task::Poll};

    /// An unsplit channel yields a sent value through `try_recv`.
    #[test]
    fn try_recv() {
        let channel = super::channel::<i32>();
        assert!(channel.try_recv().is_err());

        channel.send(-1).unwrap();
        assert_eq!(channel.try_recv(), Ok(-1));
    }

    /// A value sent through the sender handle arrives at the receiver handle.
    #[test]
    fn try_recv_split() {
        let mut channel = super::channel::<i32>();
        let (sender, mut receiver) = channel.split();

        assert!(receiver.try_recv().is_err());

        sender.send(-1).unwrap();
        assert_eq!(receiver.try_recv(), Ok(-1));
    }

    /// Values queued before closing stay receivable; afterwards the channel
    /// reports being disconnected.
    #[test]
    fn try_recv_closed() {
        let channel = super::channel::<i32>();
        assert!(channel.try_recv().is_err());

        channel.send(-1).unwrap();
        channel.close();

        assert_eq!(channel.try_recv(), Ok(-1));
        let err = channel.try_recv().unwrap_err();
        assert!(err.is_disconnected());
    }

    /// Dropping the only sender disconnects the receiver once the queue is
    /// drained.
    #[test]
    fn try_recv_closed_split() {
        let mut channel = super::channel::<i32>();
        let (sender, mut receiver) = channel.split();
        assert!(receiver.try_recv().is_err());

        sender.send(-1).unwrap();
        drop(sender);

        assert_eq!(receiver.try_recv(), Ok(-1));
        let err = receiver.try_recv().unwrap_err();
        assert!(err.is_disconnected());
    }

    /// Elements are received in the order they were sent.
    #[test]
    fn send_split() {
        let mut channel = super::channel::<i32>();
        let (sender, mut receiver) = channel.split();

        for value in 0..4 {
            let _ = sender.send(value);
        }

        for expected in 0..4 {
            assert_eq!(receiver.try_recv(), Ok(expected));
        }
    }

    /// Sending on a closed, unsplit channel fails.
    #[test]
    fn send_closed() {
        let channel = super::channel::<i32>();
        channel.close();

        assert!(channel.send(-1).is_err());
    }

    /// Sending fails after the receiver handle has been dropped.
    #[test]
    fn send_closed_split() {
        let mut channel = super::channel::<i32>();
        // The `_` pattern does not bind the receiver, so it is dropped right
        // away, which closes the channel.
        let (sender, _) = channel.split();

        assert!(sender.send(-1).is_err());
    }

    /// `recv` resolves with queued values; sending fails after closing.
    #[test]
    fn recv() {
        futures_lite::future::block_on(async {
            let channel = super::channel::<i32>();

            channel.send(-1).unwrap();
            assert_eq!(channel.recv().await, Some(-1));

            channel.send(-2).unwrap();
            assert_eq!(channel.recv().await, Some(-2));

            channel.close();
            channel.send(-3).unwrap_err();
        });
    }

    /// Same as `recv`, but using split handles and closing by dropping the
    /// receiver.
    #[test]
    fn recv_split() {
        futures_lite::future::block_on(async {
            let mut channel = super::channel::<i32>();
            let (sender, mut receiver) = channel.split();

            sender.send(-1).unwrap();
            assert_eq!(receiver.recv().await, Some(-1));

            sender.send(-2).unwrap();
            assert_eq!(receiver.recv().await, Some(-2));

            drop(receiver);
            sender.send(-3).unwrap_err();
        });
    }

    /// After the receiver closes the channel, sends fail but all previously
    /// queued values can still be received before `None` is returned.
    #[test]
    fn recv_closed_split() {
        futures_lite::future::block_on(async {
            let mut channel = super::channel::<i32>();
            let (sender, mut receiver) = channel.split();

            sender.send(-1).unwrap();
            sender.send(-2).unwrap();
            sender.send(-3).unwrap();

            receiver.close();
            assert!(sender.send(-4).is_err());

            assert_eq!(receiver.recv().await, Some(-1));
            assert_eq!(receiver.recv().await, Some(-2));
            assert_eq!(receiver.recv().await, Some(-3));
            assert_eq!(receiver.recv().await, None);

            assert!(sender.send(-4).is_err());
        });
    }

    /// `poll_recv` is pending on an empty channel and ready after a send.
    #[test]
    fn poll_recv() {
        futures_lite::future::block_on(async {
            let channel = super::channel::<i32>();

            let check = core::future::poll_fn(|cx| {
                assert!(channel.poll_recv(cx).is_pending());
                assert!(channel.poll_recv(cx).is_pending());

                channel.send(1).unwrap();
                assert_eq!(channel.poll_recv(cx), Poll::Ready(Some(1)));

                Poll::Ready(())
            });
            check.await;
        });
    }

    /// Two concurrent `recv` futures on the same channel: the later one
    /// resolves once a value is sent.
    #[test]
    fn multiple_recv() {
        futures_lite::future::block_on(async {
            let channel = super::channel::<i32>();
            let mut first = Box::pin(channel.recv());
            let mut second = Box::pin(channel.recv());

            let check = core::future::poll_fn(|cx| {
                // polling `first` registers it to be woken
                assert!(first.as_mut().poll(cx).is_pending());
                // polling `second` overwrites the waker registered before
                assert!(second.as_mut().poll(cx).is_pending());

                // once a value was sent, `second` is able to resolve
                channel.send(1).unwrap();
                assert_eq!(second.as_mut().poll(cx), Poll::Ready(Some(1)));

                Poll::Ready(())
            });
            check.await;

            channel.send(2).unwrap();
            assert_eq!(channel.recv().await, Some(2));
        });
    }

    /// The channel itself remains usable after its split handles are gone.
    #[test]
    fn use_after_split() {
        futures_lite::future::block_on(async {
            let mut channel = super::channel::<i32>();
            {
                let (sender, mut receiver) = channel.split();
                sender.send(1).unwrap();
                sender.send(2).unwrap();
                assert_eq!(receiver.recv().await, Some(1));
                receiver.close();
            }

            assert!(channel.is_closed());
            assert_eq!(channel.recv().await, Some(2));
            assert_eq!(channel.recv().await, None);
        });
    }

    /// Handles split off an already closed channel observe it as closed.
    #[test]
    fn split_after_close() {
        let mut channel = super::channel::<i32>();
        channel.close();

        let (sender, receiver) = channel.split();
        assert!(sender.is_closed());
        assert!(receiver.is_closed());
    }
}