async_h1/
read_notifier.rs1use std::fmt;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use async_channel::Sender;
6use futures_lite::io::{self, AsyncBufRead as BufRead, AsyncRead as Read};
7
8#[pin_project::pin_project]
14pub(crate) struct ReadNotifier<B> {
15 #[pin]
16 reader: B,
17 sender: Sender<()>,
18 has_been_read: bool,
19}
20
21impl<B> fmt::Debug for ReadNotifier<B> {
22 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
23 f.debug_struct("ReadNotifier")
24 .field("read", &self.has_been_read)
25 .finish()
26 }
27}
28
29impl<B: Read> ReadNotifier<B> {
30 pub(crate) fn new(reader: B, sender: Sender<()>) -> Self {
31 Self {
32 reader,
33 sender,
34 has_been_read: false,
35 }
36 }
37}
38
39impl<B: BufRead> BufRead for ReadNotifier<B> {
40 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
41 self.project().reader.poll_fill_buf(cx)
42 }
43
44 fn consume(self: Pin<&mut Self>, amt: usize) {
45 self.project().reader.consume(amt)
46 }
47}
48
49impl<B: Read> Read for ReadNotifier<B> {
50 fn poll_read(
51 self: Pin<&mut Self>,
52 cx: &mut Context<'_>,
53 buf: &mut [u8],
54 ) -> Poll<io::Result<usize>> {
55 let this = self.project();
56
57 if !*this.has_been_read {
58 if let Ok(()) = this.sender.try_send(()) {
59 *this.has_been_read = true;
60 };
61 }
62
63 this.reader.poll_read(cx, buf)
64 }
65}