nom_async/
stream.rs

1use futures::{
2    prelude::*,
3    task::{Context, Poll},
4};
5use nom::{Err, IResult};
6use std::{borrow::Borrow, convert::AsRef, ops::AddAssign, pin::Pin};
7
8/// A [Stream](futures::stream::Stream) constructed from a nom streaming parser
9pub struct NomStream<'a, I, O, T, E, S>
10where
11    I: ?Sized + ToOwned,
12    <I as ToOwned>::Owned: for<'i> AddAssign<&'i I>,
13    T: AsRef<I>,
14    S: Stream<Item = Result<T, E>>,
15{
16    stream: S,
17    parser: Box<dyn 'a + for<'i> Fn(&'i I) -> IResult<&'i I, O>>,
18    buffer: <I as ToOwned>::Owned,
19}
20
21impl<'a, I, O, T, E, S> NomStream<'a, I, O, T, E, S>
22where
23    I: ?Sized + ToOwned,
24    <I as ToOwned>::Owned: for<'i> AddAssign<&'i I>,
25    T: AsRef<I>,
26    S: Stream<Item = Result<T, E>>,
27{
28    /// Construct a new [NomStream] from a stream and parser
29    pub fn new<F>(stream: S, parser: F) -> Self
30    where
31        F: 'a + for<'i> Fn(&'i I) -> IResult<&'i I, O>,
32        <I as ToOwned>::Owned: Default,
33    {
34        let buffer = Default::default();
35        Self::new_with_buffer(stream, parser, buffer)
36    }
37
38    /// Construct a new [NomStream] from a stream, parser, and buffer
39    pub fn new_with_buffer<F>(stream: S, parser: F, buffer: <I as ToOwned>::Owned) -> Self
40    where
41        F: 'a + for<'i> Fn(&'i I) -> IResult<&'i I, O>,
42    {
43        let parser = Box::new(parser) as Box<_>;
44        NomStream { stream, parser, buffer }
45    }
46}
47
48impl<'a, I, O, T, E, S> Stream for NomStream<'a, I, O, T, E, S>
49where
50    I: ?Sized + ToOwned,
51    <I as ToOwned>::Owned: for<'i> AddAssign<&'i I> + Unpin,
52    T: AsRef<I>,
53    S: Stream<Item = Result<T, E>> + Unpin,
54{
55    type Item = O;
56
57    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<O>> {
58        match (self.parser)(self.buffer.borrow()) {
59            Ok((i, o)) => {
60                self.buffer = i.to_owned();
61                cx.waker().clone().wake();
62                Poll::Ready(Some(o))
63            },
64            Err(err) => match err {
65                Err::Incomplete(_needed) => match Pin::new(&mut self.stream).poll_next(cx) {
66                    Poll::Ready(None) => Poll::Ready(None),
67                    Poll::Ready(Some(res)) => match res {
68                        Ok(item) => {
69                            self.buffer += item.as_ref();
70                            cx.waker().clone().wake();
71                            Poll::Pending
72                        },
73                        Err(_err) => Poll::Ready(None),
74                    },
75                    Poll::Pending => Poll::Pending,
76                },
77                Err::Error(_error) => Poll::Ready(None),
78                Err::Failure(_failure) => Poll::Ready(None),
79            },
80        }
81    }
82}