async_hal/io/
mod.rs

1use core::{
2    pin::Pin,
3    task::{Context, Poll},
4};
5use futures::{ready, Sink, SinkExt, Stream, StreamExt};
6
7mod async_buf_read;
8pub use async_buf_read::AsyncBufRead;
9
10mod async_read;
11pub use async_read::AsyncRead;
12
13mod async_write;
14pub use async_write::AsyncWrite;
15
16mod buf_reader;
17pub use buf_reader::BufReader;
18
19mod copy_buf;
20pub use copy_buf::copy_buf;
21
22mod read;
23pub use read::Read;
24
25mod write_all;
26pub use write_all::WriteAll;
27
28/// Reader for a stream of bytes
29pub const fn reader<T, E>(stream: T) -> Reader<T>
30where
31    T: Stream<Item = Result<u8, E>> + Unpin,
32{
33    Reader::new(stream)
34}
35
/// Adapter that lets a stream of byte results be read through [`AsyncRead`].
pub struct Reader<T> {
    /// The wrapped stream that bytes are pulled from.
    stream: T,
    /// How many bytes the read currently in progress has already stored in the
    /// caller's buffer; reset to zero once `poll_read` reports a completed read.
    idx: usize,
}

impl<T> Reader<T> {
    /// Creates a reader over `stream` with no read in progress.
    pub const fn new(stream: T) -> Self {
        Reader { idx: 0, stream }
    }
}
46
47impl<T, E> AsyncRead for Reader<T>
48where
49    T: Stream<Item = Result<u8, E>> + Unpin,
50{
51    type Error = E;
52
53    fn poll_read(
54        mut self: Pin<&mut Self>,
55        cx: &mut Context,
56        buf: &mut [u8],
57    ) -> Poll<Result<usize, Self::Error>> {
58        while self.idx < buf.len() {
59            let byte = ready!(self.stream.poll_next_unpin(cx)).unwrap()?;
60            buf[self.idx] = byte;
61            self.idx += 1;
62        }
63
64        let used = self.idx;
65        self.idx = 0;
66        Poll::Ready(Ok(used))
67    }
68}
69
70/// Writer for a sink of bytes
71pub const fn writer<T>(stream: T) -> Writer<T>
72where
73    T: Sink<u8> + Unpin,
74{
75    Writer::new(stream)
76}
77
/// Adapter that lets a sink of bytes be written to through [`AsyncWrite`].
pub struct Writer<T> {
    /// The wrapped sink that bytes are forwarded to.
    sink: T,
}

impl<T> Writer<T> {
    /// Creates a writer that forwards bytes to `sink`.
    pub const fn new(sink: T) -> Self {
        Writer { sink }
    }
}
87
88impl<T> AsyncWrite for Writer<T>
89where
90    T: Sink<u8> + Unpin,
91{
92    type Error = T::Error;
93
94    fn poll_write(
95        mut self: Pin<&mut Self>,
96        cx: &mut Context,
97        buf: &[u8],
98    ) -> Poll<Result<usize, Self::Error>> {
99        let mut idx = 0;
100        while idx < buf.len() {
101            ready!(self.sink.poll_ready_unpin(cx))?;
102            self.sink.start_send_unpin(buf[0])?;
103            idx += 1;
104        }
105
106        Poll::Ready(Ok(idx))
107    }
108
109    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
110        self.sink.poll_flush_unpin(cx)
111    }
112}