wnfs_unixfs_file/chunker/
fixed.rs1use bytes::{Bytes, BytesMut};
2use std::io;
3use tokio::io::{AsyncRead, AsyncReadExt};
4use wnfs_common::utils::{boxed_stream, BoxStream, CondSend};
5
6pub const DEFAULT_CHUNKS_SIZE: usize = 1024 * 256;
8
9#[derive(Debug, Clone, PartialEq, Eq)]
10pub struct Fixed {
11 pub chunk_size: usize,
12}
13
14impl Default for Fixed {
15 fn default() -> Self {
16 Self {
17 chunk_size: DEFAULT_CHUNKS_SIZE,
18 }
19 }
20}
21
22impl Fixed {
23 pub fn new(chunk_size: usize) -> Self {
24 assert!(chunk_size > 0);
25
26 Self { chunk_size }
27 }
28
29 pub fn chunks<'a, R: AsyncRead + Unpin + CondSend + 'a>(
30 self,
31 mut source: R,
32 ) -> BoxStream<'a, io::Result<Bytes>> {
33 let chunk_size = self.chunk_size;
34 boxed_stream(async_stream::stream! {
35 let mut buffer = BytesMut::with_capacity(chunk_size);
36 let mut current_len = 0;
37
38 loop {
39 if current_len == 0 {
40 buffer.clear();
41 }
42 match source.read_buf(&mut buffer).await {
43 Ok(len) => {
44 current_len += len;
45 if current_len == chunk_size {
46 current_len = 0;
48 yield Ok(buffer.clone().freeze());
49 } else if current_len < chunk_size && len > 0 {
50 continue;
52 } else if current_len > chunk_size {
53 let out = buffer.split_to(chunk_size);
55 current_len -= chunk_size;
56 yield Ok(out.freeze());
57 } else {
58 debug_assert!(len == 0);
60 if current_len > 0 {
61 yield Ok(buffer.clone().freeze());
62 }
63 break;
64 }
65 }
66 Err(err) => {
67 yield Err(err);
68 }
69 }
70 }
71 })
72 }
73}
74
75#[cfg(test)]
76mod tests {
77 use super::*;
78 use futures::TryStreamExt;
79
80 #[tokio::test]
81 async fn test_fixed_chunker() {
82 {
84 let mut content = Vec::with_capacity(1024);
85 content.resize(256, 1);
86 content.resize(512, 2);
87 content.resize(768, 3);
88 content.resize(1024, 4);
89 let bytes = std::io::Cursor::new(content);
90
91 let chunks: Vec<_> = Fixed::new(256).chunks(bytes).try_collect().await.unwrap();
92 assert_eq!(chunks.len(), 4);
93 assert_eq!(&chunks[0], &[1u8; 256][..]);
94 assert_eq!(&chunks[1], &[2u8; 256][..]);
95 assert_eq!(&chunks[2], &[3u8; 256][..]);
96 assert_eq!(&chunks[3], &[4u8; 256][..]);
97 }
98
99 {
101 let mut content = Vec::with_capacity(1024);
102 content.resize(256, 1);
103 content.resize(512, 2);
104 content.resize(768, 3);
105 content.resize(1024, 4);
106 content.push(5);
107 content.push(5);
108
109 let bytes = std::io::Cursor::new(content);
110 let chunks: Vec<_> = Fixed::new(256).chunks(bytes).try_collect().await.unwrap();
111 assert_eq!(chunks.len(), 5);
112 assert_eq!(&chunks[0], &[1u8; 256][..]);
113 assert_eq!(&chunks[1], &[2u8; 256][..]);
114 assert_eq!(&chunks[2], &[3u8; 256][..]);
115 assert_eq!(&chunks[3], &[4u8; 256][..]);
116 assert_eq!(&chunks[4], &[5u8; 2][..]);
117 }
118 }
119}