Skip to main content

ntex_bytes/
pages.rs

1use std::{borrow::Borrow, cmp, collections::VecDeque, fmt, io, mem, ops, ptr};
2
3use crate::{BufMut, BytePageSize, ByteString, Bytes, BytesMut};
4use crate::{buf::UninitSlice, storage::StorageVec};
5
6pub struct BytePages {
7    size: BytePageSize,
8    pages: VecDeque<BytePage>,
9    current: StorageVec,
10}
11
12impl BytePages {
13    /// Creates a new `BytePages` with the specified page size.
14    ///
15    /// The returned `BytePages` will be hold one page with
16    /// specified capacity.
17    pub fn new(size: BytePageSize) -> Self {
18        debug_assert!(size != BytePageSize::Unset, "Page cannot be Unset");
19
20        BytePages {
21            size,
22            pages: VecDeque::with_capacity(8),
23            current: StorageVec::sized(size),
24        }
25    }
26
27    pub fn page_size(&self) -> BytePageSize {
28        self.size
29    }
30
31    pub fn set_page_size(&mut self, size: BytePageSize) {
32        self.size = size;
33    }
34
35    pub fn prepend<T>(&mut self, buf: T) -> bool
36    where
37        BytePage: From<T>,
38    {
39        let p = BytePage::from(buf);
40        if p.is_empty() {
41            false
42        } else {
43            self.pages.push_front(p);
44            true
45        }
46    }
47
48    pub fn append<T>(&mut self, buf: T)
49    where
50        BytePage: From<T>,
51    {
52        let p = BytePage::from(buf);
53        let remaining = self.current.remaining();
54
55        if p.len() <= remaining {
56            self.put_slice(p.as_ref());
57        } else if self.current.len() == 0 {
58            match p.into_storage() {
59                Ok(st) => {
60                    self.current = st;
61                }
62                Err(page) => {
63                    // add buffer to stack
64                    self.pages.push_back(page);
65                }
66            }
67        } else {
68            // push current storage to stack
69            self.pages.push_back(BytePage {
70                inner: StorageType::Storage(mem::replace(
71                    &mut self.current,
72                    StorageVec::sized(self.size),
73                )),
74            });
75
76            // add buffer to stack
77            self.pages.push_back(p);
78        }
79    }
80
81    #[inline]
82    /// Appends the given bytes to this page object.
83    ///
84    /// Tries to write the data into the current page first. If there
85    /// is insufficient space, one or more new pages are allocated as
86    /// needed, and the remaining data is copied into them.
87    pub fn extend_from_slice(&mut self, extend: &[u8]) {
88        self.put_slice(extend);
89    }
90
91    #[inline]
92    pub fn len(&self) -> usize {
93        self.pages
94            .iter()
95            .fold(self.current.len(), |c, page| c + page.len())
96    }
97
98    #[inline]
99    pub fn is_empty(&self) -> bool {
100        for p in &self.pages {
101            if !p.is_empty() {
102                return false;
103            }
104        }
105        self.current.len() == 0
106    }
107
108    #[inline]
109    /// Returns the total number of pages contained in this object.
110    pub fn num_pages(&self) -> usize {
111        if self.current.len() == 0 {
112            self.pages.len()
113        } else {
114            self.pages.len() + 1
115        }
116    }
117
118    pub fn take(&mut self) -> Option<BytePage> {
119        if let Some(page) = self.pages.pop_front() {
120            Some(page)
121        } else if self.current.len() == 0 {
122            None
123        } else {
124            Some(BytePage::from(mem::replace(
125                &mut self.current,
126                StorageVec::sized(self.size),
127            )))
128        }
129    }
130
131    #[inline]
132    pub fn move_to(&mut self, pages: &mut BytePages) {
133        while let Some(page) = self.take() {
134            pages.append(page);
135        }
136    }
137
138    #[inline]
139    pub fn try_get_current_from(&mut self, pages: &mut BytePages) {
140        if self.pages.is_empty() && self.current.len() == 0 && pages.current.len() != 0 {
141            self.current = mem::replace(&mut pages.current, StorageVec::sized(self.size));
142        }
143    }
144}
145
146impl fmt::Debug for BytePages {
147    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
148        let mut f = fmt.debug_tuple("BytePages");
149        for p in &self.pages {
150            f.field(p);
151        }
152        if self.current.len() != 0 {
153            f.field(&crate::debug::BsDebug(self.current.as_ref()));
154        }
155        f.finish()
156    }
157}
158
159impl Default for BytePages {
160    fn default() -> Self {
161        BytePages::new(BytePageSize::Size16)
162    }
163}
164
165impl BufMut for BytePages {
166    #[inline]
167    fn remaining_mut(&self) -> usize {
168        self.current.remaining()
169    }
170
171    #[inline]
172    unsafe fn advance_mut(&mut self, cnt: usize) {
173        // This call will panic if `cnt` is too big
174        self.current.set_len(self.current.len() + cnt);
175    }
176
177    #[inline]
178    fn chunk_mut(&mut self) -> &mut UninitSlice {
179        unsafe {
180            // This will never panic as `len` can never become invalid
181            let ptr = &mut self.current.as_ptr();
182            UninitSlice::from_raw_parts_mut(
183                ptr.add(self.current.len()),
184                self.remaining_mut(),
185            )
186        }
187    }
188
189    fn put_slice(&mut self, mut src: &[u8]) {
190        while !src.is_empty() {
191            let amount = cmp::min(src.len(), self.current.remaining());
192            unsafe {
193                ptr::copy_nonoverlapping(
194                    src.as_ptr(),
195                    self.chunk_mut().as_mut_ptr(),
196                    amount,
197                );
198                self.advance_mut(amount);
199            }
200            src = &src[amount..];
201
202            // add new page
203            if self.current.is_full() {
204                self.pages.push_back(BytePage::from(mem::replace(
205                    &mut self.current,
206                    StorageVec::sized(self.size),
207                )));
208            }
209        }
210    }
211
212    #[inline]
213    fn put_u8(&mut self, n: u8) {
214        self.current.put_u8(n);
215        if self.current.is_full() {
216            self.pages.push_back(BytePage::from(mem::replace(
217                &mut self.current,
218                StorageVec::sized(self.size),
219            )));
220        }
221    }
222
223    #[inline]
224    fn put_i8(&mut self, n: i8) {
225        self.put_u8(n as u8);
226    }
227}
228
229impl io::Write for BytePages {
230    fn write(&mut self, src: &[u8]) -> Result<usize, io::Error> {
231        self.put_slice(src);
232        Ok(src.len())
233    }
234
235    fn flush(&mut self) -> Result<(), io::Error> {
236        Ok(())
237    }
238}
239
240impl From<BytePages> for Bytes {
241    fn from(pages: BytePages) -> Bytes {
242        BytesMut::from(pages).freeze()
243    }
244}
245
246impl From<BytePages> for BytesMut {
247    fn from(mut pages: BytePages) -> BytesMut {
248        let mut buf = BytesMut::with_capacity(pages.len());
249        while let Some(p) = pages.take() {
250            buf.extend_from_slice(&p);
251        }
252        buf
253    }
254}
255
256pub struct BytePage {
257    inner: StorageType,
258}
259
260enum StorageType {
261    Bytes(Bytes),
262    Storage(StorageVec),
263    Vec(Vec<u8>),
264}
265
266impl BytePage {
267    #[inline]
268    /// Returns the number of bytes contained in this `BytePage`.
269    pub fn len(&self) -> usize {
270        match &self.inner {
271            StorageType::Bytes(b) => b.len(),
272            StorageType::Storage(b) => b.len(),
273            StorageType::Vec(b) => b.len(),
274        }
275    }
276
277    #[inline]
278    /// Returns true if the `BytePage` has a length of 0.
279    pub fn is_empty(&self) -> bool {
280        match &self.inner {
281            StorageType::Bytes(b) => b.is_empty(),
282            StorageType::Storage(b) => b.len() == 0,
283            StorageType::Vec(b) => b.is_empty(),
284        }
285    }
286
287    /// Return a raw pointer to data.
288    pub fn as_ptr(&self) -> *const u8 {
289        unsafe {
290            match &self.inner {
291                StorageType::Bytes(b) => b.storage.as_ptr(),
292                StorageType::Storage(b) => b.as_ptr(),
293                StorageType::Vec(b) => b.as_ptr(),
294            }
295        }
296    }
297
298    /// Advance the internal cursor.
299    ///
300    /// Afterwards `self` contains elements `[cnt, len)`.
301    /// This is an `O(1)` operation.
302    ///
303    /// # Panics
304    ///
305    /// Panics if `cnt > len`.
306    #[inline]
307    pub fn advance_to(&mut self, cnt: usize) {
308        match &mut self.inner {
309            StorageType::Bytes(b) => b.advance_to(cnt),
310            StorageType::Storage(b) => unsafe { b.set_start(cnt as u32) },
311            StorageType::Vec(b) => {
312                self.inner = StorageType::Bytes(Bytes::copy_from_slice(&b[cnt..]));
313            }
314        }
315    }
316
317    /// Converts `self` into an immutable `Bytes`.
318    #[inline]
319    #[must_use]
320    pub fn freeze(self) -> Bytes {
321        match self.inner {
322            StorageType::Bytes(b) => b,
323            StorageType::Storage(st) => Bytes {
324                storage: st.freeze(),
325            },
326            StorageType::Vec(v) => Bytes::from(v),
327        }
328    }
329
330    fn into_storage(self) -> Result<StorageVec, Self> {
331        if let StorageType::Storage(st) = self.inner {
332            if st.remaining() > 0 {
333                Ok(st)
334            } else {
335                Err(Self {
336                    inner: StorageType::Storage(st),
337                })
338            }
339        } else {
340            Err(self)
341        }
342    }
343}
344
345impl AsRef<[u8]> for BytePage {
346    #[inline]
347    fn as_ref(&self) -> &[u8] {
348        match &self.inner {
349            StorageType::Bytes(b) => b.as_ref(),
350            StorageType::Storage(b) => b.as_ref(),
351            StorageType::Vec(b) => b.as_ref(),
352        }
353    }
354}
355
356impl Borrow<[u8]> for BytePage {
357    #[inline]
358    fn borrow(&self) -> &[u8] {
359        self.as_ref()
360    }
361}
362
363impl From<Bytes> for BytePage {
364    fn from(buf: Bytes) -> Self {
365        BytePage {
366            inner: StorageType::Bytes(buf),
367        }
368    }
369}
370
371impl From<BytesMut> for BytePage {
372    fn from(buf: BytesMut) -> Self {
373        BytePage {
374            inner: StorageType::Storage(buf.storage),
375        }
376    }
377}
378
379impl From<ByteString> for BytePage {
380    fn from(s: ByteString) -> Self {
381        s.into_bytes().into()
382    }
383}
384
385impl From<StorageVec> for BytePage {
386    fn from(buf: StorageVec) -> Self {
387        BytePage {
388            inner: StorageType::Storage(buf),
389        }
390    }
391}
392
393impl From<Vec<u8>> for BytePage {
394    fn from(buf: Vec<u8>) -> Self {
395        BytePage {
396            inner: StorageType::Vec(buf),
397        }
398    }
399}
400
401impl From<&'static str> for BytePage {
402    fn from(buf: &'static str) -> Self {
403        BytePage::from(Bytes::from_static(buf.as_bytes()))
404    }
405}
406
407impl From<&'static [u8]> for BytePage {
408    fn from(buf: &'static [u8]) -> Self {
409        BytePage::from(Bytes::from_static(buf))
410    }
411}
412
413impl<const N: usize> From<&'static [u8; N]> for BytePage {
414    fn from(src: &'static [u8; N]) -> Self {
415        BytePage::from(Bytes::from_static(src))
416    }
417}
418
419impl From<BytePage> for BytesMut {
420    fn from(page: BytePage) -> Self {
421        match page.inner {
422            StorageType::Bytes(b) => b.into(),
423            StorageType::Storage(storage) => BytesMut { storage },
424            StorageType::Vec(v) => BytesMut::copy_from_slice(&v),
425        }
426    }
427}
428
429impl io::Read for BytePage {
430    fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
431        let len = cmp::min(self.len(), dst.len());
432        if len > 0 {
433            dst[..len].copy_from_slice(&self[..len]);
434            self.advance_to(len);
435        }
436        Ok(len)
437    }
438}
439
440impl ops::Deref for BytePage {
441    type Target = [u8];
442
443    #[inline]
444    fn deref(&self) -> &[u8] {
445        self.as_ref()
446    }
447}
448
449impl fmt::Debug for BytePage {
450    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
451        fmt::Debug::fmt(&crate::debug::BsDebug(self.as_ref()), fmt)
452    }
453}
454
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks shared by every page that was built from the bytes `123`.
    fn assert_page_123(page: &BytePage) {
        assert_eq!(page.len(), 3);
        assert!(!page.is_empty());
        assert_eq!(page.as_ref(), b"123");
        assert_eq!(page.as_ref().as_ptr(), page.as_ptr());
    }

    #[test]
    fn pages() {
        // container: length and page accounting
        let mut big = BytePages::new(BytePageSize::Size8);
        assert!(big.is_empty());
        assert_eq!(big.len(), 0);
        assert_eq!(big.num_pages(), 0);
        big.extend_from_slice(b"b");
        assert_eq!(big.len(), 1);
        assert_eq!(big.num_pages(), 1);
        big.extend_from_slice("a".repeat(9 * 1024).as_bytes());
        assert_eq!(big.len(), 9217);
        assert_eq!(big.num_pages(), 2);
        assert!(!big.is_empty());

        // container: single-byte writes and page rotation
        let mut pgs = BytePages::new(BytePageSize::Size8);
        pgs.put_i8(b'a' as i8);
        let single = pgs.take().unwrap();
        assert_eq!(single.len(), 1);
        assert_eq!(single.as_ref(), b"a");

        pgs.extend_from_slice("a".repeat(8 * 1024 - 1).as_bytes());
        assert_eq!(pgs.num_pages(), 1);
        pgs.put_u8(b'a');
        assert_eq!(pgs.num_pages(), 1);
        assert_eq!(pgs.current.len(), 0);

        pgs.put_u8(b'a');
        assert_eq!(pgs.num_pages(), 2);

        pgs.append(Bytes::copy_from_slice("a".repeat(8 * 1024).as_bytes()));
        assert_eq!(pgs.num_pages(), 3);
        assert_eq!(pgs.current.len(), 0);

        // pages taken back out of the first container
        let full = big.take().unwrap();
        assert_eq!(full.len(), 8192);
        let tail = big.take().unwrap();
        assert_eq!(tail.len(), 1025);
        assert!(!tail.is_empty());
        assert_eq!(tail.as_ref().as_ptr(), tail.as_ptr());
        assert_eq!(tail.as_ref(), "a".repeat(1025).as_bytes());
        assert!(big.take().is_none());

        // page conversions
        let from_bytes = BytePage::from(Bytes::copy_from_slice(b"123"));
        assert_page_123(&from_bytes);

        let from_slice = BytePage::from(&b"123"[..]);
        assert_page_123(&from_slice);

        let from_array = BytePage::from(b"123");
        assert_page_123(&from_array);

        let from_str = BytePage::from("123");
        assert_page_123(&from_str);
        assert_eq!(from_str.freeze(), b"123");

        let from_vec = BytePage::from(vec![b'1', b'2', b'3']);
        assert_page_123(&from_vec);
        assert_eq!(from_vec.freeze(), b"123");

        let mut advanced = BytePage::from(vec![b'1', b'2', b'3']);
        advanced.advance_to(1);
        assert_eq!(advanced.len(), 2);
        assert!(!advanced.is_empty());
        assert_eq!(advanced.as_ref(), b"23");

        // debug
        let mut one = BytePages::new(BytePageSize::Size8);
        one.extend_from_slice(b"b");
        assert_eq!(format!("{one:?}"), "BytePages(b\"b\")");
        let taken = one.take().unwrap();
        assert_eq!(taken.as_ref(), b"b");

        let mut two = BytePages::new(BytePageSize::Size8);
        two.extend_from_slice(b"a");
        two.append(Bytes::copy_from_slice(b"123"));
        two.pages.push_back(taken);
        assert_eq!(format!("{two:?}"), "BytePages(b\"b\", b\"a123\")");
    }

    #[test]
    fn page_read() {
        use std::io::Read;

        let mut page = BytePage::from(Bytes::copy_from_slice(b"123"));

        // The destination is larger than the page, so the page is drained
        // and the rest of the buffer stays zeroed.
        let mut dst = [0; 10];
        let read = page.read(&mut dst).unwrap();
        assert_eq!(read, 3);
        assert_eq!(page.len(), 0);
        assert_eq!(dst, [49, 50, 51, 0, 0, 0, 0, 0, 0, 0]);
    }
}