Skip to main content

streamparser/
lib.rs

1#![cfg_attr(not(feature = "std"), no_std)]
2
3#[cfg(feature = "alloc")]
4extern crate alloc;
5
6use core::{
7    future::Future,
8    pin::Pin,
9    task::{Context, Poll},
10};
11
12pub mod iter;
13#[cfg(feature = "nom-adapters")]
14pub mod nom;
15pub mod read;
16pub mod utf8;
17
18#[derive(Debug, PartialEq, Eq, Hash)]
19pub enum Streaming<T> {
20    Item(T),
21    Incomplete,
22}
23
24pub type Parsed<T> = (Option<T>, usize);
25
26pub trait Buffer {
27    type Output: ?Sized;
28
29    fn buffer(&mut self) -> (&Self::Output, &mut usize);
30    fn exhausted(&self) -> bool;
31}
32
33pub trait Advance {
34    type Error;
35
36    fn advance(&mut self) -> Result<(), Self::Error>;
37}
38
39pub trait AdvanceAsync {
40    type Error;
41
42    fn poll_advance(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>>;
43}
44
45pub trait Parse<'i> {
46    type Input: 'i;
47    type Output;
48    type Error;
49
50    fn parse(&mut self, input: Self::Input)
51        -> Result<Streaming<Parsed<Self::Output>>, Self::Error>;
52
53    fn parse_eof(&mut self, input: Self::Input) -> Result<Parsed<Self::Output>, Self::Error>;
54}
55
56#[derive(Debug)]
57#[must_use = "futures do nothing unless you `.await` or poll them"]
58struct AdvanceAsyncFuture<'a, B: ?Sized>(&'a mut B);
59
60impl<B> Future for AdvanceAsyncFuture<'_, B>
61where
62    B: ?Sized + AdvanceAsync + Unpin,
63{
64    type Output = Result<(), B::Error>;
65
66    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
67        let this = &mut *self;
68        Pin::new(&mut *this.0).poll_advance(cx)
69    }
70}
71
72pub(crate) fn advance_async<B>(src: &mut B) -> impl Future<Output = Result<(), B::Error>> + '_
73where
74    B: ?Sized + AdvanceAsync + Unpin,
75{
76    AdvanceAsyncFuture(src)
77}
78
79pub struct StreamParser<P, S> {
80    parser: P,
81    source: S,
82}
83
84impl<P, S> StreamParser<P, S> {
85    pub fn new(parser: P, source: S) -> Self {
86        StreamParser { parser, source }
87    }
88
89    pub fn advance(&mut self) -> Result<(), S::Error>
90    where
91        S: Advance,
92    {
93        self.source.advance()
94    }
95
96    pub fn advance_async(&mut self) -> impl Future<Output = Result<(), S::Error>> + '_
97    where
98        S: AdvanceAsync + Unpin,
99    {
100        advance_async(&mut self.source)
101    }
102
103    pub fn get(&mut self) -> Result<Option<Streaming<<P as Parse>::Output>>, <P as Parse>::Error>
104    where
105        S: Buffer,
106        for<'i> P: Parse<'i, Input = &'i S::Output>,
107    {
108        let exhausted = self.source.exhausted();
109        let (input, pos) = self.source.buffer();
110        match self.parser.parse(input) {
111            Ok(Streaming::Item((output, parsed))) => {
112                *pos += parsed;
113                Ok(output.map(Streaming::Item))
114            }
115            Ok(Streaming::Incomplete) => {
116                if exhausted {
117                    match self.parser.parse_eof(input) {
118                        Ok((output, parsed)) => {
119                            *pos += parsed;
120                            Ok(output.map(Streaming::Item))
121                        }
122                        Err(err) => Err(err),
123                    }
124                } else {
125                    Ok(Some(Streaming::Incomplete))
126                }
127            }
128            Err(err) => Err(err),
129        }
130    }
131}
132
133#[cfg(all(doctest, feature = "std"))]
134doc_comment::doctest!("../README.md");
135
136#[cfg(test)]
137mod tests;