nom_async/
future.rs

1use futures::{
2    prelude::*,
3    task::{Context, Poll},
4};
5use nom::{Err, IResult};
6use std::{
7    borrow::{Borrow, ToOwned},
8    convert::AsRef,
9    ops::AddAssign,
10    pin::Pin,
11};
12
13/// A [Future](futures::future::Future) constructed from a nom streaming parser
14pub struct NomFuture<'a, I, O, T, E, S>
15where
16    I: ?Sized + ToOwned,
17    <I as ToOwned>::Owned: for<'i> AddAssign<&'i I>,
18    T: AsRef<I>,
19    S: Stream<Item = Result<T, E>>,
20{
21    stream: S,
22    parser: Box<dyn 'a + for<'i> Fn(&'i I) -> IResult<&'i I, O>>,
23    buffer: <I as ToOwned>::Owned,
24}
25
26impl<'a, I, O, T, E, S> NomFuture<'a, I, O, T, E, S>
27where
28    I: ?Sized + ToOwned,
29    <I as ToOwned>::Owned: for<'i> AddAssign<&'i I>,
30    T: AsRef<I>,
31    S: Stream<Item = Result<T, E>>,
32{
33    /// Construct a new [NomFuture] from a stream and parser
34    pub fn new<F>(stream: S, parser: F) -> Self
35    where
36        F: 'a + for<'i> Fn(&'i I) -> IResult<&'i I, O>,
37        <I as ToOwned>::Owned: Default,
38    {
39        let buffer = Default::default();
40        Self::new_with_buffer(stream, parser, buffer)
41    }
42
43    /// Construct a new [NomFuture] from a stream, parser, and buffer
44    pub fn new_with_buffer<F>(stream: S, parser: F, buffer: <I as ToOwned>::Owned) -> Self
45    where
46        F: 'a + for<'i> Fn(&'i I) -> IResult<&'i I, O>,
47    {
48        let parser = Box::new(parser) as Box<_>;
49        NomFuture { stream, parser, buffer }
50    }
51}
52
53impl<'a, I, O, T, E, S> Future for NomFuture<'a, I, O, T, E, S>
54where
55    I: ?Sized + ToOwned,
56    <I as ToOwned>::Owned: for<'i> AddAssign<&'i I> + Unpin,
57    T: AsRef<I>,
58    S: Stream<Item = Result<T, E>> + Unpin,
59{
60    type Output = O;
61
62    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<O> {
63        match (self.parser)(self.buffer.borrow()) {
64            Ok((i, o)) => {
65                self.buffer = i.to_owned();
66                cx.waker().clone().wake();
67                Poll::Ready(o)
68            },
69            Err(err) => match err {
70                Err::Incomplete(_needed) => match Pin::new(&mut self.stream).poll_next(cx) {
71                    Poll::Ready(None) => panic!(),
72                    Poll::Ready(Some(res)) => match res {
73                        Ok(item) => {
74                            self.buffer += item.as_ref();
75                            cx.waker().clone().wake();
76                            Poll::Pending
77                        },
78                        Err(_err) => panic!(),
79                    },
80                    Poll::Pending => Poll::Pending,
81                },
82                Err::Error(_error) => panic!(),
83                Err::Failure(_failure) => panic!(),
84            },
85        }
86    }
87}