1use crate::{DrainChunks, IntoChunks};
2
3use bytes::buf::{Buf, BufMut, UninitSlice};
4use bytes::{Bytes, BytesMut};
5
6use std::cmp::min;
7use std::collections::VecDeque;
8use std::io::IoSlice;
9
10const DEFAULT_CHUNK_SIZE: usize = 4096;
11
12#[derive(Debug)]
13pub(crate) struct Inner {
14 staging: BytesMut,
15 chunks: VecDeque<Bytes>,
16 chunk_size: usize,
17}
18
19impl Default for Inner {
20 #[inline]
21 fn default() -> Self {
22 Inner {
23 staging: BytesMut::new(),
24 chunks: VecDeque::new(),
25 chunk_size: DEFAULT_CHUNK_SIZE,
26 }
27 }
28}
29
30pub(crate) enum AdvanceStopped {
31 InChunk,
32 InStaging(usize),
33}
34
35impl Inner {
36 #[inline]
37 pub fn with_chunk_size(chunk_size: usize) -> Self {
38 Inner {
39 chunk_size,
40 ..Default::default()
41 }
42 }
43
44 #[inline]
45 pub fn with_profile(chunk_size: usize, chunking_capacity: usize) -> Self {
46 Inner {
47 staging: BytesMut::new(),
48 chunks: VecDeque::with_capacity(chunking_capacity),
49 chunk_size,
50 }
51 }
52
53 #[inline]
54 pub fn chunk_size(&self) -> usize {
55 self.chunk_size
56 }
57
58 #[inline]
59 pub fn is_empty(&self) -> bool {
60 self.chunks.is_empty() && self.staging.is_empty()
61 }
62
63 #[inline]
64 pub fn staging_len(&self) -> usize {
65 self.staging.len()
66 }
67
68 #[inline]
69 pub fn staging_capacity(&self) -> usize {
70 self.staging.capacity()
71 }
72
73 #[inline]
74 pub fn push_chunk(&mut self, chunk: Bytes) {
75 debug_assert!(!chunk.is_empty());
76 self.chunks.push_back(chunk)
77 }
78
79 #[inline]
80 pub fn flush(&mut self) {
81 if !self.staging.is_empty() {
82 let bytes = self.staging.split().freeze();
83 self.push_chunk(bytes)
84 }
85 }
86
87 #[inline]
88 pub fn drain_chunks(&mut self) -> DrainChunks<'_> {
89 DrainChunks::new(self.chunks.drain(..))
90 }
91
92 #[inline]
93 pub fn into_chunks(mut self) -> IntoChunks {
94 if !self.staging.is_empty() {
95 self.chunks.push_back(self.staging.freeze());
96 }
97 IntoChunks::new(self.chunks.into_iter())
98 }
99
100 pub fn reserve_staging(&mut self) -> usize {
101 let cap = self.staging.capacity();
102
103 let cutoff = cap.saturating_add(cap / 2);
122 let additional = if cutoff > self.chunk_size {
123 self.flush();
128 self.chunk_size
129 } else {
130 self.chunk_size - cap
135 };
136 self.staging.reserve(additional);
137 self.staging.capacity()
138 }
139
140 #[inline]
141 pub fn remaining_mut(&self) -> usize {
142 self.staging.remaining_mut()
143 }
144
145 #[inline]
146 pub unsafe fn advance_mut(&mut self, cnt: usize) {
147 self.staging.advance_mut(cnt);
148 }
149
150 #[inline]
151 pub fn chunk_mut(&mut self) -> &mut UninitSlice {
152 self.staging.chunk_mut()
153 }
154
155 pub fn remaining(&self) -> usize {
156 self.chunks
157 .iter()
158 .fold(self.staging.len(), |sum, chunk| sum + chunk.len())
159 }
160
161 #[inline]
162 pub fn chunk(&self) -> &[u8] {
163 if let Some(chunk) = self.chunks.front() {
164 chunk
165 } else {
166 self.staging.chunk()
167 }
168 }
169
170 pub fn advance(&mut self, mut cnt: usize) -> AdvanceStopped {
171 loop {
172 match self.chunks.front_mut() {
173 None => {
174 self.staging.advance(cnt);
175 return AdvanceStopped::InStaging(cnt);
176 }
177 Some(chunk) => {
178 let len = chunk.len();
179 if cnt < len {
180 chunk.advance(cnt);
181 return AdvanceStopped::InChunk;
182 } else {
183 cnt -= len;
184 self.chunks.pop_front();
185 }
186 }
187 }
188 }
189 }
190
191 pub fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize {
192 let n = {
193 let zipped = dst.iter_mut().zip(self.chunks.iter());
194 let len = zipped.len();
195 for (io_slice, chunk) in zipped {
196 *io_slice = IoSlice::new(chunk);
197 }
198 len
199 };
200
201 if n < dst.len() && !self.staging.is_empty() {
202 dst[n] = IoSlice::new(&self.staging);
203 n + 1
204 } else {
205 n
206 }
207 }
208
209 pub fn copy_to_bytes(&mut self, len: usize) -> Bytes {
210 if self.chunks.is_empty() {
211 return self.staging.copy_to_bytes(len);
212 }
213 let mut to_copy = min(len, self.remaining());
214 let mut buf = BytesMut::with_capacity(to_copy);
215 loop {
216 match self.chunks.front_mut() {
217 None => {
218 buf.put((&mut self.staging).take(to_copy));
219 break;
220 }
221 Some(chunk) => {
222 if chunk.len() > to_copy {
223 buf.put(chunk.take(to_copy));
224 break;
225 } else {
226 buf.extend_from_slice(chunk);
227 to_copy -= chunk.len();
228 }
229 }
230 }
231 self.chunks.pop_front();
232 }
233 buf.freeze()
234 }
235}