Skip to main content

ntex_bytes/
pages.rs

1use std::{borrow::Borrow, cmp, collections::VecDeque, fmt, io, mem, ops, ptr};
2
3use crate::{BufMut, BytePageSize, ByteString, Bytes, BytesMut};
4use crate::{buf::UninitSlice, storage::StorageVec};
5
6pub struct BytePages {
7    size: BytePageSize,
8    pages: VecDeque<BytePage>,
9    current: StorageVec,
10}
11
12impl BytePages {
13    /// Creates a new `BytePages` with the specified page size.
14    ///
15    /// The returned `BytePages` will be hold one page with
16    /// specified capacity.
17    pub fn new(size: BytePageSize) -> Self {
18        debug_assert!(size != BytePageSize::Unset, "Page cannot be Unset");
19
20        BytePages {
21            size,
22            pages: VecDeque::with_capacity(8),
23            current: StorageVec::sized(size),
24        }
25    }
26
27    /// Get size of the page.
28    pub fn page_size(&self) -> BytePageSize {
29        self.size
30    }
31
32    /// Sets the page size for new pages.
33    pub fn set_page_size(&mut self, size: BytePageSize) {
34        self.size = size;
35    }
36
37    /// Insert a page to the front of the collection.
38    pub fn prepend<T>(&mut self, buf: T) -> bool
39    where
40        BytePage: From<T>,
41    {
42        let p = BytePage::from(buf);
43        if p.is_empty() {
44            false
45        } else {
46            self.pages.push_front(p);
47            true
48        }
49    }
50
51    /// Appends a new page to the back of the collection.
52    pub fn append<T>(&mut self, buf: T)
53    where
54        BytePage: From<T>,
55    {
56        let p = BytePage::from(buf);
57        let remaining = self.current.remaining();
58
59        if p.len() <= remaining {
60            self.put_slice(p.as_ref());
61        } else if self.current.len() == 0 {
62            match p.into_storage() {
63                Ok(st) => {
64                    self.current = st;
65                }
66                Err(page) => {
67                    // add buffer to stack
68                    self.pages.push_back(page);
69                }
70            }
71        } else {
72            // push current storage to stack
73            self.pages.push_back(BytePage {
74                inner: StorageType::Storage(mem::replace(
75                    &mut self.current,
76                    StorageVec::sized(self.size),
77                )),
78            });
79
80            // add buffer to stack
81            self.pages.push_back(p);
82        }
83    }
84
85    #[inline]
86    /// Appends the given bytes to this page object.
87    ///
88    /// Tries to write the data into the current page first. If there
89    /// is insufficient space, one or more new pages are allocated as
90    /// needed, and the remaining data is copied into them.
91    pub fn extend_from_slice(&mut self, extend: &[u8]) {
92        self.put_slice(extend);
93    }
94
95    #[inline]
96    /// Gets the total number of pages.
97    pub fn len(&self) -> usize {
98        self.pages
99            .iter()
100            .fold(self.current.len(), |c, page| c + page.len())
101    }
102
103    #[inline]
104    /// Checks if the `BytePages` instance is empty.
105    pub fn is_empty(&self) -> bool {
106        for p in &self.pages {
107            if !p.is_empty() {
108                return false;
109            }
110        }
111        self.current.len() == 0
112    }
113
114    #[inline]
115    /// Returns the total number of pages contained in this object.
116    pub fn num_pages(&self) -> usize {
117        if self.current.len() == 0 {
118            self.pages.len()
119        } else {
120            self.pages.len() + 1
121        }
122    }
123
124    /// Returns the first page from the collection.
125    pub fn take(&mut self) -> Option<BytePage> {
126        if let Some(page) = self.pages.pop_front() {
127            Some(page)
128        } else if self.current.len() == 0 {
129            None
130        } else {
131            Some(BytePage::from(mem::replace(
132                &mut self.current,
133                StorageVec::sized(self.size),
134            )))
135        }
136    }
137
138    #[inline]
139    /// Moves all pages to another `BytePages` instance.
140    pub fn move_to(&mut self, pages: &mut BytePages) {
141        while let Some(page) = self.take() {
142            pages.append(page);
143        }
144    }
145
146    #[inline]
147    pub fn try_get_current_from(&mut self, pages: &mut BytePages) {
148        if self.pages.is_empty() && self.current.len() == 0 && pages.current.len() != 0 {
149            self.current = mem::replace(&mut pages.current, StorageVec::sized(self.size));
150        }
151    }
152
153    /// Converts `self` into an immutable `Bytes`.
154    #[inline]
155    #[must_use]
156    pub fn freeze(&mut self) -> Bytes {
157        let mut buf = BytesMut::with_capacity(self.len());
158        while let Some(p) = self.take() {
159            buf.extend_from_slice(&p);
160        }
161        buf.freeze()
162    }
163}
164
165impl fmt::Debug for BytePages {
166    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
167        let mut f = fmt.debug_tuple("BytePages");
168        for p in &self.pages {
169            f.field(p);
170        }
171        if self.current.len() != 0 {
172            f.field(&crate::debug::BsDebug(self.current.as_ref()));
173        }
174        f.finish()
175    }
176}
177
178impl Default for BytePages {
179    fn default() -> Self {
180        BytePages::new(BytePageSize::Size16)
181    }
182}
183
184impl BufMut for BytePages {
185    #[inline]
186    fn remaining_mut(&self) -> usize {
187        self.current.remaining()
188    }
189
190    #[inline]
191    unsafe fn advance_mut(&mut self, cnt: usize) {
192        // This call will panic if `cnt` is too big
193        self.current.set_len(self.current.len() + cnt);
194    }
195
196    #[inline]
197    fn chunk_mut(&mut self) -> &mut UninitSlice {
198        unsafe {
199            // This will never panic as `len` can never become invalid
200            let ptr = &mut self.current.as_ptr();
201            UninitSlice::from_raw_parts_mut(
202                ptr.add(self.current.len()),
203                self.remaining_mut(),
204            )
205        }
206    }
207
208    fn put_slice(&mut self, mut src: &[u8]) {
209        while !src.is_empty() {
210            let amount = cmp::min(src.len(), self.current.remaining());
211            unsafe {
212                ptr::copy_nonoverlapping(
213                    src.as_ptr(),
214                    self.chunk_mut().as_mut_ptr(),
215                    amount,
216                );
217                self.advance_mut(amount);
218            }
219            src = &src[amount..];
220
221            // add new page
222            if self.current.is_full() {
223                self.pages.push_back(BytePage::from(mem::replace(
224                    &mut self.current,
225                    StorageVec::sized(self.size),
226                )));
227            }
228        }
229    }
230
231    #[inline]
232    fn put_u8(&mut self, n: u8) {
233        self.current.put_u8(n);
234        if self.current.is_full() {
235            self.pages.push_back(BytePage::from(mem::replace(
236                &mut self.current,
237                StorageVec::sized(self.size),
238            )));
239        }
240    }
241
242    #[inline]
243    fn put_i8(&mut self, n: i8) {
244        self.put_u8(n as u8);
245    }
246}
247
248impl io::Write for BytePages {
249    fn write(&mut self, src: &[u8]) -> Result<usize, io::Error> {
250        self.put_slice(src);
251        Ok(src.len())
252    }
253
254    fn flush(&mut self) -> Result<(), io::Error> {
255        Ok(())
256    }
257}
258
259impl From<BytePages> for Bytes {
260    fn from(pages: BytePages) -> Bytes {
261        BytesMut::from(pages).freeze()
262    }
263}
264
265impl From<BytePages> for BytesMut {
266    fn from(mut pages: BytePages) -> BytesMut {
267        let mut buf = BytesMut::with_capacity(pages.len());
268        while let Some(p) = pages.take() {
269            buf.extend_from_slice(&p);
270        }
271        buf
272    }
273}
274
275pub struct BytePage {
276    inner: StorageType,
277}
278
279enum StorageType {
280    Bytes(Bytes),
281    Storage(StorageVec),
282    Vec(Vec<u8>),
283}
284
285impl BytePage {
286    #[inline]
287    /// Returns the number of bytes contained in this `BytePage`.
288    pub fn len(&self) -> usize {
289        match &self.inner {
290            StorageType::Bytes(b) => b.len(),
291            StorageType::Storage(b) => b.len(),
292            StorageType::Vec(b) => b.len(),
293        }
294    }
295
296    #[inline]
297    /// Returns true if the `BytePage` has a length of 0.
298    pub fn is_empty(&self) -> bool {
299        match &self.inner {
300            StorageType::Bytes(b) => b.is_empty(),
301            StorageType::Storage(b) => b.len() == 0,
302            StorageType::Vec(b) => b.is_empty(),
303        }
304    }
305
306    /// Return a raw pointer to data.
307    pub fn as_ptr(&self) -> *const u8 {
308        unsafe {
309            match &self.inner {
310                StorageType::Bytes(b) => b.storage.as_ptr(),
311                StorageType::Storage(b) => b.as_ptr(),
312                StorageType::Vec(b) => b.as_ptr(),
313            }
314        }
315    }
316
317    /// Advance the internal cursor.
318    ///
319    /// Afterwards `self` contains elements `[cnt, len)`.
320    /// This is an `O(1)` operation.
321    ///
322    /// # Panics
323    ///
324    /// Panics if `cnt > len`.
325    #[inline]
326    pub fn advance_to(&mut self, cnt: usize) {
327        match &mut self.inner {
328            StorageType::Bytes(b) => b.advance_to(cnt),
329            StorageType::Storage(b) => unsafe { b.set_start(cnt as u32) },
330            StorageType::Vec(b) => {
331                self.inner = StorageType::Bytes(Bytes::copy_from_slice(&b[cnt..]));
332            }
333        }
334    }
335
336    /// Converts `self` into an immutable `Bytes`.
337    #[inline]
338    #[must_use]
339    pub fn freeze(self) -> Bytes {
340        match self.inner {
341            StorageType::Bytes(b) => b,
342            StorageType::Storage(st) => Bytes {
343                storage: st.freeze(),
344            },
345            StorageType::Vec(v) => Bytes::from(v),
346        }
347    }
348
349    fn into_storage(self) -> Result<StorageVec, Self> {
350        if let StorageType::Storage(st) = self.inner {
351            if st.remaining() > 0 {
352                Ok(st)
353            } else {
354                Err(Self {
355                    inner: StorageType::Storage(st),
356                })
357            }
358        } else {
359            Err(self)
360        }
361    }
362}
363
364impl AsRef<[u8]> for BytePage {
365    #[inline]
366    fn as_ref(&self) -> &[u8] {
367        match &self.inner {
368            StorageType::Bytes(b) => b.as_ref(),
369            StorageType::Storage(b) => b.as_ref(),
370            StorageType::Vec(b) => b.as_ref(),
371        }
372    }
373}
374
375impl Borrow<[u8]> for BytePage {
376    #[inline]
377    fn borrow(&self) -> &[u8] {
378        self.as_ref()
379    }
380}
381
382impl From<Bytes> for BytePage {
383    fn from(buf: Bytes) -> Self {
384        BytePage {
385            inner: StorageType::Bytes(buf),
386        }
387    }
388}
389
390impl From<BytesMut> for BytePage {
391    fn from(buf: BytesMut) -> Self {
392        BytePage {
393            inner: StorageType::Storage(buf.storage),
394        }
395    }
396}
397
398impl From<ByteString> for BytePage {
399    fn from(s: ByteString) -> Self {
400        s.into_bytes().into()
401    }
402}
403
404impl From<StorageVec> for BytePage {
405    fn from(buf: StorageVec) -> Self {
406        BytePage {
407            inner: StorageType::Storage(buf),
408        }
409    }
410}
411
412impl From<Vec<u8>> for BytePage {
413    fn from(buf: Vec<u8>) -> Self {
414        BytePage {
415            inner: StorageType::Vec(buf),
416        }
417    }
418}
419
420impl From<&'static str> for BytePage {
421    fn from(buf: &'static str) -> Self {
422        BytePage::from(Bytes::from_static(buf.as_bytes()))
423    }
424}
425
426impl From<&'static [u8]> for BytePage {
427    fn from(buf: &'static [u8]) -> Self {
428        BytePage::from(Bytes::from_static(buf))
429    }
430}
431
432impl<const N: usize> From<&'static [u8; N]> for BytePage {
433    fn from(src: &'static [u8; N]) -> Self {
434        BytePage::from(Bytes::from_static(src))
435    }
436}
437
438impl From<BytePage> for Bytes {
439    fn from(page: BytePage) -> Self {
440        match page.inner {
441            StorageType::Bytes(b) => b,
442            StorageType::Storage(storage) => BytesMut { storage }.freeze(),
443            StorageType::Vec(v) => Bytes::copy_from_slice(&v),
444        }
445    }
446}
447
448impl From<BytePage> for BytesMut {
449    fn from(page: BytePage) -> Self {
450        match page.inner {
451            StorageType::Bytes(b) => b.into(),
452            StorageType::Storage(storage) => BytesMut { storage },
453            StorageType::Vec(v) => BytesMut::copy_from_slice(&v),
454        }
455    }
456}
457
458impl io::Read for BytePage {
459    fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
460        let len = cmp::min(self.len(), dst.len());
461        if len > 0 {
462            dst[..len].copy_from_slice(&self[..len]);
463            self.advance_to(len);
464        }
465        Ok(len)
466    }
467}
468
469impl ops::Deref for BytePage {
470    type Target = [u8];
471
472    #[inline]
473    fn deref(&self) -> &[u8] {
474        self.as_ref()
475    }
476}
477
478impl fmt::Debug for BytePage {
479    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
480        fmt::Debug::fmt(&crate::debug::BsDebug(self.as_ref()), fmt)
481    }
482}
483
484#[cfg(test)]
485mod tests {
486    use super::*;
487
488    #[test]
489    fn pages() {
490        // pages
491        let mut pages = BytePages::new(BytePageSize::Size8);
492        assert!(pages.is_empty());
493        assert_eq!(pages.len(), 0);
494        assert_eq!(pages.num_pages(), 0);
495        pages.extend_from_slice(b"b");
496        assert_eq!(pages.len(), 1);
497        assert_eq!(pages.num_pages(), 1);
498        pages.extend_from_slice("a".repeat(9 * 1024).as_bytes());
499        assert_eq!(pages.len(), 9217);
500        assert_eq!(pages.num_pages(), 2);
501        assert!(!pages.is_empty());
502
503        let mut pgs = BytePages::new(BytePageSize::Size8);
504        pgs.put_i8(b'a' as i8);
505        let p = pgs.take().unwrap();
506        assert_eq!(p.len(), 1);
507        assert_eq!(p.as_ref(), b"a");
508
509        pgs.extend_from_slice("a".repeat(8 * 1024 - 1).as_bytes());
510        assert_eq!(pgs.num_pages(), 1);
511        pgs.put_u8(b'a');
512        assert_eq!(pgs.num_pages(), 1);
513        assert_eq!(pgs.current.len(), 0);
514
515        pgs.put_u8(b'a');
516        assert_eq!(pgs.num_pages(), 2);
517
518        pgs.append(Bytes::copy_from_slice("a".repeat(8 * 1024).as_bytes()));
519        assert_eq!(pgs.num_pages(), 3);
520        assert_eq!(pgs.current.len(), 0);
521
522        // page
523        let p = pages.take().unwrap();
524        assert_eq!(p.len(), 8192);
525        let p = pages.take().unwrap();
526        assert_eq!(p.len(), 1025);
527        assert!(!p.is_empty());
528        assert_eq!(p.as_ref().as_ptr(), p.as_ptr());
529        assert_eq!(p.as_ref(), "a".repeat(1025).as_bytes());
530        assert!(pages.take().is_none());
531
532        let p = BytePage::from(Bytes::copy_from_slice(b"123"));
533        assert_eq!(p.len(), 3);
534        assert!(!p.is_empty());
535        assert_eq!(p.as_ref(), b"123");
536        assert_eq!(p.as_ref().as_ptr(), p.as_ptr());
537
538        let p = BytePage::from(&b"123"[..]);
539        assert_eq!(p.len(), 3);
540        assert!(!p.is_empty());
541        assert_eq!(p.as_ref(), b"123");
542        assert_eq!(p.as_ref().as_ptr(), p.as_ptr());
543
544        let p = BytePage::from(b"123");
545        assert_eq!(p.len(), 3);
546        assert!(!p.is_empty());
547        assert_eq!(p.as_ref(), b"123");
548        assert_eq!(p.as_ref().as_ptr(), p.as_ptr());
549
550        let p = BytePage::from("123");
551        assert_eq!(p.len(), 3);
552        assert!(!p.is_empty());
553        assert_eq!(p.as_ref(), b"123");
554        assert_eq!(p.as_ref().as_ptr(), p.as_ptr());
555        assert_eq!(p.freeze(), b"123");
556
557        let p = BytePage::from(vec![b'1', b'2', b'3']);
558        assert_eq!(p.len(), 3);
559        assert!(!p.is_empty());
560        assert_eq!(p.as_ref(), b"123");
561        assert_eq!(p.as_ref().as_ptr(), p.as_ptr());
562        assert_eq!(p.freeze(), b"123");
563
564        let mut p = BytePage::from(vec![b'1', b'2', b'3']);
565        p.advance_to(1);
566        assert_eq!(p.len(), 2);
567        assert!(!p.is_empty());
568        assert_eq!(p.as_ref(), b"23");
569
570        // debug
571        let mut pages = BytePages::new(BytePageSize::Size8);
572        pages.extend_from_slice(b"b");
573        assert_eq!(format!("{pages:?}"), "BytePages(b\"b\")");
574        let p = pages.take().unwrap();
575        assert_eq!(p.as_ref(), b"b");
576
577        let mut pages = BytePages::new(BytePageSize::Size8);
578        pages.extend_from_slice(b"a");
579        pages.append(Bytes::copy_from_slice(b"123"));
580        pages.pages.push_back(p);
581        assert_eq!(format!("{pages:?}"), "BytePages(b\"b\", b\"a123\")");
582    }
583
584    #[test]
585    fn page_read() {
586        use std::io::Read;
587
588        let mut page = BytePage::from(Bytes::copy_from_slice(b"123"));
589
590        let mut buf = [0; 10];
591        assert_eq!(page.read(&mut buf).unwrap(), 3);
592        assert_eq!(page.len(), 0);
593        assert_eq!(buf, [49, 50, 51, 0, 0, 0, 0, 0, 0, 0]);
594    }
595}