futures_stream_reader/
lib.rs1use core::pin::Pin;
4use std::io::Error as IoError;
5
6use futures_util::{stream::unfold, AsyncRead, AsyncReadExt as _, Stream};
7
/// Read-buffer size, in bytes, used by the constructors that take no explicit capacity (4 KiB).
const DEFAULT_CAPACITY: usize = 4 * 1024;
10
11pub fn reader<R: AsyncRead + Unpin + Send + 'static>(
13 reader: R,
14) -> Pin<Box<dyn Stream<Item = Result<Vec<u8>, IoError>> + Send + 'static>> {
15 reader_with_capacity(reader, DEFAULT_CAPACITY)
16}
17
18pub fn reader_with_capacity<R: AsyncRead + Unpin + Send + 'static>(
19 reader: R,
20 capacity: usize,
21) -> Pin<Box<dyn Stream<Item = Result<Vec<u8>, IoError>> + Send + 'static>> {
22 let buf = vec![0; capacity];
23 let st = unfold((reader, buf), |(mut reader, mut buf)| async {
24 match reader.read(&mut buf).await {
25 Ok(n) if n == 0 => None,
26 Ok(n) => Some((Ok(buf[..n].to_vec()), (reader, buf))),
27 Err(err) => Some((Err(err), (reader, buf))),
28 }
29 });
30 Box::pin(st)
31}
32
33pub fn reader_ref<'a, R: AsyncRead + Unpin + Send>(
34 reader: &'a mut R,
35) -> Pin<Box<dyn Stream<Item = Result<Vec<u8>, IoError>> + Send + 'a>> {
36 reader_ref_with_capacity(reader, DEFAULT_CAPACITY)
37}
38
39pub fn reader_ref_with_capacity<'a, R: AsyncRead + Unpin + Send>(
40 reader: &'a mut R,
41 capacity: usize,
42) -> Pin<Box<dyn Stream<Item = Result<Vec<u8>, IoError>> + Send + 'a>> {
43 let buf = vec![0; capacity];
44 let st = unfold((reader, buf), |(reader, mut buf)| async {
45 match reader.read(&mut buf).await {
46 Ok(n) if n == 0 => None,
47 Ok(n) => Some((Ok(buf[..n].to_vec()), (reader, buf))),
48 Err(err) => Some((Err(err), (reader, buf))),
49 }
50 });
51 Box::pin(st)
52}
53
#[cfg(test)]
mod tests {
    use super::*;

    use futures_util::{io::Cursor, StreamExt as _};

    /// Chunks expected when `b"1234567890"` is read three bytes at a time.
    const EXPECTED: [&[u8]; 4] = [b"123", b"456", b"789", b"0"];

    /// Drains `stream`, checking each yielded chunk against `EXPECTED` in order
    /// and finally that exactly `EXPECTED.len()` chunks were produced.
    async fn assert_expected_chunks<S>(mut stream: S)
    where
        S: Stream<Item = Result<Vec<u8>, IoError>> + Unpin,
    {
        let mut seen = 0;
        while let Some(item) = stream.next().await {
            let chunk = match item {
                Ok(chunk) => chunk,
                Err(err) => panic!("{err:?}"),
            };
            match EXPECTED.get(seen) {
                Some(want) => assert_eq!(chunk, *want),
                // More chunks than the input can produce.
                None => unreachable!(),
            }
            seen += 1;
        }
        assert_eq!(seen, 4);
    }

    #[test]
    fn simple() {
        futures_executor::block_on(async {
            // Owned reader.
            let owned = Cursor::new(b"1234567890");
            assert_expected_chunks(reader_with_capacity(owned, 3)).await;

            // Borrowed reader.
            let mut borrowed = Cursor::new(b"1234567890");
            assert_expected_chunks(reader_ref_with_capacity(&mut borrowed, 3)).await;
        })
    }
}