//! Paged byte buffers for `ntex_bytes` (`pages.rs`).
1use std::{borrow::Borrow, cmp, collections::VecDeque, fmt, io, mem, ops, ptr};
2
3use crate::{BufMut, BytePageSize, ByteString, Bytes, BytesMut};
4use crate::{buf::UninitSlice, storage::StorageVec};
5
/// A growable collection of fixed-size byte pages.
///
/// Bytes are written into `current` until it fills; full pages are
/// sealed into `pages`, which always precede `current` in logical order.
pub struct BytePages {
    // Capacity used when allocating each new page.
    size: BytePageSize,
    // Sealed pages, oldest first; `current` logically follows them.
    pages: VecDeque<BytePage>,
    // The page currently being filled.
    current: StorageVec,
}
11
12impl BytePages {
13    /// Creates a new `BytePages` with the specified page size.
14    ///
15    /// The returned `BytePages` will be hold one page with
16    /// specified capacity.
17    pub fn new(size: BytePageSize) -> Self {
18        debug_assert!(size != BytePageSize::Unset, "Page cannot be Unset");
19
20        BytePages {
21            size,
22            pages: VecDeque::with_capacity(8),
23            current: StorageVec::sized(size),
24        }
25    }
26
27    pub fn page_size(&self) -> BytePageSize {
28        self.size
29    }
30
31    pub fn set_page_size(&mut self, size: BytePageSize) {
32        self.size = size;
33    }
34
35    pub fn prepend<T>(&mut self, buf: T) -> bool
36    where
37        BytePage: From<T>,
38    {
39        let p = BytePage::from(buf);
40        if p.is_empty() {
41            false
42        } else {
43            self.pages.push_front(p);
44            true
45        }
46    }
47
48    pub fn append<T>(&mut self, buf: T)
49    where
50        BytePage: From<T>,
51    {
52        let p = BytePage::from(buf);
53        let remaining = self.current.remaining();
54
55        if p.len() <= remaining {
56            self.put_slice(p.as_ref());
57        } else {
58            if self.current.len() != 0 {
59                // push current storage to stack
60                self.pages.push_back(BytePage {
61                    inner: StorageType::Storage(mem::replace(
62                        &mut self.current,
63                        StorageVec::sized(self.size),
64                    )),
65                });
66            }
67
68            // add buffer to stack
69            self.pages.push_back(p);
70        }
71    }
72
73    #[inline]
74    /// Appends the given bytes to this page object.
75    ///
76    /// Tries to write the data into the current page first. If there
77    /// is insufficient space, one or more new pages are allocated as
78    /// needed, and the remaining data is copied into them.
79    pub fn extend_from_slice(&mut self, extend: &[u8]) {
80        self.put_slice(extend);
81    }
82
83    #[inline]
84    pub fn len(&self) -> usize {
85        self.pages
86            .iter()
87            .fold(self.current.len(), |c, page| c + page.len())
88    }
89
90    #[inline]
91    pub fn is_empty(&self) -> bool {
92        self.len() == 0
93    }
94
95    #[inline]
96    /// Returns the total number of pages contained in this object.
97    pub fn num_pages(&self) -> usize {
98        if self.current.len() == 0 {
99            self.pages.len()
100        } else {
101            self.pages.len() + 1
102        }
103    }
104
105    pub fn take(&mut self) -> Option<BytePage> {
106        if let Some(page) = self.pages.pop_front() {
107            Some(page)
108        } else if self.current.len() == 0 {
109            None
110        } else {
111            Some(BytePage::from(mem::replace(
112                &mut self.current,
113                StorageVec::sized(self.size),
114            )))
115        }
116    }
117
118    pub fn move_to(&mut self, pages: &mut BytePages) {
119        while let Some(page) = self.take() {
120            pages.append(page);
121        }
122    }
123}
124
125impl fmt::Debug for BytePages {
126    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
127        let mut f = fmt.debug_tuple("BytePages");
128        for p in &self.pages {
129            f.field(p);
130        }
131        if self.current.len() != 0 {
132            f.field(&crate::debug::BsDebug(self.current.as_ref()));
133        }
134        f.finish()
135    }
136}
137
138impl Default for BytePages {
139    fn default() -> Self {
140        BytePages::new(BytePageSize::Size16)
141    }
142}
143
impl BufMut for BytePages {
    #[inline]
    /// Returns the writable capacity left in the *current* page.
    ///
    /// NOTE(review): this reports only the current page; `put_slice`
    /// and `put_u8` below allocate new pages on demand, so the overall
    /// buffer is effectively unbounded.
    fn remaining_mut(&self) -> usize {
        self.current.remaining()
    }

    #[inline]
    /// Marks `cnt` additional bytes of the current page as initialized.
    unsafe fn advance_mut(&mut self, cnt: usize) {
        // This call will panic if `cnt` is too big
        self.current.set_len(self.current.len() + cnt);
    }

    #[inline]
    /// Returns the uninitialized tail of the current page.
    fn chunk_mut(&mut self) -> &mut UninitSlice {
        unsafe {
            // This will never panic as `len` can never become invalid
            // NOTE(review): `&mut` borrows a temporary here and `add` is
            // reached via auto-deref; binding the raw pointer directly
            // looks equivalent — confirm before simplifying.
            let ptr = &mut self.current.as_ptr();
            UninitSlice::from_raw_parts_mut(
                ptr.add(self.current.len()),
                self.remaining_mut(),
            )
        }
    }

    /// Copies all of `src`, sealing the current page each time it fills
    /// and replacing it with a fresh one so the loop always progresses.
    fn put_slice(&mut self, mut src: &[u8]) {
        while !src.is_empty() {
            // Copy as much as fits into the current page.
            let amount = cmp::min(src.len(), self.current.remaining());
            unsafe {
                // SAFETY: `chunk_mut` exposes at least `amount` writable
                // bytes (`amount <= remaining`), and `src` cannot alias
                // the page storage while `self` is borrowed mutably.
                ptr::copy_nonoverlapping(
                    src.as_ptr(),
                    self.chunk_mut().as_mut_ptr(),
                    amount,
                );
                self.advance_mut(amount);
            }
            src = &src[amount..];

            // add new page
            if self.current.is_full() {
                self.pages.push_back(BytePage::from(mem::replace(
                    &mut self.current,
                    StorageVec::sized(self.size),
                )));
            }
        }
    }

    #[inline]
    /// Writes a single byte, sealing the current page if it becomes full.
    fn put_u8(&mut self, n: u8) {
        self.current.put_u8(n);
        if self.current.is_full() {
            self.pages.push_back(BytePage::from(mem::replace(
                &mut self.current,
                StorageVec::sized(self.size),
            )));
        }
    }

    #[inline]
    /// Writes a single signed byte via `put_u8`.
    fn put_i8(&mut self, n: i8) {
        self.put_u8(n as u8);
    }
}
207
208impl io::Write for BytePages {
209    fn write(&mut self, src: &[u8]) -> Result<usize, io::Error> {
210        self.put_slice(src);
211        Ok(src.len())
212    }
213
214    fn flush(&mut self) -> Result<(), io::Error> {
215        Ok(())
216    }
217}
218
219impl From<BytePages> for Bytes {
220    fn from(pages: BytePages) -> Bytes {
221        BytesMut::from(pages).freeze()
222    }
223}
224
225impl From<BytePages> for BytesMut {
226    fn from(mut pages: BytePages) -> BytesMut {
227        let mut buf = BytesMut::with_capacity(pages.len());
228        while let Some(p) = pages.take() {
229            buf.extend_from_slice(&p);
230        }
231        buf
232    }
233}
234
/// A single page of bytes, backed either by an immutable `Bytes`
/// chunk or by owned storage.
pub struct BytePage {
    inner: StorageType,
}
238
// Backing storage for a `BytePage`.
enum StorageType {
    // Immutable (possibly shared) buffer.
    Bytes(Bytes),
    // Owned storage, e.g. taken from a `BytesMut` or a filled page.
    Storage(StorageVec),
}
243
impl BytePage {
    #[inline]
    /// Returns the number of bytes contained in this `BytePage`.
    pub fn len(&self) -> usize {
        match &self.inner {
            StorageType::Bytes(b) => b.len(),
            StorageType::Storage(b) => b.len(),
        }
    }

    #[inline]
    /// Returns true if the `BytePage` has a length of 0.
    pub fn is_empty(&self) -> bool {
        match &self.inner {
            StorageType::Bytes(b) => b.is_empty(),
            StorageType::Storage(b) => b.len() == 0,
        }
    }

    /// Return a raw pointer to data.
    ///
    /// NOTE(review): the inner `as_ptr` calls are unsafe fns —
    /// presumably sound because the storage is always initialized;
    /// confirm against `StorageVec`'s contract.
    pub fn as_ptr(&self) -> *const u8 {
        unsafe {
            match &self.inner {
                StorageType::Bytes(b) => b.storage.as_ptr(),
                StorageType::Storage(b) => b.as_ptr(),
            }
        }
    }

    /// Advance the internal cursor.
    ///
    /// Afterwards `self` contains elements `[cnt, len)`.
    /// This is an `O(1)` operation.
    ///
    /// # Panics
    ///
    /// Panics if `cnt > len`.
    #[inline]
    pub fn advance_to(&mut self, cnt: usize) {
        match &mut self.inner {
            StorageType::Bytes(b) => b.advance_to(cnt),
            // NOTE(review): `cnt as u32` silently truncates on 64-bit
            // targets when cnt > u32::MAX — presumably buffer lengths
            // are bounded by u32 elsewhere in the crate; verify.
            StorageType::Storage(b) => unsafe { b.set_start(cnt as u32) },
        }
    }

    /// Converts `self` into an immutable `Bytes`.
    #[inline]
    #[must_use]
    pub fn freeze(self) -> Bytes {
        match self.inner {
            StorageType::Bytes(b) => b,
            StorageType::Storage(st) => Bytes {
                storage: st.freeze(),
            },
        }
    }
}
301
302impl AsRef<[u8]> for BytePage {
303    #[inline]
304    fn as_ref(&self) -> &[u8] {
305        match &self.inner {
306            StorageType::Bytes(b) => b.as_ref(),
307            StorageType::Storage(b) => b.as_ref(),
308        }
309    }
310}
311
impl Borrow<[u8]> for BytePage {
    #[inline]
    // Lets `BytePage` stand in wherever a borrowed `[u8]` is expected.
    fn borrow(&self) -> &[u8] {
        self.as_ref()
    }
}
318
319impl From<Bytes> for BytePage {
320    fn from(buf: Bytes) -> Self {
321        BytePage {
322            inner: StorageType::Bytes(buf),
323        }
324    }
325}
326
327impl From<BytesMut> for BytePage {
328    fn from(buf: BytesMut) -> Self {
329        BytePage {
330            inner: StorageType::Storage(buf.storage),
331        }
332    }
333}
334
335impl From<ByteString> for BytePage {
336    fn from(s: ByteString) -> Self {
337        s.into_bytes().into()
338    }
339}
340
341impl From<StorageVec> for BytePage {
342    fn from(buf: StorageVec) -> Self {
343        BytePage {
344            inner: StorageType::Storage(buf),
345        }
346    }
347}
348
349impl From<BytePage> for BytesMut {
350    fn from(page: BytePage) -> Self {
351        match page.inner {
352            StorageType::Bytes(b) => b.into(),
353            StorageType::Storage(storage) => BytesMut { storage },
354        }
355    }
356}
357
358impl io::Read for BytePage {
359    fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
360        let len = cmp::min(self.len(), dst.len());
361        if len > 0 {
362            dst[..len].copy_from_slice(&self[..len]);
363            self.advance_to(len);
364        }
365        Ok(len)
366    }
367}
368
impl ops::Deref for BytePage {
    type Target = [u8];

    #[inline]
    // Lets `BytePage` be sliced and indexed like `&[u8]`.
    fn deref(&self) -> &[u8] {
        self.as_ref()
    }
}
377
impl fmt::Debug for BytePage {
    // Formats the page contents via the crate's byte-string debug helper.
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Debug::fmt(&crate::debug::BsDebug(self.as_ref()), fmt)
    }
}
383
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn pages() {
        // pages: a fresh collection reports empty in every way.
        let mut pages = BytePages::new(BytePageSize::Size8);
        assert!(pages.is_empty());
        assert_eq!(pages.len(), 0);
        assert_eq!(pages.num_pages(), 0);
        // A single byte lives only in the current page.
        pages.extend_from_slice(b"b");
        assert_eq!(pages.len(), 1);
        assert_eq!(pages.num_pages(), 1);
        // Writing past one page's capacity seals it and starts a second.
        pages.extend_from_slice("a".repeat(9 * 1024).as_bytes());
        assert_eq!(pages.len(), 9217);
        assert_eq!(pages.num_pages(), 2);
        assert!(!pages.is_empty());

        // `take` returns the partially filled current page when no
        // sealed pages exist.
        let mut pgs = BytePages::new(BytePageSize::Size8);
        pgs.put_i8(b'a' as i8);
        let p = pgs.take().unwrap();
        assert_eq!(p.len(), 1);
        assert_eq!(p.as_ref(), b"a");

        // Filling a page to exactly its capacity seals it immediately,
        // leaving the current page empty.
        pgs.extend_from_slice("a".repeat(8 * 1024 - 1).as_bytes());
        assert_eq!(pgs.num_pages(), 1);
        pgs.put_u8(b'a');
        assert_eq!(pgs.num_pages(), 1);
        assert_eq!(pgs.current.len(), 0);

        pgs.put_u8(b'a');
        assert_eq!(pgs.num_pages(), 2);

        // Appending a buffer larger than the remaining space stores it
        // as its own page.
        pgs.append(Bytes::copy_from_slice("a".repeat(8 * 1024).as_bytes()));
        assert_eq!(pgs.num_pages(), 3);
        assert_eq!(pgs.current.len(), 0);

        // page: sealed pages come out first, then the remainder.
        let p = pages.take().unwrap();
        assert_eq!(p.len(), 8192);
        let p = pages.take().unwrap();
        assert_eq!(p.len(), 1025);
        assert!(!p.is_empty());
        assert_eq!(p.as_ref().as_ptr(), p.as_ptr());
        assert_eq!(p.as_ref(), "a".repeat(1025).as_bytes());
        assert!(pages.take().is_none());

        let p = BytePage::from(Bytes::copy_from_slice(b"123"));
        assert_eq!(p.len(), 3);
        assert!(!p.is_empty());
        assert_eq!(p.as_ref(), b"123");
        assert_eq!(p.as_ref().as_ptr(), p.as_ptr());

        // debug
        let mut pages = BytePages::new(BytePageSize::Size8);
        pages.extend_from_slice(b"b");
        assert_eq!(format!("{pages:?}"), "BytePages(b\"b\")");
        let p = pages.take().unwrap();
        assert_eq!(p.as_ref(), b"b");

        // Small appends are merged into the current page.
        let mut pages = BytePages::new(BytePageSize::Size8);
        pages.extend_from_slice(b"a");
        pages.append(Bytes::copy_from_slice(b"123"));
        pages.pages.push_back(p);
        assert_eq!(format!("{pages:?}"), "BytePages(b\"b\", b\"a123\")");
    }

    #[test]
    fn page_read() {
        use std::io::Read;

        let mut page = BytePage::from(Bytes::copy_from_slice(b"123"));

        // Reading drains the page and reports the byte count.
        let mut buf = [0; 10];
        assert_eq!(page.read(&mut buf).unwrap(), 3);
        assert_eq!(page.len(), 0);
        assert_eq!(buf, [49, 50, 51, 0, 0, 0, 0, 0, 0, 0]);
    }
}
463}