buffet/
roll.rs

1use std::{
2    borrow::Cow,
3    cell::UnsafeCell,
4    fmt::{Debug, Formatter},
5    hash::{Hash, Hasher},
6    iter::Enumerate,
7    ops::{Bound, Deref, RangeBounds},
8    rc::Rc,
9    slice,
10    str::Utf8Error,
11};
12
13use crate::{io::ReadOwned, Error, IoBufMut};
14use nom::{
15    Compare, CompareResult, FindSubstring, InputIter, InputLength, InputTake, InputTakeAtPosition,
16    Needed, Slice,
17};
18
/// Exported `trace!` macro: forwards every token it receives, unchanged, to
/// `tracing::trace!`.
#[macro_export]
macro_rules! trace {
    ($($tt:tt)*) => {
        tracing::trace!($($tt)*)
    };
}
25
26use crate::{Buf, BufMut, BUF_SIZE};
27
/// Result alias for this module: the error type defaults to [`crate::Error`]
/// but can still be spelled out explicitly (e.g. `Result<T, Utf8Error>`).
type Result<T, E = crate::Error> = std::result::Result<T, E>;
29
/// A "rolling buffer". Uses either one [BufMut] or a `Box<[u8]>` for storage.
/// The filled part can be handed out as [Roll]s, and the buffer can be
/// reallocated (see `grow`, `compact` and `reserve`) so that it regains
/// capacity, minus the length of the filled part.
pub struct RollMut {
    /// Backing storage: a [BufMut] or a boxed slice.
    storage: StorageMut,
    /// Number of filled (readable) bytes, counted from the start of `storage`.
    len: u32,
}
37
/// The two kinds of backing storage a [RollMut] can use.
enum StorageMut {
    /// A single [BufMut]; its capacity is reported as `BUF_SIZE`.
    Buf(BufMut),
    /// A heap-allocated boxed slice, used by `grow` and by large reservations.
    Box(BoxStorage),
}
42
43impl Debug for StorageMut {
44    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
45        match self {
46            Self::Buf(bm) => f
47                .debug_struct("Buf")
48                .field("index", &bm.index)
49                .field("off", &bm.off)
50                .field("len", &bm.len)
51                .finish(),
52            Self::Box(bs) => f
53                .debug_struct("Box")
54                .field("buf", &bs.buf)
55                .field("off", &bs.off)
56                .finish(),
57        }
58    }
59}
60
61impl StorageMut {
62    #[inline(always)]
63    fn off(&self) -> u32 {
64        match self {
65            StorageMut::Buf(b) => b.off as _,
66            StorageMut::Box(b) => b.off,
67        }
68    }
69
70    #[inline(always)]
71    fn cap(&self) -> usize {
72        match self {
73            StorageMut::Buf(_) => BUF_SIZE as usize,
74            StorageMut::Box(b) => b.cap(),
75        }
76    }
77
78    #[inline(always)]
79    fn len(&self) -> usize {
80        match self {
81            StorageMut::Buf(b) => b.len(),
82            StorageMut::Box(b) => b.len(),
83        }
84    }
85
86    #[inline(always)]
87    unsafe fn as_mut_ptr(&mut self) -> *mut u8 {
88        match self {
89            StorageMut::Buf(b) => b.as_mut_ptr(),
90            StorageMut::Box(b) => b.as_mut_ptr(),
91        }
92    }
93}
94
/// Reference-counted boxed slice plus a starting offset into it. Clones
/// share the same allocation and carry their own copy of the offset.
#[derive(Clone)]
struct BoxStorage {
    buf: Rc<UnsafeCell<Box<[u8]>>>,
    off: u32,
}

impl BoxStorage {
    /// Number of bytes between `off` and the end of the allocation.
    #[inline(always)]
    fn len(&self) -> usize {
        let total = unsafe { (*self.buf.get()).len() };
        total - self.off as usize
    }

    /// Raw pointer to the byte at `off`.
    #[inline(always)]
    unsafe fn as_mut_ptr(&self) -> *mut u8 {
        let base = (*self.buf.get()).as_mut_ptr();
        base.add(self.off as usize)
    }

    /// Returns the `len` bytes starting at `off`.
    /// Panics if that range does not fit in the allocation.
    fn slice(&self, len: u32) -> &[u8] {
        let start = self.off as usize;
        let whole: &[u8] = unsafe { &*self.buf.get() };
        &whole[start..][..len as usize]
    }

    /// Returns the `len` bytes starting at `off`, mutably.
    /// Panics if that range does not fit in the allocation.
    fn slice_mut(&mut self, len: u32) -> &mut [u8] {
        let start = self.off as usize;
        let whole: &mut [u8] = unsafe { &mut *self.buf.get() };
        &mut whole[start..][..len as usize]
    }

    /// Size of the whole allocation, ignoring `off`.
    fn cap(&self) -> usize {
        unsafe { (*self.buf.get()).len() }
    }
}
134
135impl RollMut {
136    /// Allocate, using a single [BufMut] for storage.
137    pub fn alloc() -> Result<Self> {
138        Ok(Self {
139            storage: StorageMut::Buf(BufMut::alloc()?),
140            len: 0,
141        })
142    }
143
144    /// Double the capacity of this buffer by reallocating it, copying the
145    /// filled part into the new buffer. This method always uses a `Box<[u8]>`
146    /// for storage.
147    ///
148    /// This method is somewhat expensive.
149    pub fn grow(&mut self) {
150        let old_cap = self.storage.cap();
151        let new_cap = old_cap * 2;
152
153        tracing::trace!("growing buffer from {} to {}", old_cap, new_cap);
154
155        // TODO: optimize via `MaybeUninit`?
156        let b = vec![0; new_cap].into_boxed_slice();
157        let mut bs = BoxStorage {
158            buf: Rc::new(UnsafeCell::new(b)),
159            off: 0,
160        };
161        let dst_slice = bs.slice_mut(self.len() as u32);
162        dst_slice.copy_from_slice(&self[..]);
163        let next_storage = StorageMut::Box(bs);
164
165        self.storage = next_storage;
166    }
167
168    /// Reallocates the backing storage for this buffer, copying the filled
169    /// portion into it. Panics if `len() == storage_size()`, in which case
170    /// reallocating won't do much good
171    ///
172    /// Also panics if we're using buf storage and the offset is zero
173    /// (which means reallocating would not free up any space)
174    pub fn compact(&mut self) -> Result<()> {
175        assert!(self.len() != self.storage_size());
176        tracing::trace!("compacting");
177
178        let next_storage = match &self.storage {
179            StorageMut::Buf(bm) => {
180                assert!(bm.off != 0);
181                let mut next_b = BufMut::alloc()?;
182                next_b[..self.len()].copy_from_slice(&self[..]);
183                StorageMut::Buf(next_b)
184            }
185            StorageMut::Box(b) => {
186                tracing::trace!("reallocating, storage is box");
187                if self.len() > BUF_SIZE as usize {
188                    // TODO: optimize via `MaybeUninit`?
189                    let mut next_b = vec![0; b.cap()].into_boxed_slice();
190                    next_b[..self.len()].copy_from_slice(&self[..]);
191                    let next_b = BoxStorage {
192                        buf: Rc::new(UnsafeCell::new(next_b)),
193                        off: 0,
194                    };
195                    StorageMut::Box(next_b)
196                } else {
197                    let mut next_b = BufMut::alloc()?;
198                    next_b[..self.len()].copy_from_slice(&self[..]);
199                    StorageMut::Buf(next_b)
200                }
201            }
202        };
203
204        self.storage = next_storage;
205
206        Ok(())
207    }
208
209    /// Reserve more capacity for this buffer if this buffer is full.
210    /// If this buffer's size matches the underlying storage size,
211    /// this is equivalent to `grow`. Otherwise, it's equivalent
212    /// to `realloc`.
213    pub fn reserve(&mut self) -> Result<()> {
214        if self.len() < self.cap() {
215            tracing::trace!("reserve: len < cap, no need to reserve anything");
216            return Ok(());
217        }
218
219        if self.storage.off() > 0 {
220            // let's try to compact first
221            trace!(len = %self.len(), cap = %self.cap(), storage_size = %self.storage_size(), "in reserve: reallocating");
222            self.compact()?
223        } else {
224            trace!(len = %self.len(), cap = %self.cap(), storage_size = %self.storage_size(), "in reserve: growing");
225            self.grow()
226        }
227
228        Ok(())
229    }
230
231    /// Make sure we can hold "request_len"
232    pub fn reserve_at_least(&mut self, requested_len: usize) -> Result<()> {
233        let cap = self.cap();
234        if requested_len <= cap {
235            trace!(%requested_len, %cap, "reserve_at_least: requested_len <= cap, no need to compact");
236            return Ok(());
237        }
238
239        let len = self.len();
240        if self.storage.off() > 0 && requested_len <= (BUF_SIZE as usize - len) {
241            // we can compact the filled portion!
242            self.compact()?;
243        } else {
244            // we need to allocate box storage of the right size
245            let new_storage_size = std::cmp::max(self.storage_size() * 2, requested_len + len);
246            let mut new_b = vec![0u8; new_storage_size].into_boxed_slice();
247            // copy the filled portion
248            new_b[..self.len()].copy_from_slice(&self[..]);
249            self.storage = StorageMut::Box(BoxStorage {
250                buf: Rc::new(UnsafeCell::new(new_b)),
251                off: 0,
252            });
253        }
254
255        debug_assert!(self.cap() >= requested_len);
256        Ok(())
257    }
258
    /// The length of the filled portion of this buffer, i.e. the number of
    /// bytes that can be read from it.
    #[inline(always)]
    pub fn len(&self) -> usize {
        self.len as usize
    }
264
265    /// Returns true if this is empty
266    #[inline(always)]
267    pub fn is_empty(&self) -> bool {
268        self.len == 0
269    }
270
271    /// The capacity of this buffer, that can be written to
272    #[inline(always)]
273    pub fn cap(&self) -> usize {
274        self.storage.len() - self.len as usize
275    }
276
    /// The total size of the underlying storage, regardless of offset or
    /// filled length (used to decide whether to compact or grow).
    pub fn storage_size(&self) -> usize {
        self.storage.cap()
    }
282
283    /// Read at most `limit` bytes from [ReadOwned] into this buffer.
284    ///
285    /// This method takes ownership of `self` because it submits an io_uring
286    /// operation, where the kernel owns the read buffer - the only way to
287    /// gain ownership of `self` again is to complete the read operation.
288    ///
289    /// Panics if `cap` is zero
290    #[inline]
291    pub async fn read_into(
292        self,
293        limit: usize,
294        r: &mut impl ReadOwned,
295    ) -> (std::io::Result<usize>, Self) {
296        let read_cap = std::cmp::min(limit, self.cap());
297        assert!(read_cap > 0, "refusing to do empty read");
298        let read_off = self.len;
299
300        trace!(%read_off, %read_cap, storage = ?self.storage, len = %self.len, "read_into in progress...");
301        let read_into = ReadInto {
302            buf: self,
303            off: read_off,
304            cap: read_cap.try_into().unwrap(),
305        };
306        let (res, mut read_into) = r.read_owned(read_into).await;
307        if let Ok(n) = &res {
308            trace!("read_into got {} bytes", *n);
309            read_into.buf.len += *n as u32;
310        } else {
311            trace!("read_into failed: {:?}", res);
312        }
313        (res, read_into.buf)
314    }
315
316    /// Put a slice into this buffer, fails if the slice doesn't fit in the
317    /// buffer's capacity
318    #[inline]
319    pub fn put(&mut self, s: impl AsRef<[u8]>) -> Result<()> {
320        let s = s.as_ref();
321
322        let len = s.len();
323        if len > self.cap() {
324            return Err(Error::DoesNotFit);
325        }
326        unsafe {
327            let ptr = self.storage.as_mut_ptr().add(self.len as usize);
328            std::ptr::copy_nonoverlapping(s.as_ptr(), ptr, len);
329        }
330        let u32_len: u32 = len.try_into().unwrap();
331        self.len += u32_len;
332        Ok(())
333    }
334
335    /// Put data into this RollMut with a closure. Panics if `len > self.cap()`
336    #[inline]
337    pub fn put_with<T>(&mut self, len: usize, f: impl FnOnce(&mut [u8]) -> Result<T>) -> Result<T> {
338        assert!(len <= self.cap());
339
340        let u32_len: u32 = len.try_into().unwrap();
341        let slice = unsafe {
342            slice::from_raw_parts_mut(self.storage.as_mut_ptr().add(self.len as usize), len)
343        };
344        let res = f(slice);
345        if res.is_ok() {
346            self.len += u32_len;
347        }
348        res
349    }
350
351    /// Assert that this RollMut isn't filled at all, reserve enough size to put
352    /// `len` bytes in it, then fill it with the given closure. If the closure
353    /// returns `Err`, it's as if the put never happened.
354    #[inline]
355    pub fn put_to_roll(
356        &mut self,
357        put_len: usize,
358        f: impl FnOnce(&mut [u8]) -> Result<()>,
359    ) -> Result<Roll> {
360        let put_len_u32: u32 = put_len.try_into().unwrap();
361        self.reserve_at_least(put_len)?;
362        assert_eq!(self.len, 0);
363        let slice = unsafe { slice::from_raw_parts_mut(self.storage.as_mut_ptr(), put_len) };
364        match f(slice) {
365            Ok(()) => {
366                // Safety: `reserve_at_least` ensures that `put_len` is <= self.len
367                let roll = unsafe { self.filled_unchecked_len(put_len_u32) };
368                unsafe { self.storage_skip_unchecked(put_len_u32) };
369                Ok(roll)
370            }
371            Err(e) => Err(e),
372        }
373    }
374
375    /// Get a [Roll] corresponding to the filled portion of this buffer
376    #[inline(always)]
377    pub fn filled(&self) -> Roll {
378        // Safety: the len we're passing if <= self.len because it _is_ self.len
379        unsafe { self.filled_unchecked_len(self.len) }
380    }
381
382    /// # Safety
383    /// `len` must be <= self.len
384    #[inline(always)]
385    unsafe fn filled_unchecked_len(&self, len: u32) -> Roll {
386        match &self.storage {
387            StorageMut::Buf(b) => b.freeze_slice(0..(len as _)).into(),
388            StorageMut::Box(b) => RollBox { b: b.clone(), len }.into(),
389        }
390    }
391
392    /// Advances the buffer by `n` bytes "consuming" filled data.
393    /// Panics if `n > len()`.
394    pub fn skip(&mut self, n: usize) {
395        let u32_n: u32 = n.try_into().unwrap();
396        assert!(u32_n <= self.len);
397        unsafe {
398            // safety: we just checked that `n <= self.len`
399            self.storage_skip_unchecked(u32_n);
400        }
401        self.len -= u32_n;
402    }
403
404    /// Advances the buffer by `n` bytes "consuming" filled data.
405    /// Does not panic, hence the unsafe
406    ///
407    /// # Safety
408    /// `n` must be <= self.len
409    #[inline(always)]
410    unsafe fn storage_skip_unchecked(&mut self, n: u32) {
411        match &mut self.storage {
412            StorageMut::Buf(b) => b.skip(n as _),
413            StorageMut::Box(b) => b.off += n,
414        }
415    }
416
417    /// Takes the first `n` bytes (up to `len`) as a `Roll`, and advances
418    /// this buffer. Returns `None` if `len` is zero. Panics if `n` is
419    /// zero.
420    pub fn take_at_most(&mut self, n: usize) -> Option<Roll> {
421        assert!(n != 0, "refusing to do empty take_at_most");
422
423        if self.len == 0 {
424            return None;
425        }
426
427        let n = std::cmp::min(n, self.len as usize);
428        let roll = self.filled().slice(..n);
429        self.skip(n);
430        Some(roll)
431    }
432
433    /// Takes the whole `filled` part. Panics if empty, since this is probably
434    /// a misuse.
435    pub fn take_all(&mut self) -> Roll {
436        let roll = self.filled();
437        assert!(
438            !roll.is_empty(),
439            "take_all is pointless if the filled part is empty, check len first"
440        );
441        self.skip(roll.len());
442        roll
443    }
444
    /// Advance this buffer, keeping the filled part only starting with
    /// the given roll. (Useful after parsing something with nom)
    ///
    /// The bytes between this buffer's current offset and the roll's offset
    /// are dropped from the filled portion.
    ///
    /// Panics if the roll is not from this buffer (different storage kind,
    /// different buffer, or an empty roll), or if the roll starts before
    /// this buffer's current offset.
    pub fn keep(&mut self, roll: Roll) {
        match (&mut self.storage, &roll.inner) {
            (StorageMut::Buf(ours), RollInner::Buf(theirs)) => {
                // same buffer <=> same `index`
                assert_eq!(ours.index, theirs.index, "roll must be from same buffer");
                assert!(theirs.off >= ours.off, "roll must start within buffer");
                // number of bytes between our start and the roll's start
                let skipped = theirs.off - ours.off;
                trace!(our_index = %ours.index, their_index = %theirs.index, our_off = %ours.off, their_off = %theirs.off, %skipped, "RollMut::keep");
                self.len -= skipped as u32;
                ours.len -= skipped;
                ours.off = theirs.off;
            }
            (StorageMut::Box(ours), RollInner::Box(theirs)) => {
                // same buffer <=> both point at the same shared allocation
                assert_eq!(
                    ours.buf.get(),
                    theirs.b.buf.get(),
                    "roll must be from same buffer"
                );
                assert!(theirs.b.off >= ours.off, "roll must start within buffer");
                let skipped = theirs.b.off - ours.off;
                self.len -= skipped;
                ours.off = theirs.b.off;
            }
            _ => {
                // mismatched storage kinds, or an empty roll
                panic!("roll must be from same buffer");
            }
        }
    }
476}
477
478impl std::io::Write for RollMut {
479    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
480        let n = buf.len();
481        if self.cap() < n {
482            self.reserve_at_least(n)
483                .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
484        }
485        self.put(buf)
486            .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
487        Ok(n)
488    }
489
490    fn flush(&mut self) -> std::io::Result<()> {
491        // no need to flush
492        Ok(())
493    }
494}
495
496impl Deref for RollMut {
497    type Target = [u8];
498
499    #[inline(always)]
500    fn deref(&self) -> &Self::Target {
501        match &self.storage {
502            StorageMut::Buf(b) => &b[..self.len as usize],
503            StorageMut::Box(b) => b.slice(self.len),
504        }
505    }
506}
507
/// The buffer handed to a [ReadOwned] by `RollMut::read_into`: a window
/// into a [RollMut] that the read may write to.
pub(crate) struct ReadInto {
    /// The buffer being read into; owned for the duration of the read.
    buf: RollMut,
    /// Where the window starts, in bytes from the start of `buf`'s storage.
    off: u32,
    /// Size of the window in bytes.
    cap: u32,
}
513
514unsafe impl IoBufMut for ReadInto {
515    fn io_buf_mut_stable_mut_ptr(&mut self) -> *mut u8 {
516        unsafe { self.buf.storage.as_mut_ptr().add(self.off as usize) }
517    }
518
519    fn io_buf_mut_capacity(&self) -> usize {
520        self.cap as _
521    }
522}
523
/// An immutable, cheaply clonable view into the bytes of a [RollMut]
/// (or an empty view, see `Roll::empty`). Dereferences to `[u8]`.
#[derive(Clone)]
pub struct Roll {
    inner: RollInner,
}
529
530impl Debug for Roll {
531    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
532        Debug::fmt(&self[..], f)
533    }
534}
535
536impl<T> PartialEq<T> for Roll
537where
538    T: AsRef<[u8]>,
539{
540    fn eq(&self, other: &T) -> bool {
541        &self[..] == other.as_ref()
542    }
543}
544
// Byte-wise equality (see the `PartialEq` impl) is a full equivalence relation.
impl Eq for Roll {}
546
547impl Hash for Roll {
548    fn hash<H: Hasher>(&self, state: &mut H) {
549        self[..].hash(state)
550    }
551}
552
553impl From<RollInner> for Roll {
554    fn from(inner: RollInner) -> Self {
555        Self { inner }
556    }
557}
558
559impl From<RollBox> for Roll {
560    fn from(b: RollBox) -> Self {
561        RollInner::Box(b).into()
562    }
563}
564
565impl From<Buf> for Roll {
566    fn from(b: Buf) -> Self {
567        RollInner::Buf(b).into()
568    }
569}
570
/// The possible representations of a [Roll].
#[derive(Clone)]
enum RollInner {
    /// A view backed by a [Buf].
    Buf(Buf),
    /// A view backed by shared box storage.
    Box(RollBox),
    /// A view over no bytes at all, backed by nothing.
    Empty,
}
577
/// A view of `len` bytes into shared box storage, starting at the storage's
/// own offset (`b.off`).
#[derive(Clone)]
struct RollBox {
    b: BoxStorage,
    len: u32,
}
583
584impl RollBox {
585    #[inline(always)]
586    fn split_at(self, at: usize) -> (Self, Self) {
587        let at: u32 = at.try_into().unwrap();
588        assert!(at <= self.len);
589
590        let left = Self {
591            b: self.b.clone(),
592            len: at,
593        };
594
595        let mut right = Self {
596            b: self.b,
597            len: self.len - at,
598        };
599        right.b.off += at;
600
601        (left, right)
602    }
603
604    #[inline(always)]
605    fn len(&self) -> usize {
606        self.len as usize
607    }
608
609    fn slice(mut self, range: impl RangeBounds<usize>) -> Self {
610        let mut new_start = 0;
611        let mut new_end = self.len();
612
613        match range.start_bound() {
614            Bound::Included(&n) => new_start = n,
615            Bound::Excluded(&n) => new_start = n + 1,
616            Bound::Unbounded => {}
617        }
618
619        match range.end_bound() {
620            Bound::Included(&n) => new_end = n + 1,
621            Bound::Excluded(&n) => new_end = n,
622            Bound::Unbounded => {}
623        }
624
625        assert!(new_start <= new_end);
626        assert!(new_end <= self.len());
627
628        self.b.off += new_start as u32;
629        self.len = (new_end - new_start) as u32;
630        self
631    }
632}
633
634impl AsRef<[u8]> for RollBox {
635    #[inline(always)]
636    fn as_ref(&self) -> &[u8] {
637        self.b.slice(self.len)
638    }
639}
640
641impl AsRef<[u8]> for Roll {
642    #[inline(always)]
643    fn as_ref(&self) -> &[u8] {
644        match &self.inner {
645            RollInner::Buf(b) => b.as_ref(),
646            RollInner::Box(b) => b.as_ref(),
647            RollInner::Empty => &[],
648        }
649    }
650}
651
652impl Deref for Roll {
653    type Target = [u8];
654
655    #[inline(always)]
656    fn deref(&self) -> &Self::Target {
657        self.as_ref()
658    }
659}
660
661impl RollInner {
662    #[inline(always)]
663    fn split_at(self, at: usize) -> (Self, Self) {
664        match self {
665            RollInner::Buf(b) => {
666                let (left, right) = b.split_at(at);
667                (RollInner::Buf(left), RollInner::Buf(right))
668            }
669            RollInner::Box(b) => {
670                let (left, right) = b.split_at(at);
671                (RollInner::Box(left), RollInner::Box(right))
672            }
673            RollInner::Empty => {
674                assert_eq!(at, 0);
675                (RollInner::Empty, RollInner::Empty)
676            }
677        }
678    }
679}
680
681impl Roll {
682    /// Creates an empty roll
683    pub fn empty() -> Self {
684        RollInner::Empty.into()
685    }
686
687    /// Returns the length of this roll
688    #[inline(always)]
689    pub fn len(&self) -> usize {
690        match &self.inner {
691            RollInner::Buf(b) => b.len(),
692            RollInner::Box(b) => b.len(),
693            RollInner::Empty => 0,
694        }
695    }
696
697    /// Returns true if this roll is empty
698    pub fn is_empty(&self) -> bool {
699        self.len() == 0
700    }
701
702    pub fn split_at(self, at: usize) -> (Roll, Roll) {
703        let (left, right) = self.inner.split_at(at);
704        (left.into(), right.into())
705    }
706
707    pub fn slice(self, range: impl RangeBounds<usize>) -> Self {
708        match self.inner {
709            RollInner::Buf(b) => b.slice(range).into(),
710            RollInner::Box(b) => b.slice(range).into(),
711            RollInner::Empty => panic!("cannot slice empty roll"),
712        }
713    }
714
715    pub fn iter(&self) -> RollIter {
716        RollIter {
717            roll: self.clone(),
718            pos: 0,
719        }
720    }
721
722    pub fn to_string_lossy(&self) -> Cow<'_, str> {
723        String::from_utf8_lossy(self)
724    }
725
726    /// Decode as utf-8
727    pub fn to_string(self) -> Result<RollStr, Utf8Error> {
728        _ = std::str::from_utf8(&self)?;
729        Ok(RollStr { roll: self })
730    }
731
732    /// Convert to [RollStr].
733    ///
734    /// # Safety
735    /// UB if not utf-8. Typically only used in parsers.
736    pub unsafe fn to_string_unchecked(self) -> RollStr {
737        RollStr { roll: self }
738    }
739}
740
741impl InputIter for Roll {
742    type Item = u8;
743    type Iter = Enumerate<Self::IterElem>;
744    type IterElem = RollIter;
745
746    #[inline]
747    fn iter_indices(&self) -> Self::Iter {
748        self.iter_elements().enumerate()
749    }
750    #[inline]
751    fn iter_elements(&self) -> Self::IterElem {
752        self.iter()
753    }
754    #[inline]
755    fn position<P>(&self, predicate: P) -> Option<usize>
756    where
757        P: Fn(Self::Item) -> bool,
758    {
759        self.iter().position(predicate)
760    }
761    #[inline]
762    fn slice_index(&self, count: usize) -> Result<usize, Needed> {
763        if self.len() >= count {
764            Ok(count)
765        } else {
766            Err(Needed::new(count - self.len()))
767        }
768    }
769}
770
/// An iterator over the bytes of a [Roll]. Owns its own clone of the roll.
pub struct RollIter {
    /// The roll being iterated.
    roll: Roll,
    /// Index of the next byte to yield.
    pos: usize,
}
776
777impl Iterator for RollIter {
778    type Item = u8;
779
780    fn next(&mut self) -> Option<Self::Item> {
781        if self.pos >= self.roll.len() {
782            return None;
783        }
784
785        let c = self.roll[self.pos];
786        self.pos += 1;
787        Some(c)
788    }
789
790    fn size_hint(&self) -> (usize, Option<usize>) {
791        let remaining = self.roll.len() - self.pos;
792        (remaining, Some(remaining))
793    }
794}
795
796impl InputTake for Roll {
797    #[inline]
798    fn take(&self, count: usize) -> Self {
799        self.clone().slice(..count)
800    }
801    #[inline]
802    fn take_split(&self, count: usize) -> (Self, Self) {
803        let (prefix, suffix) = self.clone().split_at(count);
804        (suffix, prefix)
805    }
806}
807
808impl InputTakeAtPosition for Roll {
809    type Item = u8;
810
811    fn split_at_position<P, E: nom::error::ParseError<Self>>(
812        &self,
813        predicate: P,
814    ) -> nom::IResult<Self, Self, E>
815    where
816        P: Fn(Self::Item) -> bool,
817    {
818        match self.iter().position(predicate) {
819            Some(i) => Ok(self.clone().take_split(i)),
820            None => Err(nom::Err::Incomplete(nom::Needed::new(1))),
821        }
822    }
823
824    fn split_at_position1<P, E: nom::error::ParseError<Self>>(
825        &self,
826        predicate: P,
827        e: nom::error::ErrorKind,
828    ) -> nom::IResult<Self, Self, E>
829    where
830        P: Fn(Self::Item) -> bool,
831    {
832        match self.iter().position(predicate) {
833            Some(0) => Err(nom::Err::Error(E::from_error_kind(self.clone(), e))),
834            Some(i) => Ok(self.take_split(i)),
835            None => Err(nom::Err::Incomplete(nom::Needed::new(1))),
836        }
837    }
838
839    fn split_at_position_complete<P, E: nom::error::ParseError<Self>>(
840        &self,
841        predicate: P,
842    ) -> nom::IResult<Self, Self, E>
843    where
844        P: Fn(Self::Item) -> bool,
845    {
846        match self.iter().position(predicate) {
847            Some(i) => Ok(self.take_split(i)),
848            None => Ok(self.take_split(self.input_len())),
849        }
850    }
851
852    fn split_at_position1_complete<P, E: nom::error::ParseError<Self>>(
853        &self,
854        predicate: P,
855        e: nom::error::ErrorKind,
856    ) -> nom::IResult<Self, Self, E>
857    where
858        P: Fn(Self::Item) -> bool,
859    {
860        match self.iter().position(predicate) {
861            Some(0) => Err(nom::Err::Error(E::from_error_kind(self.clone(), e))),
862            Some(i) => Ok(self.take_split(i)),
863            None => {
864                if self.is_empty() {
865                    Err(nom::Err::Error(E::from_error_kind(self.clone(), e)))
866                } else {
867                    Ok(self.take_split(self.input_len()))
868                }
869            }
870        }
871    }
872}
873
874impl FindSubstring<&[u8]> for Roll {
875    fn find_substring(&self, substr: &[u8]) -> Option<usize> {
876        if substr.len() > self.len() {
877            return None;
878        }
879
880        let (&substr_first, substr_rest) = match substr.split_first() {
881            Some(split) => split,
882            // an empty substring is found at position 0
883            // This matches the behavior of str.find("").
884            None => return Some(0),
885        };
886
887        if substr_rest.is_empty() {
888            return memchr::memchr(substr_first, self);
889        }
890
891        let mut offset = 0;
892        let haystack = &self[..self.len() - substr_rest.len()];
893
894        while let Some(position) = memchr::memchr(substr_first, &haystack[offset..]) {
895            offset += position;
896            let next_offset = offset + 1;
897            if &self[next_offset..][..substr_rest.len()] == substr_rest {
898                return Some(offset);
899            }
900
901            offset = next_offset;
902        }
903
904        None
905    }
906}
907
908impl Compare<&[u8]> for Roll {
909    #[inline(always)]
910    fn compare(&self, t: &[u8]) -> CompareResult {
911        let pos = self.iter().zip(t.iter()).position(|(a, b)| a != *b);
912
913        match pos {
914            Some(_) => CompareResult::Error,
915            None => {
916                if self.len() >= t.len() {
917                    CompareResult::Ok
918                } else {
919                    CompareResult::Incomplete
920                }
921            }
922        }
923    }
924
925    #[inline(always)]
926    fn compare_no_case(&self, t: &[u8]) -> CompareResult {
927        if self
928            .iter()
929            .zip(t)
930            .any(|(a, b)| lowercase_byte(a) != lowercase_byte(*b))
931        {
932            CompareResult::Error
933        } else if self.len() < t.len() {
934            CompareResult::Incomplete
935        } else {
936            CompareResult::Ok
937        }
938    }
939}
940
/// Maps ASCII `A`-`Z` to `a`-`z`; every other byte is returned unchanged.
fn lowercase_byte(c: u8) -> u8 {
    c.to_ascii_lowercase()
}
947
948impl InputLength for Roll {
949    #[inline]
950    fn input_len(&self) -> usize {
951        self.len()
952    }
953}
954
955impl<S> Slice<S> for Roll
956where
957    S: RangeBounds<usize>,
958{
959    fn slice(&self, range: S) -> Self {
960        Roll::slice(self.clone(), range)
961    }
962}
963
/// A [Roll] that's also a valid utf-8 string. Dereferences to `str`.
///
/// Built with `Roll::to_string` (validated) or `Roll::to_string_unchecked`.
#[derive(Clone)]
pub struct RollStr {
    roll: Roll,
}
969
970impl Debug for RollStr {
971    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
972        Debug::fmt(&self[..], f)
973    }
974}
975
976impl Deref for RollStr {
977    type Target = str;
978
979    fn deref(&self) -> &Self::Target {
980        unsafe { std::str::from_utf8_unchecked(&self.roll) }
981    }
982}
983
984impl RollStr {
985    pub fn into_inner(self) -> Roll {
986        self.roll
987    }
988}
989
990#[cfg(test)]
991mod tests {
992    use crate::trace;
993    use nom::IResult;
994
995    use crate::{Roll, RollMut, BUF_SIZE};
996
    /// `put` + `filled` + `skip` behave the same on buffer storage and on
    /// box storage (after one and two `grow` calls).
    #[test]
    fn test_roll_put() {
        crate::bufpool::initialize_allocator().unwrap();

        fn test_roll_put_inner(mut rm: RollMut) {
            let initial_size = rm.cap();

            rm.put(b"hello").unwrap();
            assert_eq!(rm.cap(), initial_size - 5);

            let filled = rm.filled();
            assert_eq!(&filled[..], b"hello");

            // skipping consumes the filled bytes but does not give capacity back
            rm.skip(5);
            assert_eq!(rm.len(), 0);
            assert_eq!(rm.cap(), initial_size - 5);
        }

        let rm = RollMut::alloc().unwrap();
        assert_eq!(rm.cap(), BUF_SIZE as usize);
        test_roll_put_inner(rm);

        let mut rm = RollMut::alloc().unwrap();
        rm.grow();
        test_roll_put_inner(rm);

        let mut rm = RollMut::alloc().unwrap();
        rm.grow();
        rm.grow();
        test_roll_put_inner(rm);
    }
1028
    /// `put` on a completely full buffer fails with `DoesNotFit`.
    #[test]
    fn test_roll_put_does_not_fit() {
        crate::bufpool::initialize_allocator().unwrap();

        let mut rm = RollMut::alloc().unwrap();
        // fill the buffer to the brim
        rm.put(" ".repeat(rm.cap())).unwrap();

        let err = rm.put("drop").unwrap_err();
        assert!(format!("{err:?}").contains("DoesNotFit"));
        assert!(format!("{err}").contains("does not fit"));
    }
1040
    /// `compact` on a buffer with nothing filled gives back a capacity of
    /// `BUF_SIZE`, for both buffer storage and box storage.
    #[test]
    fn test_roll_realloc() {
        crate::bufpool::initialize_allocator().unwrap();

        fn test_roll_realloc_inner(mut rm: RollMut) {
            let init_cap = rm.cap();
            rm.put("hello").unwrap();
            rm.take_all();
            assert_eq!(rm.cap(), init_cap - 5);

            rm.compact().unwrap();
            assert_eq!(rm.cap(), BUF_SIZE as usize);
        }

        let rm = RollMut::alloc().unwrap();
        test_roll_realloc_inner(rm);

        let mut rm = RollMut::alloc().unwrap();
        rm.grow();
        test_roll_realloc_inner(rm);
    }
1062
1063    #[test]
1064    fn test_roll_realloc_big() {
1065        crate::bufpool::initialize_allocator().unwrap();
1066
1067        let mut rm = RollMut::alloc().unwrap();
1068        rm.grow();
1069
1070        let put = "x".repeat(rm.cap() * 2 / 3);
1071        rm.put(&put).unwrap();
1072        rm.compact().unwrap();
1073
1074        assert_eq!(rm.storage_size(), BUF_SIZE as usize * 2);
1075        assert_eq!(rm.len(), put.len());
1076        assert_eq!(&rm[..], put.as_bytes());
1077    }
1078
1079    #[test]
1080    fn test_roll_reserve() {
1081        crate::bufpool::initialize_allocator().unwrap();
1082
1083        let mut rm = RollMut::alloc().unwrap();
1084        assert_eq!(rm.cap(), BUF_SIZE as usize);
1085        assert_eq!(rm.len(), 0);
1086        rm.reserve().unwrap();
1087        assert_eq!(rm.cap(), BUF_SIZE as usize);
1088        assert_eq!(rm.len(), 0);
1089
1090        rm.put("hello").unwrap();
1091        rm.take_all();
1092
1093        assert_eq!(rm.cap(), BUF_SIZE as usize - 5);
1094        assert_eq!(rm.len(), 0);
1095        rm.reserve().unwrap();
1096        assert_eq!(rm.cap(), BUF_SIZE as usize - 5);
1097        assert_eq!(rm.len(), 0);
1098
1099        let old_cap = rm.cap();
1100        rm.put(b" ".repeat(old_cap)).unwrap();
1101        assert_eq!(rm.cap(), 0);
1102        assert_eq!(rm.len(), old_cap);
1103
1104        rm.reserve().unwrap();
1105        assert_eq!(rm.cap(), 5);
1106        assert_eq!(rm.len(), old_cap);
1107
1108        rm.put("hello").unwrap();
1109        rm.reserve().unwrap();
1110        assert_eq!(rm.cap(), BUF_SIZE as usize);
1111        assert_eq!(rm.len(), BUF_SIZE as usize);
1112    }
1113
1114    #[test]
1115    fn test_roll_put_then_grow() {
1116        crate::bufpool::initialize_allocator().unwrap();
1117
1118        let mut rm = RollMut::alloc().unwrap();
1119        assert_eq!(rm.cap(), BUF_SIZE as usize);
1120
1121        let input = b"I am pretty long";
1122
1123        rm.put(input).unwrap();
1124        assert_eq!(rm.len(), input.len());
1125        assert_eq!(&rm[..], input);
1126
1127        assert_eq!(rm.cap(), BUF_SIZE as usize - input.len());
1128
1129        rm.grow();
1130        assert_eq!(rm.cap(), 2 * (BUF_SIZE as usize) - input.len());
1131        assert_eq!(&rm[..], input);
1132
1133        rm.skip(5);
1134        assert_eq!(&rm[..], b"pretty long");
1135    }
1136
1137    #[test]
1138    #[cfg(not(feature = "miri"))]
1139    fn test_roll_readfrom_start() {
1140        crate::bufpool::initialize_allocator().unwrap();
1141
1142        use crate::WriteOwned;
1143
1144        crate::start(async move {
1145            let mut rm = RollMut::alloc().unwrap();
1146
1147            let (mut send, mut read) = crate::pipe();
1148            crate::spawn(async move {
1149                send.write_all_owned("123456").await.unwrap();
1150            });
1151
1152            let mut res;
1153            (res, rm) = rm.read_into(3, &mut read).await;
1154            res.unwrap();
1155
1156            assert_eq!(rm.len(), 3);
1157            assert_eq!(rm.filled().as_ref(), b"123");
1158
1159            (res, rm) = rm.read_into(3, &mut read).await;
1160            res.unwrap();
1161
1162            assert_eq!(rm.len(), 6);
1163            assert_eq!(rm.filled().as_ref(), b"123456");
1164        });
1165    }
1166
1167    #[test]
1168    fn test_roll_keep() {
1169        crate::bufpool::initialize_allocator().unwrap();
1170
1171        fn test_roll_keep_inner(mut rm: RollMut) {
1172            rm.put(b"helloworld").unwrap();
1173            assert_eq!(&rm[..], b"helloworld");
1174
1175            {
1176                let roll = rm.filled().slice(3..=5);
1177                assert_eq!(roll, b"low");
1178            }
1179
1180            {
1181                let roll = rm.filled().slice(..5);
1182                assert_eq!(roll, b"hello");
1183            }
1184
1185            let roll = rm.filled().slice(5..);
1186            assert_eq!(roll, b"world");
1187
1188            rm.keep(roll);
1189            assert_eq!(&rm[..], b"world");
1190        }
1191
1192        let rm = RollMut::alloc().unwrap();
1193        test_roll_keep_inner(rm);
1194
1195        let mut rm = RollMut::alloc().unwrap();
1196        rm.grow();
1197        test_roll_keep_inner(rm);
1198    }
1199
1200    #[test]
1201    #[should_panic(expected = "roll must be from same buffer")]
1202    fn test_roll_keep_different_buf() {
1203        crate::bufpool::initialize_allocator().unwrap();
1204
1205        let mut rm1 = RollMut::alloc().unwrap();
1206        rm1.put("hello").unwrap();
1207
1208        let mut rm2 = RollMut::alloc().unwrap();
1209        rm2.put("hello").unwrap();
1210        let roll2 = rm2.take_all();
1211
1212        rm1.keep(roll2);
1213    }
1214
1215    #[test]
1216    #[should_panic(expected = "roll must be from same buffer")]
1217    fn test_roll_keep_different_box() {
1218        crate::bufpool::initialize_allocator().unwrap();
1219
1220        let mut rm1 = RollMut::alloc().unwrap();
1221        rm1.grow();
1222        rm1.put("hello").unwrap();
1223
1224        let mut rm2 = RollMut::alloc().unwrap();
1225        rm2.grow();
1226        rm2.put("hello").unwrap();
1227        let roll2 = rm2.take_all();
1228
1229        rm1.keep(roll2);
1230    }
1231
1232    #[test]
1233    #[should_panic(expected = "roll must be from same buffer")]
1234    fn test_roll_keep_different_type() {
1235        crate::bufpool::initialize_allocator().unwrap();
1236
1237        let mut rm1 = RollMut::alloc().unwrap();
1238        rm1.grow();
1239        rm1.put("hello").unwrap();
1240
1241        let mut rm2 = RollMut::alloc().unwrap();
1242        rm2.put("hello").unwrap();
1243        let roll2 = rm2.take_all();
1244
1245        rm1.keep(roll2);
1246    }
1247
1248    #[test]
1249    #[should_panic(expected = "roll must start within buffer")]
1250    fn test_roll_keep_before_buf() {
1251        crate::bufpool::initialize_allocator().unwrap();
1252
1253        let mut rm1 = RollMut::alloc().unwrap();
1254        rm1.put("hello").unwrap();
1255        let roll = rm1.filled();
1256        rm1.skip(5);
1257        rm1.keep(roll);
1258    }
1259
1260    #[test]
1261    #[should_panic(expected = "roll must start within buffer")]
1262    fn test_roll_keep_before_box() {
1263        crate::bufpool::initialize_allocator().unwrap();
1264
1265        let mut rm1 = RollMut::alloc().unwrap();
1266        rm1.grow();
1267        rm1.put("hello").unwrap();
1268        let roll = rm1.filled();
1269        rm1.skip(5);
1270        rm1.keep(roll);
1271    }
1272
1273    #[test]
1274    fn test_roll_iter() {
1275        crate::bufpool::initialize_allocator().unwrap();
1276
1277        let mut rm = RollMut::alloc().unwrap();
1278        rm.put(b"hello").unwrap();
1279        let roll = rm.filled();
1280        let v = roll.iter().collect::<Vec<_>>();
1281        assert_eq!(v, b"hello");
1282
1283        assert_eq!(roll.to_string_lossy(), "hello");
1284    }
1285
1286    #[test]
1287    #[cfg(not(feature = "miri"))]
1288    fn test_roll_iobuf() {
1289        crate::bufpool::initialize_allocator().unwrap();
1290
1291        use b_x::{BxForResults, BX};
1292
1293        use crate::{
1294            io::IntoHalves,
1295            net::{TcpListener, TcpStream},
1296            ReadOwned, WriteOwned,
1297        };
1298
1299        async fn test_roll_iobuf_inner(mut rm: RollMut) -> b_x::Result<()> {
1300            rm.put(b"hello").unwrap();
1301            let roll = rm.take_all();
1302
1303            let ln = TcpListener::bind("127.0.0.1:0".parse().unwrap())
1304                .await
1305                .unwrap();
1306            let local_addr = ln.local_addr().unwrap();
1307
1308            let send_fut = async move {
1309                let stream = TcpStream::connect(local_addr).await.bx()?;
1310                let (_stream_r, mut stream_w) = IntoHalves::into_halves(stream);
1311                stream_w.write_all_owned(roll).await?;
1312                Ok::<_, BX>(())
1313            };
1314
1315            let recv_fut = async move {
1316                let (stream, addr) = ln.accept().await.bx()?;
1317                let (mut stream_r, _stream_w) = IntoHalves::into_halves(stream);
1318                println!("Accepted connection from {addr}");
1319
1320                let mut buf = vec![0u8; 1024];
1321                let res;
1322                (res, buf) = stream_r.read_owned(buf).await;
1323                let n = res?;
1324
1325                assert_eq!(&buf[..n], b"hello");
1326
1327                Ok::<_, BX>(())
1328            };
1329
1330            tokio::try_join!(send_fut, recv_fut)?;
1331            Ok(())
1332        }
1333
1334        crate::start(async move {
1335            let rm = RollMut::alloc().unwrap();
1336            test_roll_iobuf_inner(rm).await.unwrap();
1337
1338            let mut rm = RollMut::alloc().unwrap();
1339            rm.grow();
1340            test_roll_iobuf_inner(rm).await.unwrap();
1341        });
1342    }
1343
1344    #[test]
1345    fn test_roll_take_at_most() {
1346        crate::bufpool::initialize_allocator().unwrap();
1347
1348        let mut rm = RollMut::alloc().unwrap();
1349        rm.put(b"hello").unwrap();
1350        let roll = rm.take_at_most(4).unwrap();
1351        assert_eq!(roll, b"hell");
1352
1353        let mut rm = RollMut::alloc().unwrap();
1354        rm.put(b"hello").unwrap();
1355        let roll = rm.take_at_most(12).unwrap();
1356        assert_eq!(roll, b"hello");
1357
1358        let mut rm = RollMut::alloc().unwrap();
1359        assert!(rm.take_at_most(12).is_none());
1360    }
1361
1362    #[test]
1363    fn test_roll_take_all() {
1364        crate::bufpool::initialize_allocator().unwrap();
1365
1366        let mut rm = RollMut::alloc().unwrap();
1367        rm.put(b"hello").unwrap();
1368        let roll = rm.take_all();
1369        assert_eq!(roll, b"hello");
1370    }
1371
1372    #[test]
1373    #[should_panic(expected = "take_all is pointless if the filled part is empty")]
1374    fn test_roll_take_all_empty() {
1375        crate::bufpool::initialize_allocator().unwrap();
1376
1377        let mut rm = RollMut::alloc().unwrap();
1378        rm.take_all();
1379    }
1380
1381    #[test]
1382    #[should_panic(expected = "refusing to do empty take_at_most")]
1383    fn test_roll_take_at_most_panic() {
1384        crate::bufpool::initialize_allocator().unwrap();
1385
1386        let mut rm = RollMut::alloc().unwrap();
1387        rm.take_at_most(0);
1388    }
1389
1390    #[test]
1391    fn test_roll_nom_sample() {
1392        crate::bufpool::initialize_allocator().unwrap();
1393
1394        fn parse(i: Roll) -> IResult<Roll, Roll> {
1395            nom::bytes::streaming::tag(&b"HTTP/1.1 200 OK"[..])(i)
1396        }
1397
1398        let mut buf = RollMut::alloc().unwrap();
1399
1400        let input = b"HTTP/1.1 200 OK".repeat(1000);
1401        let mut pending = &input[..];
1402
1403        loop {
1404            if buf.cap() == 0 {
1405                trace!("buf had zero cap, growing");
1406                buf.grow()
1407            }
1408
1409            let (rest, version) = match parse(buf.filled()) {
1410                Ok(t) => t,
1411                Err(e) => {
1412                    if e.is_incomplete() {
1413                        {
1414                            if pending.is_empty() {
1415                                println!("ran out of input");
1416                                break;
1417                            }
1418
1419                            let n = std::cmp::min(buf.cap(), pending.len());
1420                            buf.put(&pending[..n]).unwrap();
1421                            pending = &pending[n..];
1422
1423                            println!("advanced by {n}, {} remaining", pending.len());
1424                        }
1425
1426                        continue;
1427                    }
1428                    panic!("parsing error: {e}");
1429                }
1430            };
1431            assert_eq!(version, b"HTTP/1.1 200 OK");
1432
1433            buf.keep(rest);
1434        }
1435    }
1436
1437    #[test]
1438    fn test_roll_io_write() {
1439        crate::bufpool::initialize_allocator().unwrap();
1440
1441        let mut rm = RollMut::alloc().unwrap();
1442        std::io::Write::write_all(&mut rm, b"hello").unwrap();
1443
1444        let roll = rm.take_all();
1445        assert_eq!(std::str::from_utf8(&roll).unwrap(), "hello");
1446    }
1447
1448    #[test]
1449    fn test_reallocate_big() {
1450        crate::bufpool::initialize_allocator().unwrap();
1451
1452        let mut rm = RollMut::alloc().unwrap();
1453        rm.put(b"baba yaga").unwrap();
1454        let filled = rm.filled();
1455        let (_frame, rest) = filled.split_at(4);
1456        rm.keep(rest);
1457
1458        rm.reserve_at_least(5263945).unwrap();
1459        assert!(rm.cap() >= 5263945);
1460    }
1461}