//! chunked_bytes/chunked.rs — internal implementation shared by the
//! crate's chunked byte buffer types.
1use crate::{DrainChunks, IntoChunks};
2
3use bytes::buf::{Buf, BufMut, UninitSlice};
4use bytes::{Bytes, BytesMut};
5
6use std::cmp::min;
7use std::collections::VecDeque;
8use std::io::IoSlice;
9
/// Chunk size, in bytes, used when the caller does not configure one
/// (see `Inner::default`).
const DEFAULT_CHUNK_SIZE: usize = 4096;
11
/// Core state of a chunked byte buffer: bytes are accumulated in a
/// `staging` buffer and cut off into the `chunks` queue in units of
/// roughly `chunk_size` bytes.
#[derive(Debug)]
pub(crate) struct Inner {
    // Mutable buffer where produced bytes accumulate before being
    // frozen into a chunk (see `flush`).
    staging: BytesMut,
    // Completed, immutable chunks awaiting consumption, in FIFO order.
    chunks: VecDeque<Bytes>,
    // Target size for the staging buffer's allocation; chunks are cut
    // when staging would otherwise need to grow past this.
    chunk_size: usize,
}
18
19impl Default for Inner {
20    #[inline]
21    fn default() -> Self {
22        Inner {
23            staging: BytesMut::new(),
24            chunks: VecDeque::new(),
25            chunk_size: DEFAULT_CHUNK_SIZE,
26        }
27    }
28}
29
/// Result of `Inner::advance`, reporting where the advance stopped.
pub(crate) enum AdvanceStopped {
    /// The count was exhausted within the queued chunks; the staging
    /// buffer was not touched.
    InChunk,
    /// The advance ran past all queued chunks and consumed the carried
    /// number of bytes from the staging buffer.
    InStaging(usize),
}
34
impl Inner {
    /// Creates an empty buffer with the given chunk size and default
    /// (unallocated) staging buffer and chunk queue.
    #[inline]
    pub fn with_chunk_size(chunk_size: usize) -> Self {
        Inner {
            chunk_size,
            ..Default::default()
        }
    }

    /// Creates an empty buffer with the given chunk size and a chunk
    /// queue preallocated to hold `chunking_capacity` chunks.
    #[inline]
    pub fn with_profile(chunk_size: usize, chunking_capacity: usize) -> Self {
        Inner {
            staging: BytesMut::new(),
            chunks: VecDeque::with_capacity(chunking_capacity),
            chunk_size,
        }
    }

    /// Returns the configured chunk size.
    #[inline]
    pub fn chunk_size(&self) -> usize {
        self.chunk_size
    }

    /// True if neither the chunk queue nor the staging buffer holds
    /// any bytes.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.chunks.is_empty() && self.staging.is_empty()
    }

    /// Number of bytes currently in the staging buffer.
    #[inline]
    pub fn staging_len(&self) -> usize {
        self.staging.len()
    }

    /// Current capacity of the staging buffer's allocation.
    #[inline]
    pub fn staging_capacity(&self) -> usize {
        self.staging.capacity()
    }

    /// Appends a completed chunk to the queue.
    ///
    /// Callers must not push empty chunks (checked in debug builds);
    /// other methods rely on queued chunks being non-empty.
    #[inline]
    pub fn push_chunk(&mut self, chunk: Bytes) {
        debug_assert!(!chunk.is_empty());
        self.chunks.push_back(chunk)
    }

    /// Cuts whatever is in the staging buffer into a chunk, leaving
    /// the staging buffer empty. A no-op when staging is empty.
    #[inline]
    pub fn flush(&mut self) {
        if !self.staging.is_empty() {
            // `split().freeze()` moves the staged bytes out without
            // copying; the staging buffer keeps its allocation.
            let bytes = self.staging.split().freeze();
            self.push_chunk(bytes)
        }
    }

    /// Returns an iterator that removes and yields the queued chunks.
    /// Bytes still in the staging buffer are not drained.
    #[inline]
    pub fn drain_chunks(&mut self) -> DrainChunks<'_> {
        DrainChunks::new(self.chunks.drain(..))
    }

    /// Consumes the buffer, returning an iterator over all chunks,
    /// including a final chunk made from any remaining staged bytes.
    #[inline]
    pub fn into_chunks(mut self) -> IntoChunks {
        if !self.staging.is_empty() {
            self.chunks.push_back(self.staging.freeze());
        }
        IntoChunks::new(self.chunks.into_iter())
    }

    /// Ensures the staging buffer has writable capacity, preferring to
    /// reuse its existing allocation over growing it. May flush the
    /// currently staged bytes into a chunk first. Returns the staging
    /// buffer's capacity after reservation.
    pub fn reserve_staging(&mut self) -> usize {
        let cap = self.staging.capacity();

        // We are here when either:
        // a) the buffer has never been used and never allocated;
        // b) the producer has filled a previously allocated buffer,
        //    and the consumer may have read a part or the whole of it.
        // Our goal is to reserve space in the staging buffer without
        // forcing it to reallocate to a larger capacity.
        //
        // To reuse the allocation of `BytesMut` in the vector form with
        // the offset `off` and remaining capacity `cap` while reserving
        // `additional` bytes, the following needs to apply:
        //
        // off >= additional && off >= cap / 2
        //
        // We have:
        //
        // off + cap == allocated_size >= chunk_size
        //
        // From this, we can derive the following condition check:
        let cutoff = cap.saturating_add(cap / 2);
        let additional = if cutoff > self.chunk_size {
            // Alas, the bytes still in the staging buffer are likely to
            // necessitate a new allocation. Split them off to a chunk
            // first, so that the new allocation does not have to copy
            // them and the total required capacity is `self.chunk_size`.
            self.flush();
            self.chunk_size
        } else {
            // This amount will get BytesMut to reuse the allocation and
            // copy back the bytes if there are no chunks left unconsumed.
            // Otherwise, it will reallocate to its previous capacity.
            // A virgin buffer will be allocated to `self.chunk_size`.
            self.chunk_size - cap
        };
        self.staging.reserve(additional);
        self.staging.capacity()
    }

    /// Writable capacity remaining in the staging buffer, as reported
    /// by `BufMut::remaining_mut`.
    #[inline]
    pub fn remaining_mut(&self) -> usize {
        self.staging.remaining_mut()
    }

    /// Advances the staging buffer's write cursor by `cnt` bytes.
    ///
    /// # Safety
    ///
    /// Delegates to `BytesMut::advance_mut`; the caller must have
    /// initialized the next `cnt` bytes of the staging buffer (the
    /// standard `BufMut::advance_mut` contract).
    #[inline]
    pub unsafe fn advance_mut(&mut self, cnt: usize) {
        self.staging.advance_mut(cnt);
    }

    /// Returns the staging buffer's uninitialized writable region.
    #[inline]
    pub fn chunk_mut(&mut self) -> &mut UninitSlice {
        self.staging.chunk_mut()
    }

    /// Total number of readable bytes across all queued chunks plus
    /// the staging buffer.
    pub fn remaining(&self) -> usize {
        self.chunks
            .iter()
            .fold(self.staging.len(), |sum, chunk| sum + chunk.len())
    }

    /// Returns the next readable slice: the front queued chunk, or the
    /// staging buffer's contents when no chunks are queued.
    #[inline]
    pub fn chunk(&self) -> &[u8] {
        if let Some(chunk) = self.chunks.front() {
            chunk
        } else {
            self.staging.chunk()
        }
    }

    /// Consumes `cnt` readable bytes, front chunk first. Fully consumed
    /// chunks are popped from the queue; any residual count is advanced
    /// in the staging buffer. Reports where consumption stopped.
    pub fn advance(&mut self, mut cnt: usize) -> AdvanceStopped {
        loop {
            match self.chunks.front_mut() {
                None => {
                    // No chunks left; the remainder comes out of
                    // staging (BytesMut::advance panics if cnt exceeds
                    // its length, per the Buf contract).
                    self.staging.advance(cnt);
                    return AdvanceStopped::InStaging(cnt);
                }
                Some(chunk) => {
                    let len = chunk.len();
                    if cnt < len {
                        // Stop partway through this chunk.
                        chunk.advance(cnt);
                        return AdvanceStopped::InChunk;
                    } else {
                        // Chunk fully consumed; drop it and continue.
                        cnt -= len;
                        self.chunks.pop_front();
                    }
                }
            }
        }
    }

    /// Fills `dst` with `IoSlice`s over the queued chunks, then the
    /// staging buffer if a slot remains and staging is non-empty.
    /// Returns the number of slices written.
    pub fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize {
        let n = {
            // zip stops at the shorter of dst and the chunk queue, so
            // `len` is min(dst.len(), chunks.len()).
            let zipped = dst.iter_mut().zip(self.chunks.iter());
            let len = zipped.len();
            for (io_slice, chunk) in zipped {
                *io_slice = IoSlice::new(chunk);
            }
            len
        };

        if n < dst.len() && !self.staging.is_empty() {
            dst[n] = IoSlice::new(&self.staging);
            n + 1
        } else {
            n
        }
    }

    /// Copies up to `len` bytes out of the buffer into a freshly
    /// allocated `Bytes`, consuming them.
    ///
    /// NOTE(review): when chunks are queued the copy length is clamped
    /// to `remaining()`, but the chunk-free path delegates to
    /// `BytesMut::copy_to_bytes`, which panics if `len` exceeds the
    /// staged length — confirm this asymmetry is intended.
    pub fn copy_to_bytes(&mut self, len: usize) -> Bytes {
        if self.chunks.is_empty() {
            return self.staging.copy_to_bytes(len);
        }
        let mut to_copy = min(len, self.remaining());
        let mut buf = BytesMut::with_capacity(to_copy);
        loop {
            match self.chunks.front_mut() {
                None => {
                    // Queue exhausted; take the remainder from staging.
                    buf.put((&mut self.staging).take(to_copy));
                    break;
                }
                Some(chunk) => {
                    if chunk.len() > to_copy {
                        // Partially consume the front chunk via Take,
                        // which advances it in place; do not pop it.
                        buf.put(chunk.take(to_copy));
                        break;
                    } else {
                        // Copy the whole chunk and pop it below.
                        buf.extend_from_slice(chunk);
                        to_copy -= chunk.len();
                    }
                }
            }
            self.chunks.pop_front();
        }
        buf.freeze()
    }
}