async_fifo/channel/
future.rs1use core::task::{Context, Poll};
2use core::future::Future;
3use core::pin::Pin;
4
5use alloc::vec::Vec;
6
7use crate::fifo::{TmpArray, Storage};
8use super::async_api::Receiver;
9
/// Error produced by the [`Recv`] and [`Fill`] futures.
///
/// Both futures resolve to `Err(Closed)` when the receiver reports that it
/// has no senders left and the final receive attempt yielded nothing.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Closed;
16
17pub trait FillStorage<T>: Storage<T> + Unpin {}
19
20pub trait RecvStorage<T>: FillStorage<T> + Default {}
22
23pub struct Recv<'a, S: RecvStorage<T>, T> {
30 receiver: &'a mut Receiver<T>,
31 storage: Option<S>,
32}
33
34pub struct Fill<'a, S: FillStorage<T>, T> {
41 receiver: &'a mut Receiver<T>,
42 storage: S,
43}
44
45pub type RecvOne<'a, T> = Recv<'a, Option<T>, T>;
47
48pub type RecvArray<'a, const N: usize, T> = Recv<'a, TmpArray<N, T>, T>;
50
51pub type FillExact<'a, T> = Fill<'a, &'a mut [T], T>;
53
54pub type FillMany<'a, T> = Fill<'a, &'a mut Vec<T>, T>;
56
57impl<T: Unpin> Receiver<T> {
58 pub fn into_recv<S: RecvStorage<T>>(&mut self) -> Recv<'_, S, T> {
60 Recv {
61 receiver: self,
62 storage: None,
63 }
64 }
65
66 pub fn into_fill<S: FillStorage<T>>(&mut self, storage: S) -> Fill<'_, S, T> {
68 Fill {
69 receiver: self,
70 storage,
71 }
72 }
73}
74
75pub(super) fn set_waker_check_no_prod<T: Unpin>(
76 cx: &mut Context<'_>,
77 receiver: &mut Receiver<T>,
78) -> bool {
79 if receiver.no_senders() {
80 return true;
81 }
82
83 let waker = cx.waker().clone();
86 let last_wc = receiver.last_wake_count.take();
87 let subs = receiver.subscribers();
88 let current_wc = subs.subscribe(waker, last_wc);
89 receiver.last_wake_count = Some(current_wc);
90
91 receiver.no_senders()
93}
94
95impl<'a, S: RecvStorage<T>, T: Unpin> Future for Recv<'a, S, T> {
96 type Output = Result<S::Output, Closed>;
97
98 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
99 let Self {
100 receiver,
101 storage,
102 } = &mut *self;
103
104 let taken_storage = storage.take().unwrap_or_default();
105
106 let taken_storage = match receiver.try_recv_into(taken_storage) {
108 Ok(result) => return Poll::Ready(Ok(result)),
109 Err(taken_storage) => taken_storage,
110 };
111
112 let mut fail_ret = Poll::Pending;
114 if set_waker_check_no_prod(cx, receiver) {
115 fail_ret = Poll::Ready(Err(Closed));
116 }
117
118 match receiver.try_recv_into(taken_storage) {
120 Ok(result) => Poll::Ready(Ok(result)),
121 Err(taken_storage) => {
122 *storage = Some(taken_storage);
123 fail_ret
124 },
125 }
126 }
127}
128
129impl<'a, T: Unpin, S: FillStorage<T>> Future for Fill<'a, S, T> {
130 type Output = Result<usize, Closed>;
131
132 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
133 let Self {
134 receiver,
135 storage,
136 } = &mut *self;
137
138 if let len @ 1.. = receiver.fifo().pull(storage) {
140 return Poll::Ready(Ok(len));
141 }
142
143 let mut fail_ret = Poll::Pending;
145 if set_waker_check_no_prod(cx, receiver) {
146 fail_ret = Poll::Ready(Err(Closed));
147 }
148
149 match receiver.fifo().pull(storage) {
151 0 => fail_ret,
152 len => Poll::Ready(Ok(len)),
153 }
154 }
155}
156
157impl<T: Unpin, S: Storage<T> + Unpin> FillStorage<T> for S {}
158impl<T: Unpin, S: FillStorage<T> + Default> RecvStorage<T> for S {}