1use std::{
2 collections::{VecDeque, vec_deque},
3 io::IoSlice,
4};
5
6use bytes::{Buf, Bytes};
7
/// A FIFO queue of buffers of type `T`, stored in arrival order.
///
/// When `T` implements `Buf`, the queue itself can be read as one logical byte stream
/// made of the queued chunks back to back, without first copying them into one
/// contiguous allocation.
#[derive(Debug, Clone)]
pub struct Chunked<T> {
    /// Cached total of unread bytes across `chunks`; kept in sync by every mutating method.
    remaining: usize,
    /// Queued buffers; the front element is the next one to be read.
    chunks: VecDeque<T>,
}

// Implemented by hand rather than derived: `#[derive(Default)]` would demand `T: Default`,
// yet an empty queue never needs to build a `T`.
impl<T> Default for Chunked<T> {
    /// Returns an empty queue with zero remaining bytes.
    fn default() -> Self {
        Self {
            remaining: 0,
            chunks: VecDeque::new(),
        }
    }
}
13
14impl<T> Chunked<T> {
15 pub fn new() -> Self {
16 Self {
17 remaining: 0,
18 chunks: Default::default(),
19 }
20 }
21
22 pub fn with_capacity(cap: usize) -> Self {
23 Self {
24 remaining: 0,
25 chunks: VecDeque::with_capacity(cap),
26 }
27 }
28
29 #[inline]
30 pub fn iter(&self) -> vec_deque::Iter<'_, T> {
31 self.chunks.iter()
32 }
33}
34
35impl<T: Buf> Chunked<T> {
36 #[inline]
37 pub fn put(&mut self, chunk: T) {
38 if !chunk.has_remaining() {
39 return;
40 }
41
42 self.remaining += chunk.remaining();
43 self.chunks.push_back(chunk);
44 }
45}
46
47impl<T: Buf> Buf for Chunked<T> {
48 #[inline]
49 fn remaining(&self) -> usize {
50 self.remaining
51 }
52
53 #[inline]
54 fn chunk(&self) -> &[u8] {
55 self.chunks.front().map(|x| x.chunk()).unwrap_or(&[])
56 }
57
58 fn advance(&mut self, mut cnt: usize) {
59 self.remaining -= cnt;
60
61 while cnt > 0 {
62 let Some(chunk) = self.chunks.front_mut() else {
63 panic!("advance: no available chunks!");
64 };
65
66 let len = chunk.remaining();
67 if cnt < len {
68 chunk.advance(cnt);
69 return;
70 } else {
71 cnt -= len;
72 self.chunks.pop_front();
73 }
74 }
75 }
76
77 fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize {
78 let mut iter = self.chunks.iter();
79 let mut len = 0;
80
81 while len < dst.len() {
82 let Some(chunk) = iter.next() else {
83 break;
84 };
85
86 len += chunk.chunks_vectored(&mut dst[len..]);
87 }
88
89 len
90 }
91
92 fn copy_to_bytes(&mut self, len: usize) -> Bytes {
93 if let Some(chunk) = self.chunks.front_mut() {
94 if chunk.remaining() > len {
95 self.remaining -= len;
96 chunk.copy_to_bytes(len)
97 } else if chunk.remaining() == len {
98 self.remaining -= len;
99 self.chunks.pop_front().unwrap().copy_to_bytes(len)
100 } else {
101 use bytes::buf::BufMut;
102 let mut ret = bytes::BytesMut::with_capacity(len);
103 ret.put(self.take(len));
104 ret.freeze()
105 }
106 } else {
107 panic!("copy_to_bytes: no available chunks!");
108 }
109 }
110}
111
112impl<T: Buf> From<Vec<T>> for Chunked<T> {
113 fn from(value: Vec<T>) -> Self {
114 Self {
115 remaining: value.iter().map(|x| x.remaining()).sum(),
116 chunks: value.into(),
117 }
118 }
119}
120
121impl<T> IntoIterator for Chunked<T> {
122 type Item = T;
123 type IntoIter = vec_deque::IntoIter<T>;
124
125 #[inline]
126 fn into_iter(self) -> Self::IntoIter {
127 self.chunks.into_iter()
128 }
129}
130
#[cfg(test)]
mod tests {
    use std::io::{Cursor, IoSlice, Write};

    use bytes::{Buf, Bytes};

    use super::Chunked;

    /// Shorthand for a `Bytes` backed by static data.
    fn part(data: &'static [u8]) -> Bytes {
        Bytes::from_static(data)
    }

    /// Queues every part, in order, through `Chunked::put`.
    fn chunked(parts: Vec<Bytes>) -> Chunked<Bytes> {
        let mut buf = Chunked::new();
        for part in parts {
            buf.put(part);
        }
        buf
    }

    /// "hello, world" split across three chunks.
    fn hello_world() -> Chunked<Bytes> {
        chunked(vec![part(b"hello"), part(b", "), part(b"world")])
    }

    /// "hello, world!" split across four chunks.
    fn hello_world_bang() -> Chunked<Bytes> {
        chunked(vec![
            part(b"hello"),
            part(b", "),
            part(b"world"),
            part(b"!"),
        ])
    }

    #[test]
    fn test_chunked_bytes_copy_to_slice() {
        // A read that spans every chunk.
        let mut bytes = hello_world();
        assert_eq!(bytes.remaining(), 12);

        let mut out = [0u8; 12];
        bytes.copy_to_slice(&mut out);

        assert_eq!(bytes.remaining(), 0);
        assert_eq!(&out, b"hello, world");

        // A read that stops in the middle of a single chunk.
        let mut bytes = chunked(vec![part(b"hello, world")]);
        assert_eq!(bytes.remaining(), 12);

        let mut out = [0u8; 5];
        bytes.copy_to_slice(&mut out);

        assert_eq!(bytes.remaining(), 7);
        assert_eq!(&out, b"hello");
        assert_eq!(
            bytes.chunks.front().map(|x| x.chunk()),
            Some(&b", world"[..])
        );
    }

    #[test]
    fn test_chunked_bytes_copy_to_bytes() {
        // Gathered from several chunks.
        let mut bytes = hello_world();
        assert_eq!(bytes.remaining(), 12);

        let copy = bytes.copy_to_bytes(12);

        assert_eq!(bytes.remaining(), 0);
        assert_eq!(copy.as_ref(), b"hello, world");

        // Prefix of the front chunk.
        let mut bytes = chunked(vec![part(b"hello, world")]);
        assert_eq!(bytes.remaining(), 12);

        let copy = bytes.copy_to_bytes(5);

        assert_eq!(bytes.remaining(), 7);
        assert_eq!(copy.as_ref(), b"hello");

        // Exactly the front chunk.
        let mut bytes = chunked(vec![part(b"hello, world")]);
        assert_eq!(bytes.remaining(), 12);

        let copy = bytes.copy_to_bytes(12);

        assert_eq!(bytes.remaining(), 0);
        assert_eq!(copy.as_ref(), b"hello, world");
    }

    #[test]
    fn test_chunked_bytes_data() {
        // Each case: how the bytes are split into chunks, and how many bytes must be
        // left after reading one big-endian `u32` (which is always 12).
        let cases = vec![
            (
                vec![part(&[0]), part(&[0]), part(&[0]), part(&[12]), part(&[0])],
                1,
            ),
            (vec![part(&[0, 0]), part(&[0, 12]), part(&[0])], 1),
            (vec![part(&[0, 0, 0, 12]), part(&[0])], 1),
            (vec![part(&[0, 0, 0, 12, 0])], 1),
            (vec![part(&[0, 0, 0, 12])], 0),
        ];

        for (parts, left) in cases {
            let mut bytes = chunked(parts);

            assert_eq!(bytes.get_u32(), 12);
            assert_eq!(bytes.remaining(), left);
        }
    }

    #[test]
    fn test_chunked_bytes_vectored() {
        // More destination slots than chunks: every chunk is reported.
        let bytes = hello_world_bang();
        assert_eq!(bytes.remaining(), 13);

        let mut bufs = [IoSlice::new(&[]); 8];
        assert_eq!(bytes.chunks_vectored(&mut bufs), 4);

        let mut dst = Vec::new();
        let _ = Cursor::new(&mut dst).write_vectored(&bufs[0..4]).unwrap();
        assert_eq!(&dst[..], b"hello, world!");

        // Fewer destination slots than chunks: only the leading chunks are reported.
        let bytes = hello_world_bang();
        assert_eq!(bytes.remaining(), 13);

        let mut bufs = [IoSlice::new(&[]); 2];
        assert_eq!(bytes.chunks_vectored(&mut bufs), 2);

        let mut dst = Vec::new();
        let _ = Cursor::new(&mut dst).write_vectored(&bufs[0..2]).unwrap();
        assert_eq!(&dst[..], b"hello, ");
    }

    #[test]
    fn test_chunked_put_skips_empty_chunks() {
        let mut bytes = Chunked::new();

        bytes.put(Bytes::new());
        assert_eq!(bytes.remaining(), 0);
        assert_eq!(bytes.iter().count(), 0);

        bytes.put(part(b"a"));
        bytes.put(Bytes::new());
        assert_eq!(bytes.remaining(), 1);
        assert_eq!(bytes.iter().count(), 1);
    }

    #[test]
    fn test_chunked_from_vec_and_into_iter() {
        let parts = vec![part(b"hello"), part(b", "), part(b"world")];

        let bytes = Chunked::from(parts.clone());
        assert_eq!(bytes.remaining(), 12);
        assert_eq!(bytes.iter().count(), 3);

        assert_eq!(bytes.into_iter().collect::<Vec<_>>(), parts);
    }
}