futures_stream_reader/
lib.rs

1// Ref https://github.com/tokio-rs/tokio/blob/tokio-util-0.7.7/tokio-util/src/io/reader_stream.rs
2
3use core::pin::Pin;
4use std::io::Error as IoError;
5
6use futures_util::{stream::unfold, AsyncRead, AsyncReadExt as _, Stream};
7
8//
9const DEFAULT_CAPACITY: usize = 4096;
10
11//
12pub fn reader<R: AsyncRead + Unpin + Send + 'static>(
13    reader: R,
14) -> Pin<Box<dyn Stream<Item = Result<Vec<u8>, IoError>> + Send + 'static>> {
15    reader_with_capacity(reader, DEFAULT_CAPACITY)
16}
17
18pub fn reader_with_capacity<R: AsyncRead + Unpin + Send + 'static>(
19    reader: R,
20    capacity: usize,
21) -> Pin<Box<dyn Stream<Item = Result<Vec<u8>, IoError>> + Send + 'static>> {
22    let buf = vec![0; capacity];
23    let st = unfold((reader, buf), |(mut reader, mut buf)| async {
24        match reader.read(&mut buf).await {
25            Ok(n) if n == 0 => None,
26            Ok(n) => Some((Ok(buf[..n].to_vec()), (reader, buf))),
27            Err(err) => Some((Err(err), (reader, buf))),
28        }
29    });
30    Box::pin(st)
31}
32
33pub fn reader_ref<'a, R: AsyncRead + Unpin + Send>(
34    reader: &'a mut R,
35) -> Pin<Box<dyn Stream<Item = Result<Vec<u8>, IoError>> + Send + 'a>> {
36    reader_ref_with_capacity(reader, DEFAULT_CAPACITY)
37}
38
39pub fn reader_ref_with_capacity<'a, R: AsyncRead + Unpin + Send>(
40    reader: &'a mut R,
41    capacity: usize,
42) -> Pin<Box<dyn Stream<Item = Result<Vec<u8>, IoError>> + Send + 'a>> {
43    let buf = vec![0; capacity];
44    let st = unfold((reader, buf), |(reader, mut buf)| async {
45        match reader.read(&mut buf).await {
46            Ok(n) if n == 0 => None,
47            Ok(n) => Some((Ok(buf[..n].to_vec()), (reader, buf))),
48            Err(err) => Some((Err(err), (reader, buf))),
49        }
50    });
51    Box::pin(st)
52}
53
#[cfg(test)]
mod tests {
    use super::*;

    use futures_util::{io::Cursor, StreamExt as _};

    /// Drains `st` and checks that it yields exactly the chunks `123`, `456`, `789`, `0`,
    /// in that order, with no error item and no extra chunk.
    async fn assert_chunks_of_three<S>(mut st: S)
    where
        S: Stream<Item = Result<Vec<u8>, IoError>> + Unpin,
    {
        let expected: [&[u8]; 4] = [b"123", b"456", b"789", b"0"];

        let mut seen = 0;
        while let Some(item) = st.next().await {
            let bytes = match item {
                Ok(bytes) => bytes,
                Err(err) => panic!("{err:?}"),
            };
            match expected.get(seen) {
                Some(want) => assert_eq!(bytes, *want),
                // A fifth chunk would mean the stream produced more than the input holds.
                None => unreachable!(),
            }
            seen += 1;
        }
        assert_eq!(seen, 4);
    }

    #[test]
    fn simple() {
        futures_executor::block_on(async {
            // Owned reader.
            let owned = Cursor::new(b"1234567890");
            assert_chunks_of_three(reader_with_capacity(owned, 3)).await;

            // Borrowed reader.
            let mut borrowed = Cursor::new(b"1234567890");
            assert_chunks_of_three(reader_ref_with_capacity(&mut borrowed, 3)).await;
        })
    }
}