1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
use bytes::{Bytes, BytesMut};
use std::io;
use tokio::io::{AsyncRead, AsyncReadExt};
use wnfs_common::utils::{boxed_stream, BoxStream, CondSend};

/// Chunk size, in bytes, used by [`Fixed::default`]: 256 KiB.
pub const DEFAULT_CHUNKS_SIZE: usize = 256 * 1024;

/// Chunker that cuts a byte source into pieces of one fixed size.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Fixed {
    /// Number of bytes in each emitted chunk.
    pub chunk_size: usize,
}

impl Default for Fixed {
    /// Returns a chunker whose chunk size is [`DEFAULT_CHUNKS_SIZE`].
    fn default() -> Self {
        let chunk_size = DEFAULT_CHUNKS_SIZE;
        Self { chunk_size }
    }
}

impl Fixed {
    /// Creates a chunker that emits chunks of `chunk_size` bytes.
    ///
    /// # Panics
    ///
    /// Panics if `chunk_size` is zero.
    pub fn new(chunk_size: usize) -> Self {
        assert!(chunk_size > 0, "chunk size must be greater than zero");

        Self { chunk_size }
    }

    /// Splits everything read from `source` into a stream of chunks.
    ///
    /// Every item is exactly `chunk_size` bytes long, except possibly the last one, which holds
    /// whatever is left over once `source` reports end of input. An empty `source` produces no
    /// items at all.
    ///
    /// # Errors
    ///
    /// A read error is yielded as an `Err` item and ends the stream; bytes buffered for an
    /// unfinished chunk are discarded. If `chunk_size` is zero (possible when the struct is built
    /// directly instead of through [`Fixed::new`]), the stream yields a single
    /// [`io::ErrorKind::InvalidInput`] error and ends.
    pub fn chunks<'a, R: AsyncRead + Unpin + CondSend + 'a>(
        self,
        mut source: R,
    ) -> BoxStream<'a, io::Result<Bytes>> {
        let chunk_size = self.chunk_size;
        boxed_stream(async_stream::stream! {
            // `chunk_size` is a public field, so the check in `new` can be bypassed. A zero size
            // would never make progress and would emit empty chunks forever.
            if chunk_size == 0 {
                yield Err(io::Error::new(
                    io::ErrorKind::InvalidInput,
                    "chunk size must be greater than zero",
                ));
                return;
            }

            let mut buffer = BytesMut::with_capacity(chunk_size);

            loop {
                match source.read_buf(&mut buffer).await {
                    Ok(0) => {
                        // End of input. Full chunks are drained after every successful read, so
                        // at most one partial chunk is left. It is copied so the emitted `Bytes`
                        // does not keep a whole chunk-sized allocation alive for a short tail.
                        if !buffer.is_empty() {
                            yield Ok(Bytes::copy_from_slice(&buffer));
                        }
                        break;
                    }
                    Ok(_) => {
                        // Hand out every complete chunk without copying; a single read may
                        // deliver more than one chunk if the buffer had spare room.
                        while buffer.len() >= chunk_size {
                            yield Ok(buffer.split_to(chunk_size).freeze());
                        }
                        // Splitting takes capacity away from `buffer`; make sure the rest of the
                        // current chunk can be read without many tiny reads. The subtraction
                        // cannot underflow because the loop above left `len < chunk_size`.
                        buffer.reserve(chunk_size - buffer.len());
                    }
                    Err(err) => {
                        // Stop after the first failure: retrying a broken reader could repeat
                        // the same error forever for a consumer that keeps polling.
                        yield Err(err);
                        break;
                    }
                }
            }
        })
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use futures::TryStreamExt;

    /// Builds 1024 bytes: four consecutive 256-byte runs filled with 1, 2, 3 and 4.
    fn four_runs() -> Vec<u8> {
        (1u8..=4)
            .flat_map(|value| std::iter::repeat(value).take(256))
            .collect()
    }

    #[tokio::test]
    async fn test_fixed_chunker() {
        // Input length is an exact multiple of the chunk size.
        {
            let reader = std::io::Cursor::new(four_runs());
            let chunks: Vec<_> = Fixed::new(256)
                .chunks(reader)
                .try_collect()
                .await
                .unwrap();

            assert_eq!(chunks.len(), 4);
            for (chunk, value) in chunks.iter().zip(1u8..) {
                assert_eq!(chunk, &[value; 256][..]);
            }
        }

        // Two extra bytes spill over into a short trailing chunk.
        {
            let mut content = four_runs();
            content.extend_from_slice(&[5, 5]);

            let reader = std::io::Cursor::new(content);
            let chunks: Vec<_> = Fixed::new(256)
                .chunks(reader)
                .try_collect()
                .await
                .unwrap();

            assert_eq!(chunks.len(), 5);
            for (chunk, value) in chunks.iter().take(4).zip(1u8..) {
                assert_eq!(chunk, &[value; 256][..]);
            }
            assert_eq!(&chunks[4], &[5u8; 2][..]);
        }
    }
}