xparse 0.1.10

A parser combinator that is fully statically dispatched and supports both sync & async parsing.
Documentation
#[cfg(feature = "async")]
use {
    crate::Error,
    alloc::collections::VecDeque,
    core::{
        future::{poll_fn, Future},
        pin::Pin,
        task::{Context, Poll},
    },
    futures_core::Stream,
};

use crate::Result;

/// Wraps a borrowed slice in a synchronous `Source`.
///
/// This variant is compiled only when the `async` feature is disabled. The
/// returned source borrows `slice`, yields items of type `T`, and starts at
/// position 0.
#[cfg(not(feature = "async"))]
pub fn from_slice<T>(slice: &[T]) -> impl Source<Item = T> + '_ {
    OwnedSource {
        position: 0,
        r#impl: slice,
    }
}

/// Wraps a borrowed slice in a source usable both synchronously and
/// asynchronously.
///
/// This variant is compiled only when the `async` feature is enabled. The
/// returned value implements both `Source` and `AsyncSource`, borrows `slice`,
/// yields items of type `T`, and starts at position 0.
#[cfg(feature = "async")]
pub fn from_slice<T>(slice: &[T]) -> impl Source<Item = T> + AsyncSource<Item = T> + '_ {
    OwnedSource {
        position: 0,
        r#impl: slice,
    }
}

/// Wraps an infallible `Stream` in an `AsyncSource`.
///
/// Every item produced by `stream` is wrapped in `Ok` (via `AsResult`) and
/// collected into an initially empty `VecDeque` buffer as reads request more
/// data. The returned source starts at position 0.
///
/// NOTE(review): the name looks like a typo for `from_stream`; it is left
/// unchanged here because renaming would break existing callers.
#[cfg(feature = "async")]
pub fn form_stream<S: Stream + Unpin>(stream: S) -> impl AsyncSource<Item = S::Item> {
    OwnedSource {
        position: 0,
        r#impl: BufferedStream {
            buffer: VecDeque::new(),
            stream: AsResult(stream),
        },
    }
}

/// Adapter that turns a stream of plain items into a stream of `Result`s by
/// wrapping every item in `Ok`.
#[cfg(feature = "async")]
struct AsResult<T>(T);

#[cfg(feature = "async")]
impl<T: Stream> Stream for AsResult<T> {
    type Item = Result<T::Item>;

    /// Polls the wrapped stream and maps each yielded item to `Ok(item)`.
    /// `Pending` and end-of-stream (`None`) are passed through unchanged.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        // SAFETY: this is a structural pin projection onto the only field.
        // The field is never moved out of the wrapper here, and `AsResult`
        // declares no `Drop` impl and no manual `Unpin` impl in this file.
        let inner = unsafe { self.map_unchecked_mut(|wrapper| &mut wrapper.0) };
        inner.poll_next(cx).map(|next| next.map(Ok))
    }
}

/// Wraps a fallible `Stream` (one yielding `Result<I, E>`) in an `AsyncSource`
/// over the success type `I`.
///
/// Items are collected into an initially empty `VecDeque` buffer as reads
/// request more data; the error type only has to be convertible into the
/// crate's `Error`. The returned source starts at position 0.
///
/// NOTE(review): the name looks like a typo for `from_try_stream`; it is left
/// unchanged here because renaming would break existing callers.
#[cfg(feature = "async")]
pub fn form_try_stream<S: Stream<Item = Result<I, E>> + Unpin, I, E: Into<Error>>(
    stream: S,
) -> impl AsyncSource<Item = I> {
    OwnedSource {
        position: 0,
        r#impl: BufferedStream {
            buffer: VecDeque::new(),
            stream,
        },
    }
}

/// Operations shared by the synchronous (`Source`) and asynchronous
/// (`AsyncSource`) input traits.
pub trait SourceBase {
    /// Element type yielded by the source.
    type Item;
    /// Marks the next `len` items as consumed.
    ///
    /// The implementations in this file only verify with `debug_assert!` that
    /// `len` does not exceed what the backing storage currently holds.
    fn consume(&mut self, len: usize);
    /// Number of items consumed so far, counted from the start of the
    /// underlying input (for a fork this includes its own pending offset).
    fn position(&self) -> usize;
    /// Finishes this source by value.
    ///
    /// For the fork type in this file, this commits the fork's consumption to
    /// whatever it was forked from; for the root source it does nothing.
    fn join(self);
}

/// Synchronous input source.
pub trait Source: SourceBase {
    /// Creates a child source that starts at this source's current read point
    /// and mutably borrows it.
    ///
    /// In this file's implementations, consumption performed on the child
    /// reaches this source only when the child's `join` is called; a child
    /// that is simply dropped leaves this source unchanged.
    fn fork(&mut self) -> impl Source<Item = Self::Item>;
    /// Returns up to `len` upcoming items without consuming them.
    ///
    /// The slice-backed implementation in this file returns fewer than `len`
    /// items when the input is shorter than requested.
    fn read(&mut self, len: usize) -> Result<&[Self::Item]>;
}

/// Asynchronous counterpart of `Source`; only available with the `async`
/// feature.
#[cfg(feature = "async")]
pub trait AsyncSource: SourceBase {
    /// Creates a child source that starts at this source's current read point
    /// and mutably borrows it.
    ///
    /// In this file's implementations, consumption performed on the child
    /// reaches this source only when the child's `join` is called.
    fn fork(&mut self) -> impl AsyncSource<Item = Self::Item>;
    /// Returns a future resolving to up to `len` upcoming items, without
    /// consuming them.
    ///
    /// The stream-backed implementation in this file resolves with fewer than
    /// `len` items when the stream ends first.
    fn read(&mut self, len: usize) -> impl Future<Output = Result<&[Self::Item]>>;
}

/// Root source: owns the backing storage and tracks how many items have been
/// consumed from it.
struct OwnedSource<T> {
    /// Backing storage (a slice or a buffered stream in this file).
    r#impl: T,
    /// Total number of items consumed so far; the constructors in this file
    /// initialise it to 0.
    position: usize,
}

/// Root-level bookkeeping: consumption is applied to the backing storage
/// immediately and counted in `position`.
impl<T: SourceImplBase> SourceBase for OwnedSource<T> {
    type Item = T::Item;

    /// Drops `len` items from the backing storage and advances the position
    /// counter by the same amount.
    #[inline(always)]
    fn consume(&mut self, len: usize) {
        let Self {
            r#impl: inner,
            position,
        } = self;
        // Only checked in debug builds: the caller must not consume more than
        // the storage currently has available.
        debug_assert!(len <= inner.available());
        *position += len;
        inner.consume(len);
    }

    /// Total number of items consumed so far.
    #[inline(always)]
    fn position(&self) -> usize {
        let Self { position, .. } = self;
        *position
    }

    /// The root has nothing to merge into, so joining is a no-op.
    #[inline(always)]
    fn join(self) {}
}

/// Synchronous reading directly from the root source.
impl<T: SourceImpl> Source for OwnedSource<T> {
    /// Creates a first-level fork: it has no parent fork (`parent: None`) and
    /// starts with nothing consumed relative to this source (`offset: 0`).
    #[inline(always)]
    fn fork(&mut self) -> impl Source<Item = Self::Item> {
        SourceRef {
            target: self,
            parent: None,
            offset: 0,
        }
    }

    /// Delegates straight to the backing storage's `read`.
    #[inline(always)]
    fn read(&mut self, len: usize) -> Result<&[Self::Item]> {
        self.r#impl.read(len)
    }
}

/// Asynchronous reading directly from the root source.
#[cfg(feature = "async")]
impl<T: AsyncSourceImpl> AsyncSource for OwnedSource<T> {
    /// Creates a first-level fork: it has no parent fork (`parent: None`) and
    /// starts with nothing consumed relative to this source (`offset: 0`).
    #[inline(always)]
    fn fork(&mut self) -> impl AsyncSource<Item = Self::Item> {
        SourceRef {
            target: self,
            parent: None,
            offset: 0,
        }
    }

    /// Delegates straight to the backing storage's asynchronous `read`.
    #[inline(always)]
    fn read(&mut self, len: usize) -> impl Future<Output = Result<&[Self::Item]>> {
        self.r#impl.read(len)
    }
}

/// Forked view over an `OwnedSource`.
///
/// Consumption is recorded locally in `offset` and only written back when the
/// fork is joined.
struct SourceRef<'a, T> {
    /// The root source every fork ultimately reads from.
    target: &'a mut OwnedSource<T>,
    /// Offset slot of the fork this one was created from, or `None` when it
    /// was forked directly from the root source.
    parent: Option<&'a mut usize>,
    /// Number of items this fork treats as consumed, counted from the root
    /// source's current read point.
    offset: usize,
}

/// Fork-level bookkeeping: consumption is only recorded in the fork's own
/// offset until `join` hands it back.
impl<T: SourceImplBase> SourceBase for SourceRef<'_, T> {
    type Item = T::Item;

    /// Advances this fork's private offset by `len`; the root source is left
    /// untouched.
    #[inline(always)]
    fn consume(&mut self, len: usize) {
        let advanced = self.offset + len;
        // Only checked in debug builds: the fork must stay within what the
        // backing storage currently has available.
        debug_assert!(advanced <= self.target.r#impl.available());
        self.offset = advanced;
    }

    /// Root position plus the amount consumed through this fork.
    #[inline(always)]
    fn position(&self) -> usize {
        let base = self.target.position;
        base + self.offset
    }

    /// Commits this fork's progress: a nested fork overwrites its parent's
    /// offset, a first-level fork consumes from the root source.
    #[inline(always)]
    fn join(self) {
        match self.parent {
            Some(parent_offset) => *parent_offset = self.offset,
            None => self.target.consume(self.offset),
        }
    }
}

/// Synchronous reading through a fork.
impl<T: SourceImpl> Source for SourceRef<'_, T> {
    /// Creates a nested fork that starts at this fork's current offset.
    ///
    /// The child borrows the root source through this fork and keeps a
    /// reference to this fork's `offset`, which its `join` overwrites.
    #[inline(always)]
    fn fork(&mut self) -> impl Source<Item = Self::Item> {
        SourceRef {
            target: self.target,
            offset: self.offset,
            parent: Some(&mut self.offset),
        }
    }

    /// Returns up to `len` items located after this fork's offset, without
    /// consuming them.
    ///
    /// The backing storage is asked for `offset + len` items and the first
    /// `offset` of them are skipped. The sum saturates instead of overflowing,
    /// so a very large `len` (for example `usize::MAX`, meaning "everything
    /// that is left") behaves the same on a fork as on the root source.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the backing storage's `read`.
    #[inline(always)]
    fn read(&mut self, len: usize) -> Result<&[Self::Item]> {
        let end = self.offset.saturating_add(len);
        Ok(&self.target.r#impl.read(end)?[self.offset..])
    }
}

/// Asynchronous reading through a fork.
#[cfg(feature = "async")]
impl<T: AsyncSourceImpl> AsyncSource for SourceRef<'_, T> {
    /// Creates a nested fork that starts at this fork's current offset.
    ///
    /// The child borrows the root source through this fork and keeps a
    /// reference to this fork's `offset`, which its `join` overwrites.
    #[inline(always)]
    fn fork(&mut self) -> impl AsyncSource<Item = Self::Item> {
        SourceRef {
            target: self.target,
            offset: self.offset,
            parent: Some(&mut self.offset),
        }
    }

    /// Resolves to up to `len` items located after this fork's offset,
    /// without consuming them.
    ///
    /// The backing storage is asked for `offset + len` items and the first
    /// `offset` of them are skipped. The sum saturates instead of overflowing,
    /// so a very large `len` (for example `usize::MAX`) behaves the same on a
    /// fork as on the root source.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the backing storage's `read`.
    #[inline(always)]
    async fn read(&mut self, len: usize) -> Result<&[Self::Item]> {
        let end = self.offset.saturating_add(len);
        Ok(&self.target.r#impl.read(end).await?[self.offset..])
    }
}

/// Storage-level operations shared by the synchronous and asynchronous
/// backing implementations.
trait SourceImplBase {
    /// Element type held by the storage.
    type Item;
    /// Discards the first `len` items from the storage.
    fn consume(&mut self, len: usize);
    /// Number of items currently held by the storage; used by the callers in
    /// this file to bounds-check `consume` in debug builds.
    fn available(&self) -> usize;
}

/// Backing storage that can be read synchronously.
trait SourceImpl: SourceImplBase {
    /// Returns up to `len` items from the front of the storage without
    /// discarding them.
    fn read(&mut self, len: usize) -> Result<&[Self::Item]>;
}

/// Backing storage that can be read asynchronously; only available with the
/// `async` feature.
#[cfg(feature = "async")]
trait AsyncSourceImpl: SourceImplBase {
    /// Returns a future resolving to up to `len` items from the front of the
    /// storage, without discarding them.
    fn read(&mut self, len: usize) -> impl Future<Output = Result<&[Self::Item]>>;
}

/// Every synchronous storage is also usable asynchronously: the async `read`
/// simply performs the synchronous read and never awaits anything.
#[cfg(feature = "async")]
impl<T: SourceImpl + ?Sized> AsyncSourceImpl for T {
    #[inline(always)]
    async fn read(&mut self, len: usize) -> Result<&[Self::Item]> {
        // Fully qualified so the call resolves to the synchronous trait
        // method rather than to this async method.
        SourceImpl::read(self, len)
    }
}

/// Storage backed by a fallible stream, with a queue of items that have been
/// pulled from the stream but not yet consumed.
///
/// `I` and `E` appear only in the bound on `S`, which is presumably why the
/// bounds live on the struct definition rather than only on the impls.
#[cfg(feature = "async")]
struct BufferedStream<S: Stream<Item = Result<I, E>> + ?Sized, I, E: Into<Error>> {
    /// Items already pulled from `stream` and not yet consumed.
    buffer: VecDeque<I>,
    /// The underlying stream; polled when `buffer` holds too few items.
    stream: S,
}

/// Buffer bookkeeping for the stream-backed storage.
#[cfg(feature = "async")]
impl<S: Stream<Item = Result<I, E>> + Unpin + ?Sized, I, E: Into<Error>> SourceImplBase
    for BufferedStream<S, I, E>
{
    type Item = I;

    /// Discards the first `len` buffered items.
    ///
    /// Consuming more than is buffered is a caller bug that is only reported
    /// in debug builds; otherwise the whole buffer is simply emptied.
    #[inline]
    fn consume(&mut self, len: usize) {
        debug_assert!(
            self.buffer.len() >= len,
            "consume failed, the current buffer length is lower than {len}"
        );
        // Clamp so that an over-long request empties the buffer rather than
        // panicking inside `drain`.
        let count = len.min(self.buffer.len());
        drop(self.buffer.drain(..count));
    }

    /// Number of items currently buffered (not the total length of the
    /// stream).
    #[inline]
    fn available(&self) -> usize {
        let buffered = &self.buffer;
        buffered.len()
    }
}

/// Asynchronous reading for the stream-backed storage.
#[cfg(feature = "async")]
impl<S: Stream<Item = Result<I, E>> + Unpin + ?Sized, I, E: Into<Error>> AsyncSourceImpl
    for BufferedStream<S, I, E>
{
    /// Returns a future that pulls items from the stream into the buffer until
    /// at least `len` items are buffered, then resolves to the first `len` of
    /// them.
    ///
    /// If the stream ends first, the future resolves to everything that is
    /// buffered (possibly fewer than `len` items). If the stream yields an
    /// error, the future resolves to that error converted with `Into`; items
    /// buffered before the error stay in the buffer.
    fn read(&mut self, len: usize) -> impl Future<Output = Result<&[I]>> {
        // Debug-only guard against polling the future again after it has
        // produced a slice.
        #[cfg(debug_assertions)]
        let mut is_complete = false;
        poll_fn(move |cx| {
            #[cfg(debug_assertions)]
            if is_complete {
                unreachable!("read future polled after completion")
            }

            // Items buffered during an earlier poll are kept, so a `Pending`
            // result loses nothing.
            while self.buffer.len() < len {
                match Pin::new(&mut self.stream).poll_next(cx) {
                    Poll::Ready(Some(Ok(item))) => self.buffer.push_back(item),
                    Poll::Ready(Some(Err(e))) => return Poll::Ready(Err(e.into())),
                    Poll::Ready(None) => {
                        // NOTE(review): a later `read` that still needs more
                        // items will poll the stream again after it returned
                        // `None` — confirm the streams used here tolerate
                        // that.
                        #[cfg(debug_assertions)]
                        {
                            is_complete = true;
                        }
                        // SAFETY: the raw-pointer round trip detaches the
                        // slice from this closure's borrow of `self.buffer`
                        // so it can be returned with the lifetime of the
                        // `&mut self` in the signature, which this closure
                        // captured by move.
                        // NOTE(review): this relies on the buffer not being
                        // touched while the slice is alive — in particular on
                        // the future not being polled again after `Ready`,
                        // which is only checked in debug builds. Confirm.
                        return Poll::Ready(Ok(unsafe {
                            &*(self.buffer.make_contiguous() as *const _)
                        }));
                    }
                    Poll::Pending => return Poll::Pending,
                };
            }

            #[cfg(debug_assertions)]
            {
                is_complete = true;
            }
            // SAFETY: same lifetime extension as in the end-of-stream branch
            // above; the loop condition guarantees at least `len` buffered
            // items, so the `[..len]` slice is in bounds.
            Poll::Ready(Ok(unsafe {
                &*((&self.buffer.make_contiguous()[..len]) as *const _)
            }))
        })
    }
}

/// A borrowed slice as backing storage: consuming just re-points the slice at
/// its own tail.
impl<T> SourceImplBase for &[T] {
    type Item = T;

    /// Drops the first `len` items by shrinking the slice from the front.
    /// Panics (via slice indexing) if `len` exceeds the slice length.
    fn consume(&mut self, len: usize) {
        let remaining = &self[len..];
        *self = remaining;
    }

    /// Number of items left in the slice.
    fn available(&self) -> usize {
        let items: &[T] = self;
        items.len()
    }
}

/// Synchronous reads from a borrowed slice never fail and never block.
impl<T> SourceImpl for &[T] {
    /// Returns the first `len` items, or the whole slice when it holds fewer
    /// than `len`. Nothing is consumed.
    fn read(&mut self, len: usize) -> Result<&[Self::Item]> {
        let end = if len < self.len() { len } else { self.len() };
        Ok(&self[..end])
    }
}

#[cfg(test)]
mod test {
    use super::{from_slice, Source, SourceBase};

    /// Walks through forking, nested forking, joining and discarding forks on
    /// a slice-backed source.
    #[test]
    fn read_test() {
        let mut source = from_slice(b"01234567");
        let mut fork0 = source.fork();
        // Asking for more than is available yields everything that is left.
        assert_eq!(fork0.read(9).unwrap(), b"01234567");
        // A joined nested fork hands its consumption (3 items) back to fork0.
        let mut fork1 = fork0.fork();
        fork1.consume(3);
        fork1.join();
        // A new nested fork therefore starts after those 3 items.
        let mut fork = fork0.fork();
        assert_eq!(fork.read(3).unwrap(), b"345");
        fork.consume(4);
        // Dropping without `join` discards the fork's consumption.
        drop(fork);
        fork0.consume(3);
        assert_eq!(fork0.read(3).unwrap(), b"67");
        // Joining the first-level fork consumes its 6 items from the root.
        fork0.join();
        assert_eq!(source.read(3).unwrap(), b"67");
        source.consume(2);
        // Nothing is left once everything has been consumed.
        assert_eq!(source.read(5).unwrap(), []);
    }
}