1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
// Ref https://github.com/tokio-rs/tokio/blob/tokio-util-0.7.7/tokio-util/src/io/reader_stream.rs

use core::pin::Pin;
use std::io::Error as IoError;

use futures_util::{stream::unfold, AsyncRead, AsyncReadExt as _, Stream};

/// Read-buffer size, in bytes, that `reader` passes to `reader_capacity`.
const DEFAULT_CAPACITY: usize = 4096;

/// Wraps an [`AsyncRead`] in a boxed [`Stream`] of byte chunks.
///
/// This is a convenience shorthand for [`reader_capacity`] called with the
/// module's `DEFAULT_CAPACITY` as the read-buffer size; see that function for
/// the details of what the stream yields.
pub fn reader<R>(
    reader: R,
) -> Pin<Box<dyn Stream<Item = Result<Vec<u8>, IoError>> + Send + 'static>>
where
    R: AsyncRead + Send + Unpin + 'static,
{
    reader_capacity(reader, DEFAULT_CAPACITY)
}

/// Wraps an [`AsyncRead`] in a boxed [`Stream`] of byte chunks, reading through
/// an internal buffer of `capacity` bytes.
///
/// Each successful read of `n > 0` bytes yields `Ok` with a freshly allocated
/// `Vec<u8>` holding exactly those `n` bytes, so every chunk is at most
/// `capacity` bytes long. The stream ends when the reader reports end of input
/// (a read of zero bytes).
///
/// A `capacity` of `0` is treated as `1`: reading into an empty buffer always
/// returns zero bytes, which would otherwise be mistaken for end of input and
/// silently discard all data.
///
/// # Errors
///
/// A failed read is yielded as a single `Err` item, after which the stream
/// terminates. The reader is dropped at that point instead of being polled
/// again, so a persistently failing reader cannot produce an endless run of
/// errors.
pub fn reader_capacity<R: AsyncRead + Send + 'static + Unpin>(
    reader: R,
    capacity: usize,
) -> Pin<Box<dyn Stream<Item = Result<Vec<u8>, IoError>> + Send + 'static>> {
    // Never read into an empty buffer: `Ok(0)` must only ever mean EOF below.
    let buf = vec![0; capacity.max(1)];

    // The state is `None` once an error has been reported, which makes the
    // following poll finish the stream without touching the reader again.
    let st = unfold(Some((reader, buf)), |state| async move {
        let (mut reader, mut buf) = state?;
        match reader.read(&mut buf).await {
            Ok(0) => None,
            // The buffer is reused across reads, so hand out a copy of the filled part.
            Ok(n) => Some((Ok(buf[..n].to_vec()), Some((reader, buf)))),
            Err(err) => Some((Err(err), None)),
        }
    });
    Box::pin(st)
}

#[cfg(test)]
mod tests {
    use super::*;

    use futures_util::{io::Cursor, StreamExt as _};

    /// Reading ten bytes through a three-byte buffer must produce exactly the
    /// chunks `123`, `456`, `789` and `0`, in that order, with no errors.
    #[test]
    fn test() {
        futures_executor::block_on(async {
            let mut st = reader_capacity(Cursor::new(b"1234567890"), 3);

            // Gather every chunk first, then compare the whole sequence at once.
            let mut chunks: Vec<Vec<u8>> = Vec::new();
            while let Some(item) = st.next().await {
                match item {
                    Ok(bytes) => chunks.push(bytes),
                    Err(err) => panic!("{err:?}"),
                }
            }

            let expected: Vec<Vec<u8>> = vec![
                b"123".to_vec(),
                b"456".to_vec(),
                b"789".to_vec(),
                b"0".to_vec(),
            ];
            assert_eq!(chunks, expected);
        })
    }
}