fluke_buffet/
roll.rs

1use std::{
2    borrow::Cow,
3    cell::UnsafeCell,
4    fmt::{Debug, Formatter},
5    iter::Enumerate,
6    ops::{Bound, Deref, RangeBounds},
7    rc::Rc,
8    str::Utf8Error,
9};
10
11use crate::{io::ReadOwned, Error, IoBufMut};
12use nom::{
13    Compare, CompareResult, FindSubstring, InputIter, InputLength, InputTake, InputTakeAtPosition,
14    Needed, Slice,
15};
16use tracing::trace;
17
18use crate::{Buf, BufMut, BUF_SIZE};
19
/// Module-local `Result` alias whose error type defaults to [`crate::Error`].
type Result<T, E = crate::Error> = std::result::Result<T, E>;
21
/// A "rolling buffer". Uses either one [BufMut] or a `Box<[u8]>` for storage.
/// This buffer never grows, but it can be split, and it can be reallocated so
/// it regains its initial capacity, minus the length of the filled part.
pub struct RollMut {
    /// Where the bytes live
    storage: StorageMut,
    /// Number of filled (readable) bytes at the start of `storage`
    len: u32,
}
29
/// Backing storage of a [RollMut]
enum StorageMut {
    /// A single [BufMut]; its total size is reported as `BUF_SIZE`
    Buf(BufMut),
    /// A reference-counted boxed slice plus an offset into it
    Box(BoxStorage),
}
34
35impl Debug for StorageMut {
36    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
37        match self {
38            Self::Buf(bm) => f
39                .debug_struct("Buf")
40                .field("index", &bm.index)
41                .field("off", &bm.off)
42                .field("len", &bm.len)
43                .finish(),
44            Self::Box(bs) => f
45                .debug_struct("Box")
46                .field("buf", &bs.buf)
47                .field("off", &bs.off)
48                .finish(),
49        }
50    }
51}
52
53impl StorageMut {
54    #[inline(always)]
55    fn cap(&self) -> usize {
56        match self {
57            StorageMut::Buf(_) => BUF_SIZE as usize,
58            StorageMut::Box(b) => b.cap(),
59        }
60    }
61
62    #[inline(always)]
63    fn len(&self) -> usize {
64        match self {
65            StorageMut::Buf(b) => b.len(),
66            StorageMut::Box(b) => b.len(),
67        }
68    }
69
70    unsafe fn as_mut_ptr(&mut self) -> *mut u8 {
71        match self {
72            StorageMut::Buf(b) => b.as_mut_ptr(),
73            StorageMut::Box(b) => b.as_mut_ptr(),
74        }
75    }
76}
77
/// Heap storage shared (via `Rc`) between a [RollMut] and the [Roll]s taken
/// from it. Cloning only clones the `Rc` handle and the offset.
#[derive(Clone)]
struct BoxStorage {
    /// The whole allocation
    buf: Rc<UnsafeCell<Box<[u8]>>>,
    /// Offset into `buf` at which this view starts
    off: u32,
}
83
84impl BoxStorage {
85    #[inline(always)]
86    fn len(&self) -> usize {
87        let buf = self.buf.get();
88        let len = unsafe { (*buf).len() };
89        len - self.off as usize
90    }
91
92    unsafe fn as_mut_ptr(&self) -> *mut u8 {
93        let buf = self.buf.get();
94        (*buf).as_mut_ptr().add(self.off as usize)
95    }
96
97    /// Returns a slice of bytes into this buffer, of the specified length
98    /// Panics if the length is larger than the buffer.
99    fn slice(&self, len: u32) -> &[u8] {
100        let buf = self.buf.get();
101        unsafe { &(*buf)[self.off as usize..][..len as usize] }
102    }
103
104    /// Returns a mutable slice of bytes into this buffer, of the specified
105    /// length Panics if the length is larger than the buffer.
106    fn slice_mut(&mut self, len: u32) -> &mut [u8] {
107        let buf = self.buf.get();
108        unsafe { &mut (*buf)[self.off as usize..][..len as usize] }
109    }
110
111    fn cap(&self) -> usize {
112        let buf = self.buf.get();
113        unsafe { (*buf).len() }
114    }
115}
116
117impl RollMut {
118    /// Allocate, using a single [BufMut] for storage.
119    pub fn alloc() -> Result<Self> {
120        Ok(Self {
121            storage: StorageMut::Buf(BufMut::alloc()?),
122            len: 0,
123        })
124    }
125
126    /// Double the capacity of this buffer by reallocating it, copying the
127    /// filled part into the new buffer. This method always uses a `Box<[u8]>`
128    /// for storage.
129    ///
130    /// This method is somewhat expensive.
131    pub fn grow(&mut self) {
132        let old_cap = self.storage.cap();
133        let new_cap = old_cap * 2;
134        // TODO: optimize via `MaybeUninit`?
135        let b = vec![0; new_cap].into_boxed_slice();
136        let mut bs = BoxStorage {
137            buf: Rc::new(UnsafeCell::new(b)),
138            off: 0,
139        };
140        let dst_slice = bs.slice_mut(self.len() as u32);
141        dst_slice.copy_from_slice(&self[..]);
142        let next_storage = StorageMut::Box(bs);
143
144        self.storage = next_storage;
145    }
146
147    /// Reallocates the backing storage for this buffer, copying the filled
148    /// portion into it. Panics if `len() == storage_size()`, in which case
149    /// reallocating won't do much good
150    pub fn realloc(&mut self) -> Result<()> {
151        assert!(self.len() != self.storage_size());
152
153        let next_storage = match &self.storage {
154            StorageMut::Buf(_) => {
155                let mut next_b = BufMut::alloc()?;
156                next_b[..self.len()].copy_from_slice(&self[..]);
157                StorageMut::Buf(next_b)
158            }
159            StorageMut::Box(b) => {
160                if self.len() > BUF_SIZE as usize {
161                    // TODO: optimize via `MaybeUninit`?
162                    let mut next_b = vec![0; b.cap()].into_boxed_slice();
163                    next_b[..self.len()].copy_from_slice(&self[..]);
164                    let next_b = BoxStorage {
165                        buf: Rc::new(UnsafeCell::new(next_b)),
166                        off: 0,
167                    };
168                    StorageMut::Box(next_b)
169                } else {
170                    let mut next_b = BufMut::alloc()?;
171                    next_b[..self.len()].copy_from_slice(&self[..]);
172                    StorageMut::Buf(next_b)
173                }
174            }
175        };
176
177        self.storage = next_storage;
178
179        Ok(())
180    }
181
182    /// Reserve more capacity for this buffer if this buffer is full.
183    /// If this buffer's size matches the underlying storage size,
184    /// this is equivalent to `grow`. Otherwise, it's equivalent
185    /// to `realloc`.
186    pub fn reserve(&mut self) -> Result<()> {
187        if self.len() < self.cap() {
188            return Ok(());
189        }
190
191        if self.len() < self.storage_size() {
192            // we don't need to go up a buffer size
193            trace!(len = %self.len(), cap = %self.cap(), storage_size = %self.storage_size(), "in reserve: reallocating");
194            self.realloc()?
195        } else {
196            trace!(len = %self.len(), cap = %self.cap(), storage_size = %self.storage_size(), "in reserve: growing");
197            self.grow()
198        }
199
200        Ok(())
201    }
202
203    /// Make sure we can hold "request_len"
204    pub fn reserve_at_least(&mut self, requested_len: usize) -> Result<()> {
205        while self.cap() < requested_len {
206            if self.cap() < self.storage_size() {
207                // we don't need to go up a buffer size
208                self.realloc()?
209            } else {
210                self.grow()
211            }
212        }
213
214        Ok(())
215    }
216
    /// The length (filled portion) of this buffer, that can be read,
    /// in bytes
    #[inline(always)]
    pub fn len(&self) -> usize {
        self.len as usize
    }
222
223    /// Returns true if this is empty
224    #[inline(always)]
225    pub fn is_empty(&self) -> bool {
226        self.len == 0
227    }
228
229    /// The capacity of this buffer, that can be written to
230    #[inline(always)]
231    pub fn cap(&self) -> usize {
232        self.storage.len() - self.len as usize
233    }
234
    /// The size of the underlying storage (to know whether
    /// to reallocate or grow). Unlike `cap`, this does not shrink as the
    /// buffer is filled or skipped.
    pub fn storage_size(&self) -> usize {
        self.storage.cap()
    }
240
241    /// Read at most `limit` bytes from [ReadOwned] into this buffer.
242    ///
243    /// This method takes ownership of `self` because it submits an io_uring
244    /// operation, where the kernel owns the read buffer - the only way to
245    /// gain ownership of `self` again is to complete the read operation.
246    ///
247    /// Panics if `cap` is zero
248    pub async fn read_into(
249        self,
250        limit: usize,
251        r: &mut impl ReadOwned,
252    ) -> (std::io::Result<usize>, Self) {
253        let read_cap = std::cmp::min(limit, self.cap());
254        assert!(read_cap > 0, "refusing to do empty read");
255        let read_off = self.len;
256
257        tracing::trace!(%read_off, %read_cap, storage = ?self.storage, len = %self.len, "read_into in progress...");
258        let read_into = ReadInto {
259            buf: self,
260            off: read_off,
261            cap: read_cap.try_into().unwrap(),
262        };
263        let (res, mut read_into) = r.read_owned(read_into).await;
264        if let Ok(n) = &res {
265            tracing::trace!("read_into got {} bytes", *n);
266            read_into.buf.len += *n as u32;
267        } else {
268            tracing::trace!("read_into failed: {:?}", res);
269        }
270        (res, read_into.buf)
271    }
272
273    /// Put a slice into this buffer, fails if the slice doesn't fit in the buffer's capacity
274    pub fn put(&mut self, s: impl AsRef<[u8]>) -> Result<()> {
275        let s = s.as_ref();
276
277        let len = s.len();
278        if len > self.cap() {
279            return Err(Error::DoesNotFit);
280        }
281        unsafe {
282            let ptr = self.storage.as_mut_ptr().add(self.len as usize);
283            std::ptr::copy_nonoverlapping(s.as_ptr(), ptr, len);
284        }
285        let u32_len: u32 = len.try_into().unwrap();
286        self.len += u32_len;
287        Ok(())
288    }
289
290    /// Put data into this RollMut with a closure. Panics if `len > self.cap()`
291    pub fn put_with<T>(&mut self, len: usize, f: impl FnOnce(&mut [u8]) -> Result<T>) -> Result<T> {
292        assert!(len <= self.cap());
293
294        let u32_len: u32 = len.try_into().unwrap();
295        let slice = unsafe {
296            std::slice::from_raw_parts_mut(self.storage.as_mut_ptr().add(self.len as usize), len)
297        };
298        let res = f(slice);
299        if res.is_ok() {
300            self.len += u32_len;
301        }
302        res
303    }
304
305    /// Assert that this RollMut isn't filled at all, reserve enough size to put
306    /// `len` bytes in it, then fill it with the given closure. If the closure
307    /// returns `Err`, it's as if the put never happened.
308    pub fn put_to_roll(
309        &mut self,
310        len: usize,
311        f: impl FnOnce(&mut [u8]) -> Result<()>,
312    ) -> Result<Roll> {
313        // TODO: this whole dance is a bit silly: the idea is to have a
314        // `RollMut` around that we use whenever we need to serialize something.
315        // it's weird that we need to do all this for it to happen but ah well.
316
317        assert_eq!(self.len(), 0);
318        self.reserve_at_least(len)?;
319        self.put_with(len, f)?;
320        let roll = self.take_all();
321        debug_assert_eq!(roll.len(), len);
322        Ok(roll)
323    }
324
325    /// Get a [Roll] corresponding to the filled portion of this buffer
326    pub fn filled(&self) -> Roll {
327        match &self.storage {
328            StorageMut::Buf(b) => b.freeze_slice(0..self.len()).into(),
329            StorageMut::Box(b) => RollBox {
330                b: b.clone(),
331                len: self.len,
332            }
333            .into(),
334        }
335    }
336
337    /// Split this [RollMut] at the given index.
338    /// Panics if `at > len()`. If `at < len()`, the filled portion will carry
339    /// over in the new [RollMut].
340    pub fn skip(&mut self, n: usize) {
341        let u32_n: u32 = n.try_into().unwrap();
342        assert!(u32_n <= self.len);
343
344        match &mut self.storage {
345            StorageMut::Buf(b) => b.skip(n),
346            StorageMut::Box(b) => b.off += u32_n,
347        }
348        self.len -= u32_n;
349    }
350
351    /// Takes the first `n` bytes (up to `len`) as a `Roll`, and advances
352    /// this buffer. Returns `None` if `len` is zero. Panics if `n` is
353    /// zero.
354    pub fn take_at_most(&mut self, n: usize) -> Option<Roll> {
355        assert!(n != 0, "refusing to do empty take_at_most");
356
357        if self.len == 0 {
358            return None;
359        }
360
361        let n = std::cmp::min(n, self.len as usize);
362        let roll = self.filled().slice(..n);
363        self.skip(n);
364        Some(roll)
365    }
366
367    /// Takes the whole `filled` part. Panics if empty, since this is probably
368    /// a misuse.
369    pub fn take_all(&mut self) -> Roll {
370        let roll = self.filled();
371        assert!(
372            !roll.is_empty(),
373            "take_all is pointless if the filled part is empty, check len first"
374        );
375        self.skip(roll.len());
376        roll
377    }
378
379    /// Advance this buffer, keeping the filled part only starting with
380    /// the given roll. (Useful after parsing something with nom)
381    ///
382    /// Panics if the roll is not from this buffer
383    pub fn keep(&mut self, roll: Roll) {
384        match (&mut self.storage, &roll.inner) {
385            (StorageMut::Buf(ours), RollInner::Buf(theirs)) => {
386                assert_eq!(ours.index, theirs.index, "roll must be from same buffer");
387                assert!(theirs.off >= ours.off, "roll must start within buffer");
388                let skipped = theirs.off - ours.off;
389                tracing::trace!(our_index = %ours.index, their_index = %theirs.index, our_off = %ours.off, their_off = %theirs.off, %skipped, "RollMut::keep");
390                self.len -= skipped as u32;
391                ours.len -= skipped;
392                ours.off = theirs.off;
393            }
394            (StorageMut::Box(ours), RollInner::Box(theirs)) => {
395                assert_eq!(
396                    ours.buf.get(),
397                    theirs.b.buf.get(),
398                    "roll must be from same buffer"
399                );
400                assert!(theirs.b.off >= ours.off, "roll must start within buffer");
401                let skipped = theirs.b.off - ours.off;
402                self.len -= skipped;
403                ours.off = theirs.b.off;
404            }
405            _ => {
406                panic!("roll must be from same buffer");
407            }
408        }
409    }
410}
411
412impl std::io::Write for RollMut {
413    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
414        let n = buf.len();
415        if self.cap() < n {
416            // TODO: this is wrong, `reserve` might not reserve _enough data_.
417            // we know how much data we need here, so we should reserve exactly
418            // that amount (rounded up so it aligns nicely with the default
419            // buffer size)
420            self.reserve()
421                .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
422        }
423        self.put(buf)
424            .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
425        Ok(n)
426    }
427
428    fn flush(&mut self) -> std::io::Result<()> {
429        // no need to flush
430        Ok(())
431    }
432}
433
434impl Deref for RollMut {
435    type Target = [u8];
436
437    #[inline(always)]
438    fn deref(&self) -> &Self::Target {
439        match &self.storage {
440            StorageMut::Buf(b) => &b[..self.len as usize],
441            StorageMut::Box(b) => b.slice(self.len),
442        }
443    }
444}
445
/// A [RollMut] along with the window of its storage that a read operation
/// is allowed to fill (see [`RollMut::read_into`]).
pub(crate) struct ReadInto {
    /// The buffer being read into
    buf: RollMut,
    /// Offset (from the start of the storage) where the read should land
    off: u32,
    /// Maximum number of bytes the read may write
    cap: u32,
}
451
452unsafe impl IoBufMut for ReadInto {
453    fn io_buf_mut_stable_mut_ptr(&mut self) -> *mut u8 {
454        unsafe { self.buf.storage.as_mut_ptr().add(self.off as usize) }
455    }
456
457    fn io_buf_mut_capacity(&self) -> usize {
458        self.cap as _
459    }
460}
461
/// An immutable view into a [RollMut]. Derefs to the bytes it covers.
#[derive(Clone)]
pub struct Roll {
    /// The storage this view points into, if any
    inner: RollInner,
}
467
468impl Debug for Roll {
469    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
470        Debug::fmt(&self[..], f)
471    }
472}
473
474impl<T> PartialEq<T> for Roll
475where
476    T: AsRef<[u8]>,
477{
478    fn eq(&self, other: &T) -> bool {
479        &self[..] == other.as_ref()
480    }
481}
482
// Equality between rolls is plain byte equality, which is a full equivalence relation.
impl Eq for Roll {}
484
485impl From<RollInner> for Roll {
486    fn from(inner: RollInner) -> Self {
487        Self { inner }
488    }
489}
490
491impl From<RollBox> for Roll {
492    fn from(b: RollBox) -> Self {
493        RollInner::Box(b).into()
494    }
495}
496
497impl From<Buf> for Roll {
498    fn from(b: Buf) -> Self {
499        RollInner::Buf(b).into()
500    }
501}
502
/// What a [Roll] is backed by
#[derive(Clone)]
enum RollInner {
    /// Bytes living in a [Buf]
    Buf(Buf),
    /// Bytes living in boxed storage
    Box(RollBox),
    /// No bytes at all (see [`Roll::empty`])
    Empty,
}
509
/// A view of `len` bytes of a [BoxStorage], starting at the storage's offset
#[derive(Clone)]
struct RollBox {
    /// Shared storage, and where in it this view starts
    b: BoxStorage,
    /// Length of the view, in bytes
    len: u32,
}
515
516impl RollBox {
517    #[inline(always)]
518    fn split_at(self, at: usize) -> (Self, Self) {
519        let at: u32 = at.try_into().unwrap();
520        assert!(at <= self.len);
521
522        let left = Self {
523            b: self.b.clone(),
524            len: at,
525        };
526
527        let mut right = Self {
528            b: self.b,
529            len: self.len - at,
530        };
531        right.b.off += at;
532
533        (left, right)
534    }
535
536    #[inline(always)]
537    fn len(&self) -> usize {
538        self.len as usize
539    }
540
541    fn slice(mut self, range: impl RangeBounds<usize>) -> Self {
542        let mut new_start = 0;
543        let mut new_end = self.len();
544
545        match range.start_bound() {
546            Bound::Included(&n) => new_start = n,
547            Bound::Excluded(&n) => new_start = n + 1,
548            Bound::Unbounded => {}
549        }
550
551        match range.end_bound() {
552            Bound::Included(&n) => new_end = n + 1,
553            Bound::Excluded(&n) => new_end = n,
554            Bound::Unbounded => {}
555        }
556
557        assert!(new_start <= new_end);
558        assert!(new_end <= self.len());
559
560        self.b.off += new_start as u32;
561        self.len = (new_end - new_start) as u32;
562        self
563    }
564}
565
566impl AsRef<[u8]> for RollBox {
567    #[inline(always)]
568    fn as_ref(&self) -> &[u8] {
569        self.b.slice(self.len)
570    }
571}
572
573impl AsRef<[u8]> for Roll {
574    #[inline(always)]
575    fn as_ref(&self) -> &[u8] {
576        match &self.inner {
577            RollInner::Buf(b) => b.as_ref(),
578            RollInner::Box(b) => b.as_ref(),
579            RollInner::Empty => &[],
580        }
581    }
582}
583
584impl Deref for Roll {
585    type Target = [u8];
586
587    #[inline(always)]
588    fn deref(&self) -> &Self::Target {
589        self.as_ref()
590    }
591}
592
593impl RollInner {
594    #[inline(always)]
595    fn split_at(self, at: usize) -> (Self, Self) {
596        match self {
597            RollInner::Buf(b) => {
598                let (left, right) = b.split_at(at);
599                (RollInner::Buf(left), RollInner::Buf(right))
600            }
601            RollInner::Box(b) => {
602                let (left, right) = b.split_at(at);
603                (RollInner::Box(left), RollInner::Box(right))
604            }
605            RollInner::Empty => {
606                assert_eq!(at, 0);
607                (RollInner::Empty, RollInner::Empty)
608            }
609        }
610    }
611}
612
613impl Roll {
614    /// Creates an empty roll
615    pub fn empty() -> Self {
616        RollInner::Empty.into()
617    }
618
619    /// Returns the length of this roll
620    #[inline(always)]
621    pub fn len(&self) -> usize {
622        match &self.inner {
623            RollInner::Buf(b) => b.len(),
624            RollInner::Box(b) => b.len(),
625            RollInner::Empty => 0,
626        }
627    }
628
629    /// Returns true if this roll is empty
630    pub fn is_empty(&self) -> bool {
631        self.len() == 0
632    }
633
634    pub fn split_at(self, at: usize) -> (Roll, Roll) {
635        let (left, right) = self.inner.split_at(at);
636        (left.into(), right.into())
637    }
638
639    pub fn slice(self, range: impl RangeBounds<usize>) -> Self {
640        match self.inner {
641            RollInner::Buf(b) => b.slice(range).into(),
642            RollInner::Box(b) => b.slice(range).into(),
643            RollInner::Empty => panic!("cannot slice empty roll"),
644        }
645    }
646
647    pub fn iter(&self) -> RollIter {
648        RollIter {
649            roll: self.clone(),
650            pos: 0,
651        }
652    }
653
654    pub fn to_string_lossy(&self) -> Cow<'_, str> {
655        String::from_utf8_lossy(self)
656    }
657
658    /// Decode as utf-8
659    pub fn to_string(self) -> Result<RollStr, Utf8Error> {
660        _ = std::str::from_utf8(&self)?;
661        Ok(RollStr { roll: self })
662    }
663
664    /// Convert to [RollStr].
665    ///
666    /// # Safety
667    /// UB if not utf-8. Typically only used in parsers.
668    pub unsafe fn to_string_unchecked(self) -> RollStr {
669        RollStr { roll: self }
670    }
671}
672
673impl InputIter for Roll {
674    type Item = u8;
675    type Iter = Enumerate<Self::IterElem>;
676    type IterElem = RollIter;
677
678    #[inline]
679    fn iter_indices(&self) -> Self::Iter {
680        self.iter_elements().enumerate()
681    }
682    #[inline]
683    fn iter_elements(&self) -> Self::IterElem {
684        self.iter()
685    }
686    #[inline]
687    fn position<P>(&self, predicate: P) -> Option<usize>
688    where
689        P: Fn(Self::Item) -> bool,
690    {
691        self.iter().position(predicate)
692    }
693    #[inline]
694    fn slice_index(&self, count: usize) -> Result<usize, Needed> {
695        if self.len() >= count {
696            Ok(count)
697        } else {
698            Err(Needed::new(count - self.len()))
699        }
700    }
701}
702
/// An iterator over the bytes of a [Roll]
pub struct RollIter {
    /// The roll being iterated (a clone, so the iterator owns it)
    roll: Roll,
    /// Index of the next byte to yield
    pos: usize,
}
708
709impl Iterator for RollIter {
710    type Item = u8;
711
712    fn next(&mut self) -> Option<Self::Item> {
713        if self.pos >= self.roll.len() {
714            return None;
715        }
716
717        let c = self.roll[self.pos];
718        self.pos += 1;
719        Some(c)
720    }
721
722    fn size_hint(&self) -> (usize, Option<usize>) {
723        let remaining = self.roll.len() - self.pos;
724        (remaining, Some(remaining))
725    }
726}
727
728impl InputTake for Roll {
729    #[inline]
730    fn take(&self, count: usize) -> Self {
731        self.clone().slice(..count)
732    }
733    #[inline]
734    fn take_split(&self, count: usize) -> (Self, Self) {
735        let (prefix, suffix) = self.clone().split_at(count);
736        (suffix, prefix)
737    }
738}
739
740impl InputTakeAtPosition for Roll {
741    type Item = u8;
742
743    fn split_at_position<P, E: nom::error::ParseError<Self>>(
744        &self,
745        predicate: P,
746    ) -> nom::IResult<Self, Self, E>
747    where
748        P: Fn(Self::Item) -> bool,
749    {
750        match self.iter().position(predicate) {
751            Some(i) => Ok(self.clone().take_split(i)),
752            None => Err(nom::Err::Incomplete(nom::Needed::new(1))),
753        }
754    }
755
756    fn split_at_position1<P, E: nom::error::ParseError<Self>>(
757        &self,
758        predicate: P,
759        e: nom::error::ErrorKind,
760    ) -> nom::IResult<Self, Self, E>
761    where
762        P: Fn(Self::Item) -> bool,
763    {
764        match self.iter().position(predicate) {
765            Some(0) => Err(nom::Err::Error(E::from_error_kind(self.clone(), e))),
766            Some(i) => Ok(self.take_split(i)),
767            None => Err(nom::Err::Incomplete(nom::Needed::new(1))),
768        }
769    }
770
771    fn split_at_position_complete<P, E: nom::error::ParseError<Self>>(
772        &self,
773        predicate: P,
774    ) -> nom::IResult<Self, Self, E>
775    where
776        P: Fn(Self::Item) -> bool,
777    {
778        match self.iter().position(predicate) {
779            Some(i) => Ok(self.take_split(i)),
780            None => Ok(self.take_split(self.input_len())),
781        }
782    }
783
784    fn split_at_position1_complete<P, E: nom::error::ParseError<Self>>(
785        &self,
786        predicate: P,
787        e: nom::error::ErrorKind,
788    ) -> nom::IResult<Self, Self, E>
789    where
790        P: Fn(Self::Item) -> bool,
791    {
792        match self.iter().position(predicate) {
793            Some(0) => Err(nom::Err::Error(E::from_error_kind(self.clone(), e))),
794            Some(i) => Ok(self.take_split(i)),
795            None => {
796                if self.is_empty() {
797                    Err(nom::Err::Error(E::from_error_kind(self.clone(), e)))
798                } else {
799                    Ok(self.take_split(self.input_len()))
800                }
801            }
802        }
803    }
804}
805
806impl FindSubstring<&[u8]> for Roll {
807    fn find_substring(&self, substr: &[u8]) -> Option<usize> {
808        if substr.len() > self.len() {
809            return None;
810        }
811
812        let (&substr_first, substr_rest) = match substr.split_first() {
813            Some(split) => split,
814            // an empty substring is found at position 0
815            // This matches the behavior of str.find("").
816            None => return Some(0),
817        };
818
819        if substr_rest.is_empty() {
820            return memchr::memchr(substr_first, self);
821        }
822
823        let mut offset = 0;
824        let haystack = &self[..self.len() - substr_rest.len()];
825
826        while let Some(position) = memchr::memchr(substr_first, &haystack[offset..]) {
827            offset += position;
828            let next_offset = offset + 1;
829            if &self[next_offset..][..substr_rest.len()] == substr_rest {
830                return Some(offset);
831            }
832
833            offset = next_offset;
834        }
835
836        None
837    }
838}
839
840impl Compare<&[u8]> for Roll {
841    #[inline(always)]
842    fn compare(&self, t: &[u8]) -> CompareResult {
843        let pos = self.iter().zip(t.iter()).position(|(a, b)| a != *b);
844
845        match pos {
846            Some(_) => CompareResult::Error,
847            None => {
848                if self.len() >= t.len() {
849                    CompareResult::Ok
850                } else {
851                    CompareResult::Incomplete
852                }
853            }
854        }
855    }
856
857    #[inline(always)]
858    fn compare_no_case(&self, t: &[u8]) -> CompareResult {
859        if self
860            .iter()
861            .zip(t)
862            .any(|(a, b)| lowercase_byte(a) != lowercase_byte(*b))
863        {
864            CompareResult::Error
865        } else if self.len() < t.len() {
866            CompareResult::Incomplete
867        } else {
868            CompareResult::Ok
869        }
870    }
871}
872
/// Maps ASCII `A`-`Z` to `a`-`z`; every other byte is returned unchanged.
fn lowercase_byte(c: u8) -> u8 {
    c.to_ascii_lowercase()
}
879
880impl InputLength for Roll {
881    #[inline]
882    fn input_len(&self) -> usize {
883        self.len()
884    }
885}
886
887impl<S> Slice<S> for Roll
888where
889    S: RangeBounds<usize>,
890{
891    fn slice(&self, range: S) -> Self {
892        Roll::slice(self.clone(), range)
893    }
894}
895
/// A [Roll] that's also a valid utf-8 string. Derefs to `str`.
#[derive(Clone)]
pub struct RollStr {
    /// The underlying bytes, which are valid utf-8
    roll: Roll,
}
901
902impl Debug for RollStr {
903    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
904        Debug::fmt(&self[..], f)
905    }
906}
907
908impl Deref for RollStr {
909    type Target = str;
910
911    fn deref(&self) -> &Self::Target {
912        unsafe { std::str::from_utf8_unchecked(&self.roll) }
913    }
914}
915
916impl RollStr {
917    pub fn into_inner(self) -> Roll {
918        self.roll
919    }
920}
921
922#[cfg(test)]
923mod tests {
924    use nom::IResult;
925    use tracing::trace;
926
927    use crate::{Roll, RollMut, BUF_SIZE};
928
929    #[test]
930    fn test_roll_put() {
931        fn test_roll_put_inner(mut rm: RollMut) {
932            let initial_size = rm.cap();
933
934            rm.put(b"hello").unwrap();
935            assert_eq!(rm.cap(), initial_size - 5);
936
937            let filled = rm.filled();
938            assert_eq!(&filled[..], b"hello");
939
940            rm.skip(5);
941            assert_eq!(rm.len(), 0);
942            assert_eq!(rm.cap(), initial_size - 5);
943        }
944
945        let rm = RollMut::alloc().unwrap();
946        assert_eq!(rm.cap(), BUF_SIZE as usize);
947        test_roll_put_inner(rm);
948
949        let mut rm = RollMut::alloc().unwrap();
950        rm.grow();
951        test_roll_put_inner(rm);
952
953        let mut rm = RollMut::alloc().unwrap();
954        rm.grow();
955        rm.grow();
956        test_roll_put_inner(rm);
957    }
958
959    #[test]
960    fn test_roll_put_does_not_fit() {
961        let mut rm = RollMut::alloc().unwrap();
962        rm.put(" ".repeat(rm.cap())).unwrap();
963
964        let err = rm.put("drop").unwrap_err();
965        assert!(format!("{err:?}").contains("DoesNotFit"));
966        assert!(format!("{err}").contains("does not fit"));
967    }
968
969    #[test]
970    fn test_roll_realloc() {
971        fn test_roll_realloc_inner(mut rm: RollMut) {
972            let init_cap = rm.cap();
973            rm.put("hello").unwrap();
974            rm.take_all();
975            assert_eq!(rm.cap(), init_cap - 5);
976
977            rm.realloc().unwrap();
978            assert_eq!(rm.cap(), BUF_SIZE as usize);
979        }
980
981        let rm = RollMut::alloc().unwrap();
982        test_roll_realloc_inner(rm);
983
984        let mut rm = RollMut::alloc().unwrap();
985        rm.grow();
986        test_roll_realloc_inner(rm);
987    }
988
989    #[test]
990    fn test_roll_realloc_big() {
991        let mut rm = RollMut::alloc().unwrap();
992        rm.grow();
993
994        let put = "x".repeat(rm.cap() * 2 / 3);
995        rm.put(&put).unwrap();
996        rm.realloc().unwrap();
997
998        assert_eq!(rm.storage_size(), BUF_SIZE as usize * 2);
999        assert_eq!(rm.len(), put.len());
1000        assert_eq!(&rm[..], put.as_bytes());
1001    }
1002
1003    #[test]
1004    fn test_roll_reserve() {
1005        let mut rm = RollMut::alloc().unwrap();
1006        assert_eq!(rm.cap(), BUF_SIZE as usize);
1007        assert_eq!(rm.len(), 0);
1008        rm.reserve().unwrap();
1009        assert_eq!(rm.cap(), BUF_SIZE as usize);
1010        assert_eq!(rm.len(), 0);
1011
1012        rm.put("hello").unwrap();
1013        rm.take_all();
1014
1015        assert_eq!(rm.cap(), BUF_SIZE as usize - 5);
1016        assert_eq!(rm.len(), 0);
1017        rm.reserve().unwrap();
1018        assert_eq!(rm.cap(), BUF_SIZE as usize - 5);
1019        assert_eq!(rm.len(), 0);
1020
1021        let old_cap = rm.cap();
1022        rm.put(b" ".repeat(old_cap)).unwrap();
1023        assert_eq!(rm.cap(), 0);
1024        assert_eq!(rm.len(), old_cap);
1025
1026        rm.reserve().unwrap();
1027        assert_eq!(rm.cap(), 5);
1028        assert_eq!(rm.len(), old_cap);
1029
1030        rm.put("hello").unwrap();
1031        rm.reserve().unwrap();
1032        assert_eq!(rm.cap(), BUF_SIZE as usize);
1033        assert_eq!(rm.len(), BUF_SIZE as usize);
1034    }
1035
1036    #[test]
1037    fn test_roll_put_then_grow() {
1038        let mut rm = RollMut::alloc().unwrap();
1039        assert_eq!(rm.cap(), BUF_SIZE as usize);
1040
1041        let input = b"I am pretty long";
1042
1043        rm.put(input).unwrap();
1044        assert_eq!(rm.len(), input.len());
1045        assert_eq!(&rm[..], input);
1046
1047        assert_eq!(rm.cap(), BUF_SIZE as usize - input.len());
1048
1049        rm.grow();
1050        assert_eq!(rm.cap(), 2 * (BUF_SIZE as usize) - input.len());
1051        assert_eq!(&rm[..], input);
1052
1053        rm.skip(5);
1054        assert_eq!(&rm[..], b"pretty long");
1055    }
1056
1057    #[test]
1058    #[cfg(not(feature = "miri"))]
1059    fn test_roll_readfrom_start() {
1060        use crate::WriteOwned;
1061
1062        crate::start(async move {
1063            let mut rm = RollMut::alloc().unwrap();
1064
1065            let (mut send, mut read) = crate::pipe();
1066            crate::spawn(async move {
1067                send.write_all_owned("123456").await.unwrap();
1068            });
1069
1070            let mut res;
1071            (res, rm) = rm.read_into(3, &mut read).await;
1072            res.unwrap();
1073
1074            assert_eq!(rm.len(), 3);
1075            assert_eq!(rm.filled().as_ref(), b"123");
1076
1077            (res, rm) = rm.read_into(3, &mut read).await;
1078            res.unwrap();
1079
1080            assert_eq!(rm.len(), 6);
1081            assert_eq!(rm.filled().as_ref(), b"123456");
1082        });
1083    }
1084
1085    #[test]
1086    fn test_roll_keep() {
1087        fn test_roll_keep_inner(mut rm: RollMut) {
1088            rm.put(b"helloworld").unwrap();
1089            assert_eq!(&rm[..], b"helloworld");
1090
1091            {
1092                let roll = rm.filled().slice(3..=5);
1093                assert_eq!(roll, b"low");
1094            }
1095
1096            {
1097                let roll = rm.filled().slice(..5);
1098                assert_eq!(roll, b"hello");
1099            }
1100
1101            let roll = rm.filled().slice(5..);
1102            assert_eq!(roll, b"world");
1103
1104            rm.keep(roll);
1105            assert_eq!(&rm[..], b"world");
1106        }
1107
1108        let rm = RollMut::alloc().unwrap();
1109        test_roll_keep_inner(rm);
1110
1111        let mut rm = RollMut::alloc().unwrap();
1112        rm.grow();
1113        test_roll_keep_inner(rm);
1114    }
1115
1116    #[test]
1117    #[should_panic(expected = "roll must be from same buffer")]
1118    fn test_roll_keep_different_buf() {
1119        let mut rm1 = RollMut::alloc().unwrap();
1120        rm1.put("hello").unwrap();
1121
1122        let mut rm2 = RollMut::alloc().unwrap();
1123        rm2.put("hello").unwrap();
1124        let roll2 = rm2.take_all();
1125
1126        rm1.keep(roll2);
1127    }
1128
1129    #[test]
1130    #[should_panic(expected = "roll must be from same buffer")]
1131    fn test_roll_keep_different_box() {
1132        let mut rm1 = RollMut::alloc().unwrap();
1133        rm1.grow();
1134        rm1.put("hello").unwrap();
1135
1136        let mut rm2 = RollMut::alloc().unwrap();
1137        rm2.grow();
1138        rm2.put("hello").unwrap();
1139        let roll2 = rm2.take_all();
1140
1141        rm1.keep(roll2);
1142    }
1143
1144    #[test]
1145    #[should_panic(expected = "roll must be from same buffer")]
1146    fn test_roll_keep_different_type() {
1147        let mut rm1 = RollMut::alloc().unwrap();
1148        rm1.grow();
1149        rm1.put("hello").unwrap();
1150
1151        let mut rm2 = RollMut::alloc().unwrap();
1152        rm2.put("hello").unwrap();
1153        let roll2 = rm2.take_all();
1154
1155        rm1.keep(roll2);
1156    }
1157
1158    #[test]
1159    #[should_panic(expected = "roll must start within buffer")]
1160    fn test_roll_keep_before_buf() {
1161        let mut rm1 = RollMut::alloc().unwrap();
1162        rm1.put("hello").unwrap();
1163        let roll = rm1.filled();
1164        rm1.skip(5);
1165        rm1.keep(roll);
1166    }
1167
1168    #[test]
1169    #[should_panic(expected = "roll must start within buffer")]
1170    fn test_roll_keep_before_box() {
1171        let mut rm1 = RollMut::alloc().unwrap();
1172        rm1.grow();
1173        rm1.put("hello").unwrap();
1174        let roll = rm1.filled();
1175        rm1.skip(5);
1176        rm1.keep(roll);
1177    }
1178
1179    #[test]
1180    fn test_roll_iter() {
1181        let mut rm = RollMut::alloc().unwrap();
1182        rm.put(b"hello").unwrap();
1183        let roll = rm.filled();
1184        let v = roll.iter().collect::<Vec<_>>();
1185        assert_eq!(v, b"hello");
1186
1187        assert_eq!(roll.to_string_lossy(), "hello");
1188    }
1189
1190    #[test]
1191    #[cfg(not(feature = "miri"))]
1192    fn test_roll_iobuf() {
1193        use crate::{
1194            io::{IntoHalves, ReadOwned, WriteOwned},
1195            net::{TcpListener, TcpStream},
1196        };
1197
1198        async fn test_roll_iobuf_inner(mut rm: RollMut) -> eyre::Result<()> {
1199            rm.put(b"hello").unwrap();
1200            let roll = rm.take_all();
1201
1202            let ln = TcpListener::bind("127.0.0.1:0".parse()?).await?;
1203            let local_addr = ln.local_addr()?;
1204
1205            let send_fut = async move {
1206                let stream = TcpStream::connect(local_addr).await?;
1207                let (_stream_r, mut stream_w) = IntoHalves::into_halves(stream);
1208                stream_w.write_all_owned(roll).await?;
1209                Ok::<_, eyre::Report>(())
1210            };
1211
1212            let recv_fut = async move {
1213                let (stream, addr) = ln.accept().await?;
1214                let (mut stream_r, _stream_w) = IntoHalves::into_halves(stream);
1215                println!("Accepted connection from {addr}");
1216
1217                let mut buf = vec![0u8; 1024];
1218                let res;
1219                (res, buf) = stream_r.read_owned(buf).await;
1220                let n = res?;
1221
1222                assert_eq!(&buf[..n], b"hello");
1223
1224                Ok::<_, eyre::Report>(())
1225            };
1226
1227            tokio::try_join!(send_fut, recv_fut)?;
1228            Ok(())
1229        }
1230
1231        crate::start(async move {
1232            let rm = RollMut::alloc().unwrap();
1233            test_roll_iobuf_inner(rm).await.unwrap();
1234
1235            let mut rm = RollMut::alloc().unwrap();
1236            rm.grow();
1237            test_roll_iobuf_inner(rm).await.unwrap();
1238        });
1239    }
1240
1241    #[test]
1242    fn test_roll_take_at_most() {
1243        let mut rm = RollMut::alloc().unwrap();
1244        rm.put(b"hello").unwrap();
1245        let roll = rm.take_at_most(4).unwrap();
1246        assert_eq!(roll, b"hell");
1247
1248        let mut rm = RollMut::alloc().unwrap();
1249        rm.put(b"hello").unwrap();
1250        let roll = rm.take_at_most(12).unwrap();
1251        assert_eq!(roll, b"hello");
1252
1253        let mut rm = RollMut::alloc().unwrap();
1254        assert!(rm.take_at_most(12).is_none());
1255    }
1256
1257    #[test]
1258    fn test_roll_take_all() {
1259        let mut rm = RollMut::alloc().unwrap();
1260        rm.put(b"hello").unwrap();
1261        let roll = rm.take_all();
1262        assert_eq!(roll, b"hello");
1263    }
1264
1265    #[test]
1266    #[should_panic(expected = "take_all is pointless if the filled part is empty")]
1267    fn test_roll_take_all_empty() {
1268        let mut rm = RollMut::alloc().unwrap();
1269        rm.take_all();
1270    }
1271
1272    #[test]
1273    #[should_panic(expected = "refusing to do empty take_at_most")]
1274    fn test_roll_take_at_most_panic() {
1275        let mut rm = RollMut::alloc().unwrap();
1276        rm.take_at_most(0);
1277    }
1278
1279    #[test]
1280    fn test_roll_nom_sample() {
1281        fn parse(i: Roll) -> IResult<Roll, Roll> {
1282            nom::bytes::streaming::tag(&b"HTTP/1.1 200 OK"[..])(i)
1283        }
1284
1285        let mut buf = RollMut::alloc().unwrap();
1286
1287        let input = b"HTTP/1.1 200 OK".repeat(1000);
1288        let mut pending = &input[..];
1289
1290        loop {
1291            if buf.cap() == 0 {
1292                trace!("buf had zero cap, growing");
1293                buf.grow()
1294            }
1295
1296            let (rest, version) = match parse(buf.filled()) {
1297                Ok(t) => t,
1298                Err(e) => {
1299                    if e.is_incomplete() {
1300                        {
1301                            if pending.is_empty() {
1302                                println!("ran out of input");
1303                                break;
1304                            }
1305
1306                            let n = std::cmp::min(buf.cap(), pending.len());
1307                            buf.put(&pending[..n]).unwrap();
1308                            pending = &pending[n..];
1309
1310                            println!("advanced by {n}, {} remaining", pending.len());
1311                        }
1312
1313                        continue;
1314                    }
1315                    panic!("parsing error: {e}");
1316                }
1317            };
1318            assert_eq!(version, b"HTTP/1.1 200 OK");
1319
1320            buf.keep(rest);
1321        }
1322    }
1323
1324    #[test]
1325    fn test_roll_io_write() {
1326        let mut rm = RollMut::alloc().unwrap();
1327        std::io::Write::write_all(&mut rm, b"hello").unwrap();
1328
1329        let roll = rm.take_all();
1330        assert_eq!(std::str::from_utf8(&roll).unwrap(), "hello");
1331    }
1332}