async_fifo/channel/
async_read.rs

1use core::task::{Context, Poll};
2use core::pin::Pin;
3
4use futures_io::{Result as IoResult, Error, AsyncRead};
5use futures_io::ErrorKind::BrokenPipe;
6
7use crate::fifo::InternalStorageApi;
8
9use super::{Receiver, set_waker_check_no_prod};
10
/// Adapter that exposes a caller-provided byte slice as a pull destination.
///
/// The slice is borrowed for the lifetime of the adapter; nothing is
/// allocated or copied when the adapter is built.
struct UpToLen<'buf> {
    /// Destination bytes, written in place by index.
    inner: &'buf mut [u8],
}
14
15impl InternalStorageApi<u8> for UpToLen<'_> {
16    fn insert(&mut self, index: usize, item: u8) {
17        self.inner[index] = item;
18    }
19
20    fn bounds(&self) -> (Option<usize>, Option<usize>) {
21        (Some(1), Some(self.inner.len()))
22    }
23
24    fn reserve(&mut self, _len: usize) {}
25}
26
27impl<'a> AsyncRead for Receiver<u8> {
28    fn poll_read(
29        mut self: Pin<&mut Self>,
30        cx: &mut Context<'_>,
31        buf: &mut [u8],
32    ) -> Poll<IoResult<usize>> {
33        let mut storage = UpToLen {
34            inner: buf,
35        };
36
37        // first try
38        let len = self.fifo().pull(&mut storage);
39        if len != 0 {
40            return Poll::Ready(Ok(len));
41        }
42
43        // subscribe
44        if set_waker_check_no_prod(cx, &mut self) {
45            return Poll::Ready(Err(Error::new(BrokenPipe, "Closed")));
46        }
47
48        // second try
49        match self.fifo().pull(&mut storage) {
50            0 => Poll::Pending,
51            len => Poll::Ready(Ok(len)),
52        }
53    }
54}