chunked_bytes/strictly.rs
//! Buffer with a strict limit on the chunk sizes.

use super::chunked::{AdvanceStopped, Inner};
use crate::{DrainChunks, IntoChunks};

use bytes::buf::{Buf, BufMut, UninitSlice};
use bytes::Bytes;

use std::cmp::min;
use std::fmt;
use std::io::IoSlice;

/// A non-contiguous buffer for efficient serialization of data structures.
///
/// A `ChunkedBytes` container has a staging buffer to coalesce small byte
/// sequences of source data, and a queue of byte chunks split off the staging
/// buffer that can be incrementally consumed by an output API such as an
/// object implementing `AsyncWrite`. Once the number of bytes in the staging
/// buffer reaches the configured chunk size, the buffer content is split off
/// to form a new chunk.
///
/// Unlike `loosely::ChunkedBytes`, this variant of the `ChunkedBytes`
/// container never produces chunks larger than the configured size. This
/// comes at the cost of increased processing overhead and sometimes more
/// memory allocated to keep the buffered data, so applications that don't
/// benefit from the strict limit should prefer `loosely::ChunkedBytes`.
///
/// Refer to the documentation on the methods available for `ChunkedBytes`,
/// including the methods of traits `Buf` and `BufMut`, for details on working
/// with this container.
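///
/// # Example
///
/// A minimal usage sketch; the `chunked_bytes::strictly` import path is
/// assumed from this file's location in the crate.
///
/// ```
/// use bytes::{Buf, BufMut};
/// use chunked_bytes::strictly::ChunkedBytes;
///
/// let mut buf = ChunkedBytes::with_chunk_size_limit(4096);
/// buf.put_slice(b"hello");
/// // No complete chunks yet, so the bytes sit in the staging buffer.
/// assert_eq!(buf.remaining(), 5);
/// assert_eq!(buf.chunk(), b"hello");
/// ```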
#[derive(Debug, Default)]
pub struct ChunkedBytes {
    inner: Inner,
    // Maintains its own capacity counter because `BytesMut` can't guarantee
    // the exact requested capacity.
    cap: usize,
}

impl ChunkedBytes {
    /// Creates a new `ChunkedBytes` container with the chunk size limit
    /// set to a default value.
    #[inline]
    pub fn new() -> Self {
        Default::default()
    }

    /// Creates a new `ChunkedBytes` container with the given chunk size limit.
    #[inline]
    pub fn with_chunk_size_limit(chunk_size: usize) -> Self {
        ChunkedBytes {
            inner: Inner::with_chunk_size(chunk_size),
            cap: 0,
        }
    }

    /// The fully detailed constructor for `ChunkedBytes`.
    /// The chunk size limit is given in `chunk_size`, and an upper estimate
    /// of the number of chunks this container can be expected to hold at any
    /// moment should be given in `chunking_capacity`. More chunks can still
    /// be held, but this may cause reallocations of internal data structures.
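    ///
    /// # Example
    ///
    /// An illustrative sketch; the parameter values below are arbitrary and
    /// the `chunked_bytes::strictly` path is assumed from the crate layout.
    ///
    /// ```
    /// use chunked_bytes::strictly::ChunkedBytes;
    ///
    /// // Chunks of at most 8 KiB, expecting around 16 chunks queued at once.
    /// let buf = ChunkedBytes::with_profile(8192, 16);
    /// assert_eq!(buf.chunk_size_limit(), 8192);
    /// ```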
    #[inline]
    pub fn with_profile(chunk_size: usize, chunking_capacity: usize) -> Self {
        ChunkedBytes {
            inner: Inner::with_profile(chunk_size, chunking_capacity),
            cap: 0,
        }
    }

    /// Returns the size this `ChunkedBytes` container uses as the limit
    /// for splitting off complete chunks.
    ///
    /// Note that the size of produced chunks may be smaller than the
    /// configured value, due to the allocation strategy used internally by
    /// the implementation and also depending on the pattern of usage.
    #[inline]
    pub fn chunk_size_limit(&self) -> usize {
        self.inner.chunk_size()
    }

    /// Returns true if the `ChunkedBytes` container has no complete chunks
    /// and the staging buffer is empty.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }

    #[cfg(test)]
    pub fn staging_capacity(&self) -> usize {
        self.inner.staging_capacity()
    }

    /// Splits any bytes that are currently in the staging buffer into a new
    /// complete chunk.
    /// If the staging buffer is empty, this method does nothing.
    ///
    /// Most users should not need to call this method. It is called
    /// internally when needed by the methods that advance the writing
    /// position.
    #[inline]
    pub fn flush(&mut self) {
        debug_assert!(self.inner.staging_len() <= self.inner.chunk_size());
        self.inner.flush()
    }

    /// Appends a `Bytes` slice to the container without copying the data.
    ///
    /// If `src` is empty, this method does nothing. Otherwise,
    /// if there are any bytes currently in the staging buffer, they are split
    /// off to form a complete chunk. Next, `src` is appended as a sequence of
    /// chunks, split if necessary so that all chunks except the last are
    /// sized to the chunk size limit.
    ///
    /// # Performance Notes
    ///
    /// For a small slice originating from a buffer that is not split
    /// or shared between other `Bytes` instances, copying the bytes with
    /// `BufMut::put_slice` may be faster than the overhead of
    /// atomic reference counting induced by use of this method.
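    ///
    /// # Example
    ///
    /// A sketch of the splitting behavior; the `chunked_bytes::strictly`
    /// path is assumed from the crate layout.
    ///
    /// ```
    /// use bytes::{Buf, Bytes};
    /// use chunked_bytes::strictly::ChunkedBytes;
    ///
    /// let mut buf = ChunkedBytes::with_chunk_size_limit(4);
    /// buf.put_bytes(Bytes::from_static(b"abcdefgh"));
    /// // The first chunk is capped at the 4-byte limit.
    /// assert_eq!(buf.chunk(), b"abcd");
    /// ```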
    pub fn put_bytes(&mut self, mut src: Bytes) {
        if !src.is_empty() {
            self.flush();
            let chunk_size = self.inner.chunk_size();
            while src.len() > chunk_size {
                self.inner.push_chunk(src.split_to(chunk_size));
            }
            self.inner.push_chunk(src);
        }
    }

    /// Returns an iterator that removes complete chunks from the
    /// `ChunkedBytes` container and yields the removed chunks as `Bytes`
    /// slice handles. This does not include bytes in the staging buffer.
    ///
    /// The chunks are removed even if the iterator is dropped without being
    /// consumed until the end. It is unspecified how many chunks are removed
    /// if the `DrainChunks` value is not dropped, but the borrow it holds
    /// expires (e.g. due to `std::mem::forget`).
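    ///
    /// # Example
    ///
    /// An illustrative sketch, assuming the `chunked_bytes::strictly` path:
    ///
    /// ```
    /// use bytes::Bytes;
    /// use chunked_bytes::strictly::ChunkedBytes;
    ///
    /// let mut buf = ChunkedBytes::with_chunk_size_limit(4);
    /// buf.put_bytes(Bytes::from_static(b"abcdefgh"));
    /// // Eight bytes under a 4-byte limit form two complete chunks.
    /// let chunks: Vec<Bytes> = buf.drain_chunks().collect();
    /// assert_eq!(chunks.len(), 2);
    /// assert!(buf.is_empty());
    /// ```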
    #[inline]
    pub fn drain_chunks(&mut self) -> DrainChunks<'_> {
        self.inner.drain_chunks()
    }

    /// Consumes the `ChunkedBytes` container to produce an iterator over
    /// its chunks. If there are bytes in the staging buffer, they are
    /// yielded as the last chunk.
    ///
    /// The memory allocated for `IntoChunks` may be slightly larger than
    /// that of the `ChunkedBytes` container it consumes. This is an
    /// infrequent side effect of making the internal state efficient in
    /// general for iteration.
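    ///
    /// # Example
    ///
    /// A usage sketch, assuming the `chunked_bytes::strictly` path:
    ///
    /// ```
    /// use bytes::BufMut;
    /// use chunked_bytes::strictly::ChunkedBytes;
    ///
    /// let mut buf = ChunkedBytes::new();
    /// buf.put_slice(b"hello");
    /// // The staging bytes come out as the final chunk.
    /// let chunks: Vec<_> = buf.into_chunks().collect();
    /// assert_eq!(chunks.len(), 1);
    /// ```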
    #[inline]
    pub fn into_chunks(self) -> IntoChunks {
        debug_assert!(self.inner.staging_len() <= self.inner.chunk_size());
        self.inner.into_chunks()
    }
}

unsafe impl BufMut for ChunkedBytes {
    #[inline]
    fn remaining_mut(&self) -> usize {
        self.inner.remaining_mut()
    }

    #[inline]
    unsafe fn advance_mut(&mut self, cnt: usize) {
        assert!(
            self.inner.staging_len() + cnt <= self.cap,
            "new_len = {}; capacity = {}",
            self.inner.staging_len() + cnt,
            self.cap
        );
        self.inner.advance_mut(cnt);
    }

    fn chunk_mut(&mut self) -> &mut UninitSlice {
        // Once the staging buffer is filled up to the tracked capacity,
        // reserve more space and cap the counter at the chunk size limit,
        // so the slice handed out never lets a chunk grow past the limit.
        if self.inner.staging_len() == self.cap {
            let new_cap = self.inner.reserve_staging();
            self.cap = min(new_cap, self.chunk_size_limit());
        }
        // Expose no more of the spare capacity than the capped counter allows.
        let chunk = self.inner.chunk_mut();
        let len = min(chunk.len(), self.cap);
        &mut chunk[..len]
    }
}

impl Buf for ChunkedBytes {
    #[inline]
    fn remaining(&self) -> usize {
        self.inner.remaining()
    }

    #[inline]
    fn has_remaining(&self) -> bool {
        !self.is_empty()
    }

    /// Returns a slice of the bytes in the first extant complete chunk,
    /// or the bytes in the staging buffer if there are no unconsumed chunks.
    ///
    /// It is more efficient to use `chunks_vectored` to gather all the
    /// disjoint slices for vectored output.
    #[inline]
    fn chunk(&self) -> &[u8] {
        self.inner.chunk()
    }

    /// Advances the reading position by `cnt`, dropping the `Bytes` references
    /// to any complete chunks that the position has been advanced past
    /// and then advancing the starting position of the first remaining chunk.
    /// If there are no complete chunks left, the reading position is advanced
    /// in the staging buffer, effectively removing the consumed bytes.
    ///
    /// # Panics
    ///
    /// This function may panic when `cnt > self.remaining()`.
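    ///
    /// # Example
    ///
    /// A sketch of advancing across a chunk boundary, assuming the
    /// `chunked_bytes::strictly` path:
    ///
    /// ```
    /// use bytes::{Buf, Bytes};
    /// use chunked_bytes::strictly::ChunkedBytes;
    ///
    /// let mut buf = ChunkedBytes::with_chunk_size_limit(4);
    /// buf.put_bytes(Bytes::from_static(b"abcdef"));
    /// buf.advance(5); // drops the "abcd" chunk, then eats one byte of "ef"
    /// assert_eq!(buf.chunk(), b"f");
    /// ```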
    fn advance(&mut self, cnt: usize) {
        match self.inner.advance(cnt) {
            AdvanceStopped::InChunk => {}
            AdvanceStopped::InStaging(adv) => {
                // Bytes were consumed from the staging buffer, so the
                // tracked capacity shrinks accordingly.
                self.cap -= adv;
            }
        }
    }

    /// Fills `dst` sequentially with the slice views of the chunks, then
    /// the bytes in the staging buffer if any remain and there is
    /// another unfilled entry left in `dst`. Returns the number of `IoSlice`
    /// entries filled.
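    ///
    /// # Example
    ///
    /// A sketch of gathering slices for vectored I/O, assuming the
    /// `chunked_bytes::strictly` path:
    ///
    /// ```
    /// use bytes::{Buf, Bytes};
    /// use chunked_bytes::strictly::ChunkedBytes;
    /// use std::io::IoSlice;
    ///
    /// let mut buf = ChunkedBytes::with_chunk_size_limit(4);
    /// buf.put_bytes(Bytes::from_static(b"abcdefgh"));
    /// let mut slices = [IoSlice::new(&[]); 4];
    /// // Two complete chunks, nothing in staging: two entries are filled.
    /// assert_eq!(buf.chunks_vectored(&mut slices), 2);
    /// ```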
    #[inline]
    fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize {
        debug_assert!(self.inner.staging_len() <= self.inner.chunk_size());
        self.inner.chunks_vectored(dst)
    }

    #[inline]
    fn copy_to_bytes(&mut self, len: usize) -> Bytes {
        self.inner.copy_to_bytes(len)
    }
}

impl fmt::Write for ChunkedBytes {
    #[inline]
    fn write_str(&mut self, s: &str) -> fmt::Result {
        if self.remaining_mut() >= s.len() {
            self.put_slice(s.as_bytes());
            Ok(())
        } else {
            Err(fmt::Error)
        }
    }

    // The default implementation delegates to
    // fmt::write(&mut self as &mut dyn fmt::Write, args)
    #[inline]
    fn write_fmt(&mut self, args: fmt::Arguments<'_>) -> fmt::Result {
        fmt::write(self, args)
    }
}
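
// An illustrative test sketch of the `fmt::Write` implementation above;
// the module and test names are arbitrary.
#[cfg(test)]
mod fmt_write_sketch {
    use super::ChunkedBytes;
    use bytes::{Buf, Bytes};
    use std::fmt::Write;

    #[test]
    fn write_fmt_appends_formatted_text() {
        let mut buf = ChunkedBytes::new();
        write!(buf, "x = {}", 42).unwrap();
        // The formatted bytes land in the staging buffer and can be read back.
        let n = buf.remaining();
        assert_eq!(buf.copy_to_bytes(n), Bytes::from_static(b"x = 42"));
    }
}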
259}