async_fifo/channel/
async_read.rs1use core::task::{Context, Poll};
2use core::pin::Pin;
3
4use futures_io::{Result as IoResult, Error, AsyncRead};
5use futures_io::ErrorKind::BrokenPipe;
6
7use crate::fifo::InternalStorageApi;
8
9use super::{Receiver, set_waker_check_no_prod};
10
/// Wrapper around a caller-provided byte slice.
///
/// `poll_read` builds one of these around its `buf` argument and hands it to
/// the FIFO's `pull`, so pulled bytes are written directly into that slice.
struct UpToLen<'a> {
    /// Destination slice; its length is reported as the upper bound in `bounds`.
    inner: &'a mut [u8],
}
14
15impl InternalStorageApi<u8> for UpToLen<'_> {
16 fn insert(&mut self, index: usize, item: u8) {
17 self.inner[index] = item;
18 }
19
20 fn bounds(&self) -> (Option<usize>, Option<usize>) {
21 (Some(1), Some(self.inner.len()))
22 }
23
24 fn reserve(&mut self, _len: usize) {}
25}
26
27impl<'a> AsyncRead for Receiver<u8> {
28 fn poll_read(
29 mut self: Pin<&mut Self>,
30 cx: &mut Context<'_>,
31 buf: &mut [u8],
32 ) -> Poll<IoResult<usize>> {
33 let mut storage = UpToLen {
34 inner: buf,
35 };
36
37 let len = self.fifo().pull(&mut storage);
39 if len != 0 {
40 return Poll::Ready(Ok(len));
41 }
42
43 if set_waker_check_no_prod(cx, &mut self) {
45 return Poll::Ready(Err(Error::new(BrokenPipe, "Closed")));
46 }
47
48 match self.fifo().pull(&mut storage) {
50 0 => Poll::Pending,
51 len => Poll::Ready(Ok(len)),
52 }
53 }
54}