1use core::{
2 pin::Pin,
3 task::{Context, Poll},
4};
5use futures::{ready, Sink, SinkExt, Stream, StreamExt};
6
7mod async_buf_read;
8pub use async_buf_read::AsyncBufRead;
9
10mod async_read;
11pub use async_read::AsyncRead;
12
13mod async_write;
14pub use async_write::AsyncWrite;
15
16mod buf_reader;
17pub use buf_reader::BufReader;
18
19mod copy_buf;
20pub use copy_buf::copy_buf;
21
22mod read;
23pub use read::Read;
24
25mod write_all;
26pub use write_all::WriteAll;
27
28pub const fn reader<T, E>(stream: T) -> Reader<T>
30where
31 T: Stream<Item = Result<u8, E>> + Unpin,
32{
33 Reader::new(stream)
34}
35
36pub struct Reader<T> {
37 stream: T,
38 idx: usize,
39}
40
41impl<T> Reader<T> {
42 pub const fn new(stream: T) -> Self {
43 Self { stream, idx: 0 }
44 }
45}
46
47impl<T, E> AsyncRead for Reader<T>
48where
49 T: Stream<Item = Result<u8, E>> + Unpin,
50{
51 type Error = E;
52
53 fn poll_read(
54 mut self: Pin<&mut Self>,
55 cx: &mut Context,
56 buf: &mut [u8],
57 ) -> Poll<Result<usize, Self::Error>> {
58 while self.idx < buf.len() {
59 let byte = ready!(self.stream.poll_next_unpin(cx)).unwrap()?;
60 buf[self.idx] = byte;
61 self.idx += 1;
62 }
63
64 let used = self.idx;
65 self.idx = 0;
66 Poll::Ready(Ok(used))
67 }
68}
69
70pub const fn writer<T>(stream: T) -> Writer<T>
72where
73 T: Sink<u8> + Unpin,
74{
75 Writer::new(stream)
76}
77
78pub struct Writer<T> {
79 sink: T,
80}
81
82impl<T> Writer<T> {
83 pub const fn new(sink: T) -> Self {
84 Self { sink }
85 }
86}
87
88impl<T> AsyncWrite for Writer<T>
89where
90 T: Sink<u8> + Unpin,
91{
92 type Error = T::Error;
93
94 fn poll_write(
95 mut self: Pin<&mut Self>,
96 cx: &mut Context,
97 buf: &[u8],
98 ) -> Poll<Result<usize, Self::Error>> {
99 let mut idx = 0;
100 while idx < buf.len() {
101 ready!(self.sink.poll_ready_unpin(cx))?;
102 self.sink.start_send_unpin(buf[0])?;
103 idx += 1;
104 }
105
106 Poll::Ready(Ok(idx))
107 }
108
109 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
110 self.sink.poll_flush_unpin(cx)
111 }
112}