chunked_bytes/
strictly.rs

1//! Buffer with a strict limit on the chunk sizes.
2
3use super::chunked::{AdvanceStopped, Inner};
4use crate::{DrainChunks, IntoChunks};
5
6use bytes::buf::{Buf, BufMut, UninitSlice};
7use bytes::Bytes;
8
9use std::cmp::min;
10use std::fmt;
11use std::io::IoSlice;
12
13/// A non-contiguous buffer for efficient serialization of data structures.
14///
15/// A `ChunkedBytes` container has a staging buffer to coalesce small byte
16/// sequences of source data, and a queue of byte chunks split off the staging
17/// buffer that can be incrementally consumed by an output API such as an object
18/// implementing `AsyncWrite`. Once the number of bytes in the staging
19/// buffer reaches a certain configured chunk size, the buffer content is
20/// split off to form a new chunk.
21///
22/// Unlike `loosely::ChunkedBytes`, this variant of the `ChunkedBytes` container
23/// never produces chunks larger than the configured size. This comes at a cost
24/// of increased processing overhead and sometimes more allocated memory needed
25/// to keep the buffered data, so the applications that don't benefit from
26/// the strict limit should prefer `loosely::ChunkedBytes`.
27///
28/// Refer to the documentation on the methods available for `ChunkedBytes`,
29/// including the methods of traits `Buf` and `BufMut`, for details on working
30/// with this container.
31#[derive(Debug, Default)]
32pub struct ChunkedBytes {
33    inner: Inner,
34    // Maintains own capacity counter because `BytesMut` can't guarantee
35    // the exact requested capacity.
36    cap: usize,
37}
38
39impl ChunkedBytes {
40    /// Creates a new `ChunkedBytes` container with the chunk size limit
41    /// set to a default value.
42    #[inline]
43    pub fn new() -> Self {
44        Default::default()
45    }
46
47    /// Creates a new `ChunkedBytes` container with the given chunk size limit.
48    #[inline]
49    pub fn with_chunk_size_limit(chunk_size: usize) -> Self {
50        ChunkedBytes {
51            inner: Inner::with_chunk_size(chunk_size),
52            cap: 0,
53        }
54    }
55
56    /// The fully detailed constructor for `ChunkedBytes`.
57    /// The chunk size limit is given in `chunk_size`, and an upper
58    /// estimate of the number of chunks this container could be expected to
59    /// have at any moment of time should be given in `chunking_capacity`.
60    /// More chunks can still be held, but this may cause reallocations of
61    /// internal data structures.
62    #[inline]
63    pub fn with_profile(chunk_size: usize, chunking_capacity: usize) -> Self {
64        ChunkedBytes {
65            inner: Inner::with_profile(chunk_size, chunking_capacity),
66            cap: 0,
67        }
68    }
69
70    /// Returns the size this `ChunkedBytes` container uses as the limit
71    /// for splitting off complete chunks.
72    ///
73    /// Note that the size of produced chunks may be smaller than the
74    /// configured value, due to the allocation strategy used internally by
75    /// the implementation and also depending on the pattern of usage.
76    #[inline]
77    pub fn chunk_size_limit(&self) -> usize {
78        self.inner.chunk_size()
79    }
80
81    /// Returns true if the `ChunkedBytes` container has no complete chunks
82    /// and the staging buffer is empty.
83    #[inline]
84    pub fn is_empty(&self) -> bool {
85        self.inner.is_empty()
86    }
87
88    #[cfg(test)]
89    pub fn staging_capacity(&self) -> usize {
90        self.inner.staging_capacity()
91    }
92
93    /// Splits any bytes that are currently in the staging buffer into a new
94    /// complete chunk.
95    /// If the staging buffer is empty, this method does nothing.
96    ///
97    /// Most users should not need to call this method. It is called
98    /// internally when needed by the methods that advance the writing
99    /// position.
100    #[inline]
101    pub fn flush(&mut self) {
102        debug_assert!(self.inner.staging_len() <= self.inner.chunk_size());
103        self.inner.flush()
104    }
105
106    /// Appends a `Bytes` slice to the container without copying the data.
107    ///
108    /// If `src` is empty, this method does nothing. Otherwise,
109    /// if there are any bytes currently in the staging buffer, they are split
110    /// to form a complete chunk. Next, `src` is appended as a sequence of
111    /// chunks, split if necessary so that all chunks except the last are
112    /// sized to the chunk size limit.
113    ///
114    /// # Performance Notes
115    ///
116    /// For a small slice originating from a buffer that is not split
117    /// or shared between other `Bytes` instances, copying the bytes with
118    /// `BufMut::put_slice` may be faster than the overhead of
119    /// atomic reference counting induced by use of this method.
120    pub fn put_bytes(&mut self, mut src: Bytes) {
121        if !src.is_empty() {
122            self.flush();
123            let chunk_size = self.inner.chunk_size();
124            while src.len() > chunk_size {
125                self.inner.push_chunk(src.split_to(chunk_size));
126            }
127            self.inner.push_chunk(src);
128        }
129    }
130
131    /// Returns an iterator that removes complete chunks from the
132    /// `ChunkedBytes` container and yields the removed chunks as `Bytes`
133    /// slice handles. This does not include bytes in the staging buffer.
134    ///
135    /// The chunks are removed even if the iterator is dropped without being
136    /// consumed until the end. It is unspecified how many chunks are removed
137    /// if the `DrainChunks` value is not dropped, but the borrow it holds
138    /// expires (e.g. due to `std::mem::forget`).
139    #[inline]
140    pub fn drain_chunks(&mut self) -> DrainChunks<'_> {
141        self.inner.drain_chunks()
142    }
143
144    /// Consumes the `ChunkedBytes` container to produce an iterator over
145    /// its chunks. If there are bytes in the staging buffer, they are yielded
146    /// as the last src.
147    ///
148    /// The memory allocated for `IntoChunks` may be slightly more than the
149    /// `ChunkedBytes` container it consumes. This is an infrequent side effect
150    /// of making the internal state efficient in general for iteration.
151    #[inline]
152    pub fn into_chunks(self) -> IntoChunks {
153        debug_assert!(self.inner.staging_len() <= self.inner.chunk_size());
154        self.inner.into_chunks()
155    }
156}
157
158unsafe impl BufMut for ChunkedBytes {
159    #[inline]
160    fn remaining_mut(&self) -> usize {
161        self.inner.remaining_mut()
162    }
163
164    #[inline]
165    unsafe fn advance_mut(&mut self, cnt: usize) {
166        assert!(
167            self.inner.staging_len() + cnt <= self.cap,
168            "new_len = {}; capacity = {}",
169            self.inner.staging_len() + cnt,
170            self.cap
171        );
172        self.inner.advance_mut(cnt);
173    }
174
175    fn chunk_mut(&mut self) -> &mut UninitSlice {
176        if self.inner.staging_len() == self.cap {
177            let new_cap = self.inner.reserve_staging();
178            self.cap = min(new_cap, self.chunk_size_limit())
179        }
180        let chunk = self.inner.chunk_mut();
181        let len = min(chunk.len(), self.cap);
182        &mut chunk[..len]
183    }
184}
185
186impl Buf for ChunkedBytes {
187    #[inline]
188    fn remaining(&self) -> usize {
189        self.inner.remaining()
190    }
191
192    #[inline]
193    fn has_remaining(&self) -> bool {
194        !self.is_empty()
195    }
196
197    /// Returns a slice of the bytes in the first extant complete chunk,
198    /// or the bytes in the staging buffer if there are no unconsumed chunks.
199    ///
200    /// It is more efficient to use `chunks_vectored` to gather all the disjoint
201    /// slices for vectored output.
202    #[inline]
203    fn chunk(&self) -> &[u8] {
204        self.inner.chunk()
205    }
206
207    /// Advances the reading position by `cnt`, dropping the `Bytes` references
208    /// to any complete chunks that the position has been advanced past
209    /// and then advancing the starting position of the first remaining chunk.
210    /// If there are no complete chunks left, the reading position is advanced
211    /// in the staging buffer, effectively removing the consumed bytes.
212    ///
213    /// # Panics
214    ///
215    /// This function may panic when `cnt > self.remaining()`.
216    ///
217    fn advance(&mut self, cnt: usize) {
218        match self.inner.advance(cnt) {
219            AdvanceStopped::InChunk => {}
220            AdvanceStopped::InStaging(adv) => {
221                self.cap -= adv;
222            }
223        }
224    }
225
226    /// Fills `dst` sequentially with the slice views of the chunks, then
227    /// the bytes in the staging buffer if any remain and there is
228    /// another unfilled entry left in `dst`. Returns the number of `IoSlice`
229    /// entries filled.
230    #[inline]
231    fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize {
232        debug_assert!(self.inner.staging_len() <= self.inner.chunk_size());
233        self.inner.chunks_vectored(dst)
234    }
235
236    #[inline]
237    fn copy_to_bytes(&mut self, len: usize) -> Bytes {
238        self.inner.copy_to_bytes(len)
239    }
240}
241
242impl fmt::Write for ChunkedBytes {
243    #[inline]
244    fn write_str(&mut self, s: &str) -> fmt::Result {
245        if self.remaining_mut() >= s.len() {
246            self.put_slice(s.as_bytes());
247            Ok(())
248        } else {
249            Err(fmt::Error)
250        }
251    }
252
253    // The default implementation delegates to
254    // fmt::write(&mut self as &mut dyn fmt::Write, args)
255    #[inline]
256    fn write_fmt(&mut self, args: fmt::Arguments<'_>) -> fmt::Result {
257        fmt::write(self, args)
258    }
259}