async_fifo/channel/
future.rs

1use core::task::{Context, Poll};
2use core::future::Future;
3use core::pin::Pin;
4
5use alloc::vec::Vec;
6
7use crate::fifo::{TmpArray, Storage};
8use super::async_api::Receiver;
9
/// Error value reported when the channel is closed.
///
/// A channel counts as closed once it holds no items
/// and has no senders left.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct Closed;
16
17/// Asynchronous variant of [`Storage`] (Borrowing)
18pub trait FillStorage<T>: Storage<T> + Unpin {}
19
20/// Asynchronous variant of [`Storage`] (Defaulting)
21pub trait RecvStorage<T>: FillStorage<T> + Default {}
22
23/// Future for Fifo consumption
24///
25/// Unlike [`Fill`], this future will construct the backing
26/// storage using its implementation of `Default`.
27///
28/// This future is safely cancellable.
29pub struct Recv<'a, S: RecvStorage<T>, T> {
30    receiver: &'a mut Receiver<T>,
31    storage: Option<S>,
32}
33
34/// Future for Fifo consumption
35///
36/// Unlike [`Recv`], this future will use a mutably borrowed
37/// storage object that you must provide.
38///
39/// This future is safely cancellable.
40pub struct Fill<'a, S: FillStorage<T>, T> {
41    receiver: &'a mut Receiver<T>,
42    storage: S,
43}
44
45/// Future for Fifo consumption of one item
46pub type RecvOne<'a, T> = Recv<'a, Option<T>, T>;
47
48/// Future for Fifo consumption of `N` items
49pub type RecvArray<'a, const N: usize, T> = Recv<'a, TmpArray<N, T>, T>;
50
51/// Future for Fifo consumption of `N` items
52pub type FillExact<'a, T> = Fill<'a, &'a mut [T], T>;
53
54/// Future for Fifo consumption of `N` items
55pub type FillMany<'a, T> = Fill<'a, &'a mut Vec<T>, T>;
56
57impl<T: Unpin> Receiver<T> {
58    /// Receives some items into custom storage, asynchronously.
59    pub fn into_recv<S: RecvStorage<T>>(&mut self) -> Recv<'_, S, T> {
60        Recv {
61            receiver: self,
62            storage: None,
63        }
64    }
65
66    /// Receives some items into custom storage, asynchronously.
67    pub fn into_fill<S: FillStorage<T>>(&mut self, storage: S) -> Fill<'_, S, T> {
68        Fill {
69            receiver: self,
70            storage,
71        }
72    }
73}
74
75pub(super) fn set_waker_check_no_prod<T: Unpin>(
76    cx: &mut Context<'_>,
77    receiver: &mut Receiver<T>,
78) -> bool {
79    if receiver.no_senders() {
80        return true;
81    }
82
83    // set the waker up
84    // prevent re-pushing the waker by watching a wake count
85    let waker = cx.waker().clone();
86    let last_wc = receiver.last_wake_count.take();
87    let subs = receiver.subscribers();
88    let current_wc = subs.subscribe(waker, last_wc);
89    receiver.last_wake_count = Some(current_wc);
90
91    // maybe the channel got closed while we were inserting our waker
92    receiver.no_senders()
93}
94
95impl<'a, S: RecvStorage<T>, T: Unpin> Future for Recv<'a, S, T> {
96    type Output = Result<S::Output, Closed>;
97
98    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
99        let Self {
100            receiver,
101            storage,
102        } = &mut *self;
103
104        let taken_storage = storage.take().unwrap_or_default();
105
106        // first try
107        let taken_storage = match receiver.try_recv_into(taken_storage) {
108            Ok(result) => return Poll::Ready(Ok(result)),
109            Err(taken_storage) => taken_storage,
110        };
111
112        // subscribe
113        let mut fail_ret = Poll::Pending;
114        if set_waker_check_no_prod(cx, receiver) {
115            fail_ret = Poll::Ready(Err(Closed));
116        }
117
118        // second try
119        match receiver.try_recv_into(taken_storage) {
120            Ok(result) => Poll::Ready(Ok(result)),
121            Err(taken_storage) => {
122                *storage = Some(taken_storage);
123                fail_ret
124            },
125        }
126    }
127}
128
129impl<'a, T: Unpin, S: FillStorage<T>> Future for Fill<'a, S, T> {
130    type Output = Result<usize, Closed>;
131
132    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
133        let Self {
134            receiver,
135            storage,
136        } = &mut *self;
137
138        // first try
139        if let len @ 1.. = receiver.fifo().pull(storage) {
140            return Poll::Ready(Ok(len));
141        }
142
143        // subscribe
144        let mut fail_ret = Poll::Pending;
145        if set_waker_check_no_prod(cx, receiver) {
146            fail_ret = Poll::Ready(Err(Closed));
147        }
148
149        // second try
150        match receiver.fifo().pull(storage) {
151            0 => fail_ret,
152            len => Poll::Ready(Ok(len)),
153        }
154    }
155}
156
157impl<T: Unpin, S: Storage<T> + Unpin> FillStorage<T> for S {}
158impl<T: Unpin, S: FillStorage<T> + Default> RecvStorage<T> for S {}