1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
use core::pin::Pin;
use std::io::Error as IoError;
use futures_util::{stream::unfold, AsyncRead, AsyncReadExt as _, Stream};
const DEFAULT_CAPACITY: usize = 4096;
/// Wraps an [`AsyncRead`] in a boxed [`Stream`] of byte chunks, using a read
/// buffer of `DEFAULT_CAPACITY` bytes.
///
/// This is a convenience shorthand for calling [`reader_capacity`] with the
/// default buffer size; see that function for the stream's exact behaviour.
pub fn reader<R>(
    reader: R,
) -> Pin<Box<dyn Stream<Item = Result<Vec<u8>, IoError>> + Send + 'static>>
where
    R: AsyncRead + Send + 'static + Unpin,
{
    reader_capacity(reader, DEFAULT_CAPACITY)
}
/// Wraps an [`AsyncRead`] in a boxed [`Stream`] of byte chunks, reading through
/// an internal buffer of `capacity` bytes.
///
/// Each successful read yields `Ok(chunk)`, where `chunk` is a freshly
/// allocated `Vec<u8>` holding exactly the bytes returned by that read (at most
/// `capacity` bytes). The stream ends when the reader reports end-of-file,
/// i.e. a read of zero bytes.
///
/// A `capacity` of `0` is treated as `1`: an empty buffer would make every read
/// return `Ok(0)`, which cannot be told apart from end-of-file, so the stream
/// would finish immediately and silently drop all of the reader's data.
///
/// # Errors
///
/// A failed read is yielded as an `Err` item. The error does not terminate the
/// stream: the reader and buffer are kept, and the next poll attempts another
/// read. Callers that want to stop on the first error must do so themselves.
pub fn reader_capacity<R: AsyncRead + Send + 'static + Unpin>(
    reader: R,
    capacity: usize,
) -> Pin<Box<dyn Stream<Item = Result<Vec<u8>, IoError>> + Send + 'static>> {
    // Never read into an empty buffer: `Ok(0)` would be mistaken for EOF below.
    let buf = vec![0_u8; capacity.max(1)];
    // The reader and the reusable buffer are threaded through `unfold` as state.
    let st = unfold((reader, buf), |(mut reader, mut buf)| async move {
        match reader.read(&mut buf).await {
            // Zero bytes read means end-of-file: finish the stream.
            Ok(0) => None,
            // Copy out only the filled prefix so the buffer can be reused.
            Ok(n) => Some((Ok(buf[..n].to_vec()), (reader, buf))),
            // Surface the error but keep the state so polling can continue.
            Err(err) => Some((Err(err), (reader, buf))),
        }
    });
    Box::pin(st)
}
#[cfg(test)]
mod tests {
    use super::*;
    use futures_util::{io::Cursor, StreamExt as _};

    /// Streams ten bytes through a three-byte buffer and checks that the
    /// chunks arrive in order as "123", "456", "789" and "0".
    #[test]
    fn test() {
        // Chunks expected from the stream, in arrival order.
        const EXPECTED: [&[u8]; 4] = [b"123", b"456", b"789", b"0"];

        futures_executor::block_on(async {
            let mut chunks = reader_capacity(Cursor::new(b"1234567890"), 3);
            let mut seen = 0;
            while let Some(item) = chunks.next().await {
                let bytes = match item {
                    Ok(bytes) => bytes,
                    Err(err) => panic!("{err:?}"),
                };
                match EXPECTED.get(seen) {
                    Some(want) => assert_eq!(bytes, *want),
                    // More chunks than expected must never happen.
                    None => unreachable!(),
                }
                seen += 1;
            }
            assert_eq!(seen, 4);
        })
    }
}