flowly_core/
chunked.rs

1use std::{
2    collections::{VecDeque, vec_deque},
3    io::IoSlice,
4};
5
6use bytes::{Buf, Bytes};
7
/// A FIFO queue of buffers that is consumed as one logical byte sequence.
///
/// When `T: Buf`, the queue itself implements [`Buf`]: reads start at the
/// front chunk and continue into the following chunks as each one is used up.
#[derive(Debug, Clone)]
pub struct Chunked<T> {
    /// Total number of unread bytes across all queued chunks.
    remaining: usize,
    /// Queued chunks, oldest (next to be read) first.
    chunks: VecDeque<T>,
}

// Written by hand instead of derived: `#[derive(Default)]` would require
// `T: Default`, although an empty queue never needs to construct a `T`.
impl<T> Default for Chunked<T> {
    /// Creates an empty queue holding no chunks and no bytes.
    fn default() -> Self {
        Self {
            remaining: 0,
            chunks: VecDeque::new(),
        }
    }
}
13
14impl<T> Chunked<T> {
15    pub fn new() -> Self {
16        Self {
17            remaining: 0,
18            chunks: Default::default(),
19        }
20    }
21
22    pub fn with_capacity(cap: usize) -> Self {
23        Self {
24            remaining: 0,
25            chunks: VecDeque::with_capacity(cap),
26        }
27    }
28
29    #[inline]
30    pub fn iter(&self) -> vec_deque::Iter<'_, T> {
31        self.chunks.iter()
32    }
33}
34
35impl<T: Buf> Chunked<T> {
36    #[inline]
37    pub fn put(&mut self, chunk: T) {
38        if !chunk.has_remaining() {
39            return;
40        }
41
42        self.remaining += chunk.remaining();
43        self.chunks.push_back(chunk);
44    }
45}
46
47impl<T: Buf> Buf for Chunked<T> {
48    #[inline]
49    fn remaining(&self) -> usize {
50        self.remaining
51    }
52
53    #[inline]
54    fn chunk(&self) -> &[u8] {
55        self.chunks.front().map(|x| x.chunk()).unwrap_or(&[])
56    }
57
58    fn advance(&mut self, mut cnt: usize) {
59        self.remaining -= cnt;
60
61        while cnt > 0 {
62            let Some(chunk) = self.chunks.front_mut() else {
63                panic!("advance: no available chunks!");
64            };
65
66            let len = chunk.remaining();
67            if cnt < len {
68                chunk.advance(cnt);
69                return;
70            } else {
71                cnt -= len;
72                self.chunks.pop_front();
73            }
74        }
75    }
76
77    fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize {
78        let mut iter = self.chunks.iter();
79        let mut len = 0;
80
81        while len < dst.len() {
82            let Some(chunk) = iter.next() else {
83                break;
84            };
85
86            len += chunk.chunks_vectored(&mut dst[len..]);
87        }
88
89        len
90    }
91
92    fn copy_to_bytes(&mut self, len: usize) -> Bytes {
93        if let Some(chunk) = self.chunks.front_mut() {
94            if chunk.remaining() > len {
95                self.remaining -= len;
96                chunk.copy_to_bytes(len)
97            } else if chunk.remaining() == len {
98                self.remaining -= len;
99                self.chunks.pop_front().unwrap().copy_to_bytes(len)
100            } else {
101                use bytes::buf::BufMut;
102                let mut ret = bytes::BytesMut::with_capacity(len);
103                ret.put(self.take(len));
104                ret.freeze()
105            }
106        } else {
107            panic!("copy_to_bytes: no available chunks!");
108        }
109    }
110}
111
112impl<T: Buf> From<Vec<T>> for Chunked<T> {
113    fn from(value: Vec<T>) -> Self {
114        Self {
115            remaining: value.iter().map(|x| x.remaining()).sum(),
116            chunks: value.into(),
117        }
118    }
119}
120
121impl<T> IntoIterator for Chunked<T> {
122    type Item = T;
123    type IntoIter = vec_deque::IntoIter<T>;
124
125    #[inline]
126    fn into_iter(self) -> Self::IntoIter {
127        self.chunks.into_iter()
128    }
129}
130
#[cfg(test)]
mod tests {
    use std::io::{Cursor, IoSlice, Write};

    use bytes::{Buf, Bytes};

    use super::Chunked;

    /// Queues every slice of `parts`, in order, into a fresh `Chunked<Bytes>`.
    fn chunked(parts: &[&'static [u8]]) -> Chunked<Bytes> {
        let mut queue = Chunked::new();
        for &part in parts {
            queue.put(Bytes::from_static(part));
        }
        queue
    }

    #[test]
    fn test_chunked_bytes_copy_to_slice() {
        // A read spanning every queued chunk drains the queue.
        let mut spanning = chunked(&[b"hello", b", ", b"world"]);
        assert_eq!(spanning.remaining(), 12);

        let mut out = [0u8; 12];
        spanning.copy_to_slice(&mut out);

        assert_eq!(spanning.remaining(), 0);
        assert_eq!(&out, b"hello, world");

        // A partial read leaves the tail of the chunk at the front.
        let mut partial = chunked(&[b"hello, world"]);
        assert_eq!(partial.remaining(), 12);

        let mut out = [0u8; 5];
        partial.copy_to_slice(&mut out);

        assert_eq!(partial.remaining(), 7);
        assert_eq!(&out, b"hello");
        assert_eq!(
            partial.chunks.front().map(|x| x.chunk()),
            Some(&b", world"[..])
        );
    }

    #[test]
    fn test_chunked_bytes_copy_to_bytes() {
        // The request spans several chunks.
        let mut spanning = chunked(&[b"hello", b", ", b"world"]);
        assert_eq!(spanning.remaining(), 12);

        let copy = spanning.copy_to_bytes(12);
        assert_eq!(spanning.remaining(), 0);
        assert_eq!(copy.as_ref(), b"hello, world");

        // The request is shorter than the front chunk.
        let mut partial = chunked(&[b"hello, world"]);
        assert_eq!(partial.remaining(), 12);

        let copy = partial.copy_to_bytes(5);
        assert_eq!(partial.remaining(), 7);
        assert_eq!(copy.as_ref(), b"hello");

        // The request matches the front chunk exactly.
        let mut exact = chunked(&[b"hello, world"]);
        assert_eq!(exact.remaining(), 12);

        let copy = exact.copy_to_bytes(12);
        assert_eq!(exact.remaining(), 0);
        assert_eq!(copy.as_ref(), b"hello, world");
    }

    #[test]
    fn test_chunken_bytes_data() {
        /// Reads one `u32` from `parts` and checks its value and the bytes left over.
        fn check(parts: &[&'static [u8]], left: usize) {
            let mut bytes = chunked(parts);
            assert_eq!(bytes.get_u32(), 12);
            assert_eq!(bytes.remaining(), left);
        }

        // The same five bytes, split at different chunk boundaries.
        check(&[&[0], &[0], &[0], &[12], &[0]], 1);
        check(&[&[0, 0], &[0, 12], &[0]], 1);
        check(&[&[0, 0, 0, 12], &[0]], 1);
        check(&[&[0, 0, 0, 12, 0]], 1);

        // Exactly four bytes: nothing is left afterwards.
        check(&[&[0, 0, 0, 12]], 0);
    }

    #[test]
    fn test_chunked_bytes_vectored() {
        let parts: [&'static [u8]; 4] = [b"hello", b", ", b"world", b"!"];

        // More slots than chunks: every chunk gets its own slot.
        let mut dst = Vec::new();
        let bytes = chunked(&parts);
        assert_eq!(bytes.remaining(), 13);

        let mut bufs = [IoSlice::new(&[]); 8];
        assert_eq!(bytes.chunks_vectored(&mut bufs), 4);
        let _ = Cursor::new(&mut dst).write_vectored(&bufs[0..4]).unwrap();

        assert_eq!(&dst[..], b"hello, world!");

        // Fewer slots than chunks: only the leading chunks are exposed.
        let mut dst = Vec::new();
        let bytes = chunked(&parts);
        assert_eq!(bytes.remaining(), 13);

        let mut bufs = [IoSlice::new(&[]); 2];
        assert_eq!(bytes.chunks_vectored(&mut bufs), 2);
        let _ = Cursor::new(&mut dst).write_vectored(&bufs[0..2]).unwrap();

        assert_eq!(&dst[..], b"hello, ");
    }
}
289}