async_h1/
read_notifier.rs

1use std::fmt;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use async_channel::Sender;
6use futures_lite::io::{self, AsyncBufRead as BufRead, AsyncRead as Read};
7
8/// ReadNotifier forwards [`async_std::io::Read`] and
9/// [`async_std::io::BufRead`] to an inner reader. When the
10/// ReadNotifier is read from (using `Read`, `ReadExt`, or `BufRead`
11/// methods), it sends a single message containing `()` on the
12/// channel.
13#[pin_project::pin_project]
14pub(crate) struct ReadNotifier<B> {
15    #[pin]
16    reader: B,
17    sender: Sender<()>,
18    has_been_read: bool,
19}
20
21impl<B> fmt::Debug for ReadNotifier<B> {
22    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
23        f.debug_struct("ReadNotifier")
24            .field("read", &self.has_been_read)
25            .finish()
26    }
27}
28
29impl<B: Read> ReadNotifier<B> {
30    pub(crate) fn new(reader: B, sender: Sender<()>) -> Self {
31        Self {
32            reader,
33            sender,
34            has_been_read: false,
35        }
36    }
37}
38
39impl<B: BufRead> BufRead for ReadNotifier<B> {
40    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
41        self.project().reader.poll_fill_buf(cx)
42    }
43
44    fn consume(self: Pin<&mut Self>, amt: usize) {
45        self.project().reader.consume(amt)
46    }
47}
48
49impl<B: Read> Read for ReadNotifier<B> {
50    fn poll_read(
51        self: Pin<&mut Self>,
52        cx: &mut Context<'_>,
53        buf: &mut [u8],
54    ) -> Poll<io::Result<usize>> {
55        let this = self.project();
56
57        if !*this.has_been_read {
58            if let Ok(()) = this.sender.try_send(()) {
59                *this.has_been_read = true;
60            };
61        }
62
63        this.reader.poll_read(cx, buf)
64    }
65}