// wnfs_unixfs_file/chunker/fixed.rs

1use bytes::{Bytes, BytesMut};
2use std::io;
3use tokio::io::{AsyncRead, AsyncReadExt};
4use wnfs_common::utils::{boxed_stream, BoxStream, CondSend};
5
/// Chunk size in bytes (256 KiB) used when a `Fixed` chunker is built via `Default`.
pub const DEFAULT_CHUNKS_SIZE: usize = 256 * 1024;
8
/// Chunker that cuts a byte stream into pieces of one fixed length.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Fixed {
    /// Length in bytes of every emitted chunk except possibly the final one.
    /// Must be non-zero; `Fixed::new` asserts this.
    pub chunk_size: usize,
}
13
14impl Default for Fixed {
15    fn default() -> Self {
16        Self {
17            chunk_size: DEFAULT_CHUNKS_SIZE,
18        }
19    }
20}
21
22impl Fixed {
23    pub fn new(chunk_size: usize) -> Self {
24        assert!(chunk_size > 0);
25
26        Self { chunk_size }
27    }
28
29    pub fn chunks<'a, R: AsyncRead + Unpin + CondSend + 'a>(
30        self,
31        mut source: R,
32    ) -> BoxStream<'a, io::Result<Bytes>> {
33        let chunk_size = self.chunk_size;
34        boxed_stream(async_stream::stream! {
35            let mut buffer = BytesMut::with_capacity(chunk_size);
36            let mut current_len = 0;
37
38            loop {
39                if current_len == 0 {
40                    buffer.clear();
41                }
42                match source.read_buf(&mut buffer).await {
43                    Ok(len) => {
44                        current_len += len;
45                        if current_len == chunk_size {
46                            // read a full chunk
47                            current_len = 0;
48                            yield Ok(buffer.clone().freeze());
49                        } else if current_len < chunk_size && len > 0 {
50                            // not done reading, read again
51                            continue;
52                        } else if current_len > chunk_size {
53                            // read more than a chunk, emit only a single chunk
54                            let out = buffer.split_to(chunk_size);
55                            current_len -= chunk_size;
56                            yield Ok(out.freeze());
57                        } else {
58                            // finished reading
59                            debug_assert!(len == 0);
60                            if current_len > 0 {
61                                yield Ok(buffer.clone().freeze());
62                            }
63                            break;
64                        }
65                    }
66                    Err(err) => {
67                        yield Err(err);
68                    }
69                }
70            }
71        })
72    }
73}
74
#[cfg(test)]
mod tests {
    use super::*;
    use futures::TryStreamExt;
    use tokio::io::AsyncReadExt;

    /// Runs a `Fixed` chunker of `chunk_size` over `input` and collects every chunk,
    /// panicking on any error.
    async fn chunk_all<R: AsyncRead + Unpin + CondSend>(chunk_size: usize, input: R) -> Vec<Bytes> {
        Fixed::new(chunk_size)
            .chunks(input)
            .try_collect()
            .await
            .unwrap()
    }

    /// Builds `values.len() * block_len` bytes, where block `i` is filled with `values[i]`.
    fn blocks(values: &[u8], block_len: usize) -> Vec<u8> {
        values
            .iter()
            .flat_map(|&v| std::iter::repeat(v).take(block_len))
            .collect()
    }

    #[tokio::test]
    async fn test_fixed_chunker() {
        // exact match: the input length is a multiple of the chunk size
        {
            let content = blocks(&[1, 2, 3, 4], 256);
            let chunks = chunk_all(256, std::io::Cursor::new(content)).await;
            assert_eq!(chunks.len(), 4);
            assert_eq!(&chunks[0], &[1u8; 256][..]);
            assert_eq!(&chunks[1], &[2u8; 256][..]);
            assert_eq!(&chunks[2], &[3u8; 256][..]);
            assert_eq!(&chunks[3], &[4u8; 256][..]);
        }

        // overflow: two trailing bytes form a short final chunk
        {
            let mut content = blocks(&[1, 2, 3, 4], 256);
            content.extend_from_slice(&[5, 5]);

            let chunks = chunk_all(256, std::io::Cursor::new(content)).await;
            assert_eq!(chunks.len(), 5);
            assert_eq!(&chunks[0], &[1u8; 256][..]);
            assert_eq!(&chunks[1], &[2u8; 256][..]);
            assert_eq!(&chunks[2], &[3u8; 256][..]);
            assert_eq!(&chunks[3], &[4u8; 256][..]);
            assert_eq!(&chunks[4], &[5u8; 2][..]);
        }
    }

    #[tokio::test]
    async fn empty_input_yields_no_chunks() {
        let chunks = chunk_all(256, std::io::Cursor::new(Vec::new())).await;
        assert!(chunks.is_empty());
    }

    #[tokio::test]
    async fn input_shorter_than_chunk_yields_one_chunk() {
        let chunks = chunk_all(256, std::io::Cursor::new(vec![7u8; 10])).await;
        assert_eq!(chunks, vec![Bytes::from(vec![7u8; 10])]);
    }

    #[tokio::test]
    async fn chunks_span_fragmented_reads() {
        // `chain` ends the first read after "abc", so read boundaries do not
        // line up with chunk boundaries.
        let input = std::io::Cursor::new(b"abc".to_vec())
            .chain(std::io::Cursor::new(b"defghij".to_vec()));
        let chunks = chunk_all(4, input).await;
        assert_eq!(
            chunks,
            vec![
                Bytes::from_static(b"abcd"),
                Bytes::from_static(b"efgh"),
                Bytes::from_static(b"ij"),
            ]
        );
    }

    #[tokio::test]
    async fn chunks_reassemble_to_input() {
        let content: Vec<u8> = (0..1000u32).map(|i| (i % 251) as u8).collect();
        let chunks = chunk_all(7, std::io::Cursor::new(content.clone())).await;

        // 1000 = 7 * 142 + 6
        assert_eq!(chunks.len(), 143);
        let (last, full) = chunks.split_last().unwrap();
        assert!(full.iter().all(|c| c.len() == 7));
        assert_eq!(last.len(), 6);

        let joined: Vec<u8> = chunks.iter().flat_map(|c| c.iter().copied()).collect();
        assert_eq!(joined, content);
    }
}