async_ringbuf/traits/
consumer.rs1use core::{
2 future::Future,
3 pin::Pin,
4 task::{Context, Poll, Waker},
5};
6use futures_util::future::FusedFuture;
7use ringbuf::traits::Consumer;
8#[cfg(feature = "std")]
9use std::io;
10
11pub trait AsyncConsumer: Consumer {
12 fn register_waker(&self, waker: &Waker);
13
14 fn close(&mut self);
15 fn is_closed(&self) -> bool {
17 !self.write_is_held()
18 }
19
20 fn pop(&mut self) -> PopFuture<'_, Self> {
30 PopFuture { owner: self, done: false }
31 }
32
33 fn wait_occupied(&mut self, count: usize) -> WaitOccupiedFuture<'_, Self> {
43 debug_assert!(count <= self.capacity().get());
44 WaitOccupiedFuture {
45 owner: self,
46 count,
47 done: false,
48 }
49 }
50
51 fn pop_exact<'a: 'b, 'b>(&'a mut self, slice: &'b mut [Self::Item]) -> PopSliceFuture<'a, 'b, Self>
62 where
63 Self::Item: Copy,
64 {
65 PopSliceFuture {
66 owner: self,
67 slice: Some(slice),
68 count: 0,
69 }
70 }
71
72 #[cfg(feature = "alloc")]
78 fn pop_until_end<'a: 'b, 'b>(&'a mut self, vec: &'b mut alloc::vec::Vec<Self::Item>) -> PopVecFuture<'a, 'b, Self> {
79 PopVecFuture {
80 owner: self,
81 vec: Some(vec),
82 }
83 }
84
85 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>
87 where
88 Self: Unpin,
89 {
90 let mut waker_registered = false;
91 loop {
92 let closed = self.is_closed();
93 if let Some(item) = self.try_pop() {
94 break Poll::Ready(Some(item));
95 }
96 if closed {
97 break Poll::Ready(None);
98 }
99 if waker_registered {
100 break Poll::Pending;
101 }
102 self.register_waker(cx.waker());
103 waker_registered = true;
104 }
105 }
106
107 #[cfg(feature = "std")]
109 fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>>
110 where
111 Self: AsyncConsumer<Item = u8> + Unpin,
112 {
113 let mut waker_registered = false;
114 loop {
115 let closed = self.is_closed();
116 let len = self.pop_slice(buf);
117 if len != 0 || closed {
118 break Poll::Ready(Ok(len));
119 }
120 if waker_registered {
121 break Poll::Pending;
122 }
123 self.register_waker(cx.waker());
124 waker_registered = true;
125 }
126 }
127}
128
129pub struct PopFuture<'a, A: AsyncConsumer + ?Sized> {
133 owner: &'a mut A,
134 done: bool,
135}
136impl<A: AsyncConsumer> Unpin for PopFuture<'_, A> {}
137impl<A: AsyncConsumer> FusedFuture for PopFuture<'_, A> {
138 fn is_terminated(&self) -> bool {
139 self.done || self.owner.is_closed()
140 }
141}
142impl<A: AsyncConsumer> Future for PopFuture<'_, A> {
143 type Output = Option<A::Item>;
144
145 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
146 let mut waker_registered = false;
147 loop {
148 assert!(!self.done);
149 let closed = self.owner.is_closed();
150 if let Some(item) = self.owner.try_pop() {
151 self.done = true;
152 break Poll::Ready(Some(item));
153 }
154 if closed {
155 break Poll::Ready(None);
156 }
157 if waker_registered {
158 break Poll::Pending;
159 }
160 self.owner.register_waker(cx.waker());
161 waker_registered = true;
162 }
163 }
164}
165
166pub struct PopSliceFuture<'a, 'b, A: AsyncConsumer + ?Sized>
170where
171 A::Item: Copy,
172{
173 owner: &'a mut A,
174 slice: Option<&'b mut [A::Item]>,
175 count: usize,
176}
177impl<A: AsyncConsumer> Unpin for PopSliceFuture<'_, '_, A> where A::Item: Copy {}
178impl<A: AsyncConsumer> FusedFuture for PopSliceFuture<'_, '_, A>
179where
180 A::Item: Copy,
181{
182 fn is_terminated(&self) -> bool {
183 self.slice.is_none()
184 }
185}
186impl<A: AsyncConsumer> Future for PopSliceFuture<'_, '_, A>
187where
188 A::Item: Copy,
189{
190 type Output = Result<(), usize>;
191
192 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
193 let mut waker_registered = false;
194 loop {
195 let closed = self.owner.is_closed();
196 let mut slice = self.slice.take().unwrap();
197 let len = self.owner.pop_slice(slice);
198 slice = &mut slice[len..];
199 self.count += len;
200 if slice.is_empty() {
201 break Poll::Ready(Ok(()));
202 }
203 if closed {
204 break Poll::Ready(Err(self.count));
205 }
206 self.slice.replace(slice);
207 if waker_registered {
208 break Poll::Pending;
209 }
210 self.owner.register_waker(cx.waker());
211 waker_registered = true;
212 }
213 }
214}
215impl<A: AsyncConsumer> PopSliceFuture<'_, '_, A>
216where
217 A::Item: Copy,
218{
219 pub fn count(&self) -> usize {
221 self.count
222 }
223}
224
/// Future produced by `AsyncConsumer::pop_until_end`; appends popped items to
/// a caller-provided vector until the buffer is closed.
#[cfg(feature = "alloc")]
pub struct PopVecFuture<'a, 'b, A>
where
    A: AsyncConsumer + ?Sized,
{
    /// Consumer the items are taken from.
    owner: &'a mut A,
    /// Destination vector; `None` after the future has resolved.
    vec: Option<&'b mut alloc::vec::Vec<A::Item>>,
}

// Only references are stored, so moving the future is harmless.
#[cfg(feature = "alloc")]
impl<'a, 'b, A> Unpin for PopVecFuture<'a, 'b, A> where A: AsyncConsumer {}
#[cfg(feature = "alloc")]
impl<'a, 'b, A> FusedFuture for PopVecFuture<'a, 'b, A>
where
    A: AsyncConsumer,
{
    /// The future is finished once the destination vector has been given up,
    /// which `poll` does when it resolves.
    fn is_terminated(&self) -> bool {
        matches!(self.vec, None)
    }
}
#[cfg(feature = "alloc")]
impl<'a, 'b, A> Future for PopVecFuture<'a, 'b, A>
where
    A: AsyncConsumer,
{
    type Output = ();

    /// Moves every currently available item into the destination vector.
    ///
    /// Resolves once the buffer was closed and has been drained. Otherwise the
    /// waker is registered and the drain is repeated once before returning
    /// `Pending`.
    ///
    /// # Panics
    ///
    /// Panics when polled again after it has resolved.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // At most two attempts: one before and one after registering the waker.
        for attempt in 0..2 {
            // Read the closed flag before draining, so completion is only
            // reported after a drain that started once the buffer was closed.
            let was_closed = self.owner.is_closed();
            let sink = self.vec.take().unwrap();

            // Drain until a pop moves nothing.
            loop {
                let capacity = sink.capacity();
                if sink.len() == capacity {
                    // No spare room left: grow by the current capacity, but
                    // by at least 16 slots.
                    sink.reserve(capacity.max(16));
                }
                let moved = self.owner.pop_slice_uninit(sink.spare_capacity_mut());
                if moved == 0 {
                    break;
                }
                // SAFETY: relies on `pop_slice_uninit` having initialized the
                // first `moved` slots of the spare capacity it was handed —
                // NOTE(review): that contract lives in `ringbuf`, confirm there.
                unsafe { sink.set_len(sink.len() + moved) };
            }

            if was_closed {
                return Poll::Ready(());
            }
            // Not finished: keep the vector for the next attempt.
            self.vec = Some(sink);
            if attempt == 0 {
                self.owner.register_waker(cx.waker());
            }
        }
        Poll::Pending
    }
}
274
275pub struct WaitOccupiedFuture<'a, A: AsyncConsumer + ?Sized> {
279 owner: &'a A,
280 count: usize,
281 done: bool,
282}
283impl<A: AsyncConsumer> Unpin for WaitOccupiedFuture<'_, A> {}
284impl<A: AsyncConsumer> FusedFuture for WaitOccupiedFuture<'_, A> {
285 fn is_terminated(&self) -> bool {
286 self.done
287 }
288}
289impl<A: AsyncConsumer> Future for WaitOccupiedFuture<'_, A> {
290 type Output = ();
291
292 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
293 let mut waker_registered = false;
294 loop {
295 assert!(!self.done);
296 let closed = self.owner.is_closed();
297 if self.count <= self.owner.occupied_len() || closed {
298 self.done = true;
299 break Poll::Ready(());
300 }
301 if waker_registered {
302 break Poll::Pending;
303 }
304 self.owner.register_waker(cx.waker());
305 waker_registered = true;
306 }
307 }
308}