nom_bufreader/
lib.rs

1//! This crate provides a `BufReader` alternative that can be used with
2//! [nom parsers](http://docs.rs/nom)
3//!
//! It hides nom's [Incomplete](https://docs.rs/nom/7.0.0/nom/enum.Err.html#variant.Incomplete) handling for streaming parsers from you, automatically refilling the buffer and retrying the parser.
5//!
//! For synchronous I/O, use `bufreader::BufReader`; for asynchronous
//! I/O, use `async_bufreader::BufReader`.
8//!
9//! # Examples
10//!
11//! ## sync
12//!
13//! ```rust,ignore
14//! use nom_bufreader::bufreader::BufReader;
15//! use nom_bufreader::{Error, Parse};
16//! use std::{net::TcpListener, str::from_utf8};
17//!
18//! fn main() -> Result<(), Error<()>> {
19//!     let listener = TcpListener::bind("127.0.0.1:8080")?;
20//!     let mut i = BufReader::new(listener.incoming().next().unwrap()?);
21//!
22//!     // method, space and path are nom parsers
23//!     let m = i.parse(method)?;
24//!     let _ = i.parse(space)?;
25//!     let p = i.parse(path)?;
26//!     println!("got method {}, path {}", m, p);
27//!     Ok(())
28//! }
29//! ```
30//!
31//! ### async
32//!
33//! #### tokio
34//!
35//! ```rust,ignore
36//! use nom_bufreader::async_bufreader::BufReader;
37//! use nom_bufreader::{AsyncParse, Error};
38//! use std::str::from_utf8;
39//! use tokio_util::compat::TokioAsyncReadCompatExt;
40//! use tokio::net::TcpListener;
41//!
42//! #[tokio::main]
43//! async fn main() -> Result<(), Error<()>> {
44//!     let listener = TcpListener::bind("127.0.0.1:8080").await?;
45//!     let mut i = BufReader::new(listener.accept().await?.0.compat());
46//!
47//!     let m = i.parse(method).await?;
48//!     let _ = i.parse(space).await?;
49//!     let p = i.parse(path).await?;
50//!     println!("got method {}, path {}", m, p);
51//!     Ok(())
52//! }
53//! ```
54//!
55//! #### async-std
56//!
57//! ```rust,ignore
58//! use nom_bufreader::async_bufreader::BufReader;
59//! use nom_bufreader::{AsyncParse, Error};
60//! use std::str::from_utf8;
61//! use async_std::net::TcpListener;
62//!
63//! #[async_std::main]
64//! async fn main() -> Result<(), Error<()>> {
65//!     let listener = TcpListener::bind("127.0.0.1:8080").await?;
66//!     let mut i = BufReader::new(listener.accept().await?.0);
67//!
68//!     let m = i.parse(method).await?;
69//!     let _ = i.parse(space).await?;
70//!     let p = i.parse(path).await?;
71//!     println!("got method {}, path {}", m, p);
72//!     Ok(())
73//! }
74//! ```
75use nom::{Err, Offset, Parser};
76use std::io::{self, BufRead, Read};
77
78#[cfg(feature = "async")]
79use async_trait::async_trait;
80#[cfg(feature = "async")]
81use futures::{
82    io::{AsyncBufReadExt, BufReader},
83    AsyncRead,
84};
85
86#[cfg(feature = "async")]
87pub mod async_bufreader;
88pub mod bufreader;
89
/// Errors produced while running a nom parser over a buffered reader.
///
/// `E` is the error type of the nom parser being run.
#[derive(Debug)]
pub enum Error<E> {
    /// The parser returned a recoverable error (`nom::Err::Error`).
    Error(E),
    /// The parser returned an unrecoverable error (`nom::Err::Failure`).
    Failure(E),
    /// Reading from the underlying source failed.
    Io(io::Error),
    /// The input ended before the parser could produce a result.
    Eof,
}

impl<E: std::fmt::Display> std::fmt::Display for Error<E> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Error::Error(e) => write!(f, "parse error: {}", e),
            Error::Failure(e) => write!(f, "parse failure: {}", e),
            Error::Io(e) => write!(f, "I/O error: {}", e),
            Error::Eof => f.write_str("unexpected end of input"),
        }
    }
}

// Paths are fully qualified because the `std::error::Error` trait shares its
// name with this enum.
impl<E: std::fmt::Debug + std::fmt::Display> std::error::Error for Error<E> {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Error::Io(e) => Some(e),
            // Parser errors are not required to implement `std::error::Error`,
            // so they are reported through `Display` only.
            Error::Error(_) | Error::Failure(_) | Error::Eof => None,
        }
    }
}
97
98impl<E> From<io::Error> for Error<E> {
99    fn from(e: io::Error) -> Self {
100        Error::Io(e)
101    }
102}
103
104pub trait Parse<O, E, P> {
105    fn parse(&mut self, p: P) -> Result<O, Error<E>>
106    where
107        for<'a> P: Parser<&'a [u8], O, E>;
108}
109
110impl<R: Read, O, E, P> Parse<O, E, P> for std::io::BufReader<R> {
111    fn parse(&mut self, mut p: P) -> Result<O, Error<E>>
112    where
113        for<'a> P: Parser<&'a [u8], O, E>,
114    {
115        loop {
116            let opt =
117                    //match p(input.buffer()) {
118                    match p.parse(self.buffer()) {
119                        Err(Err::Error(e)) => return Err(Error::Error(e)),
120                        Err(Err::Failure(e)) => return Err(Error::Failure(e)),
121                        Err(Err::Incomplete(_)) => {
122                            None
123                        },
124                        Ok((i, o)) => {
125                            let offset = self.buffer().offset(i);
126                            Some((offset, o))
127                        },
128                };
129
130            match opt {
131                Some((sz, o)) => {
132                    self.consume(sz);
133                    return Ok(o);
134                }
135                None => {
136                    self.fill_buf()?;
137                }
138            }
139        }
140    }
141}
142
143impl<R: Read, O, E, P> Parse<O, E, P> for bufreader::BufReader<R> {
144    fn parse(&mut self, mut p: P) -> Result<O, Error<E>>
145    where
146        for<'a> P: Parser<&'a [u8], O, E>,
147    {
148        let mut eof = false;
149        let mut error = None;
150        loop {
151            let opt =
152                    //match p(input.buffer()) {
153                    match p.parse(self.buffer()) {
154                        Err(Err::Error(e)) => return Err(Error::Error(e)),
155                        Err(Err::Failure(e)) => return Err(Error::Failure(e)),
156                        Err(Err::Incomplete(_)) => {
157                            None
158                        },
159                        Ok((i, o)) => {
160                            let offset = self.buffer().offset(i);
161                            Some((offset, o))
162                        },
163                };
164
165            match opt {
166                Some((sz, o)) => {
167                    self.consume(sz);
168                    return Ok(o);
169                }
170                None => {
171                    if eof {
172                        return Err(Error::Eof);
173                    }
174
175                    if let Some(e) = error.take() {
176                        return Err(Error::Io(e));
177                    }
178
179                    match self.fill_buf() {
180                        Err(e) => error = Some(e),
181                        Ok(s) => {
182                            if s.is_empty() {
183                                eof = true;
184                            }
185                        }
186                    }
187                }
188            }
189        }
190    }
191}
192
/// Runs nom parsers directly against an asynchronous buffered reader.
///
/// `O` is the parser's output type, `E` its error type and `P` the parser
/// itself. Only available when the `async` feature is enabled.
#[cfg(feature = "async")]
#[async_trait]
pub trait AsyncParse<O, E, P> {
    /// Applies the parser `p` to the reader's buffered bytes and resolves to
    /// the parser's output, or to an [`Error`] describing why none was
    /// produced.
    ///
    /// `p` must accept byte slices of any lifetime and be `Send`.
    async fn parse(&mut self, p: P) -> Result<O, Error<E>>
    where
        for<'a> P: Parser<&'a [u8], O, E> + Send + 'async_trait;
}
200
#[cfg(feature = "async")]
#[async_trait]
impl<R: AsyncRead + Unpin + Send, O: Send, E, P> AsyncParse<O, E, P> for BufReader<R> {
    /// Applies `p` to the buffered bytes, asking the reader for more data
    /// whenever the parser reports `Incomplete`.
    ///
    /// On success, the bytes the parser consumed are removed from the buffer
    /// and the parser's output is returned.
    ///
    /// # Errors
    ///
    /// * [`Error::Error`] / [`Error::Failure`]: the parser rejected the input.
    /// * [`Error::Eof`]: the parser needs more data but the buffer is still
    ///   empty after a refill.
    /// * [`Error::Io`]: reading failed, or the refill made no progress.
    ///   `futures::io::BufReader` only reads from its source once its buffer
    ///   is empty, so a parser that needs more bytes than are currently
    ///   buffered can never be satisfied by this reader.
    async fn parse(&mut self, mut p: P) -> Result<O, Error<E>>
    where
        for<'a> P: Parser<&'a [u8], O, E> + Send + 'async_trait,
    {
        loop {
            // Only the consumed length and the output leave this `match`, so
            // no borrow of the buffer (and no value of type `E`) is alive
            // across the `.await` below.
            let parsed = match p.parse(self.buffer()) {
                Ok((rest, output)) => Some((self.buffer().offset(rest), output)),
                Err(Err::Incomplete(_)) => None,
                Err(Err::Error(e)) => return Err(Error::Error(e)),
                Err(Err::Failure(e)) => return Err(Error::Failure(e)),
            };

            if let Some((consumed, output)) = parsed {
                self.consume_unpin(consumed);
                return Ok(output);
            }

            // The parser wants more input. Compare the buffer size around the
            // refill: without this check an exhausted source or a no-op
            // refill would make the loop spin forever on the same bytes.
            let buffered = self.buffer().len();
            self.fill_buf().await?;
            let available = self.buffer().len();
            if available == 0 {
                return Err(Error::Eof);
            }
            if available == buffered {
                return Err(Error::Io(io::Error::new(
                    io::ErrorKind::Other,
                    "futures::io::BufReader cannot read more data while its buffer is not empty",
                )));
            }
        }
    }
}
235
#[cfg(feature = "async")]
#[async_trait]
impl<R: AsyncRead + Unpin + Send, O: Send, E, P> AsyncParse<O, E, P>
    for async_bufreader::BufReader<R>
{
    /// Applies `p` to the buffered bytes, refilling the buffer whenever the
    /// parser reports `Incomplete`.
    ///
    /// On success, the bytes the parser consumed are removed from the buffer
    /// and the parser's output is returned.
    ///
    /// # Errors
    ///
    /// * [`Error::Error`] / [`Error::Failure`]: the parser rejected the input.
    /// * [`Error::Eof`]: the parser needs more data but the buffer is still
    ///   empty after a refill.
    /// * [`Error::Io`]: refilling the buffer failed.
    async fn parse(&mut self, mut p: P) -> Result<O, Error<E>>
    where
        for<'a> P: Parser<&'a [u8], O, E> + Send + 'async_trait,
    {
        loop {
            // Only the consumed length and the output leave this `match`, so
            // no borrow of the buffer (and no value of type `E`) is alive
            // across the `.await` below.
            let parsed = match p.parse(self.buffer()) {
                Ok((rest, output)) => Some((self.buffer().offset(rest), output)),
                Err(Err::Incomplete(_)) => None,
                Err(Err::Error(e)) => return Err(Error::Error(e)),
                Err(Err::Failure(e)) => return Err(Error::Failure(e)),
            };

            if let Some((consumed, output)) = parsed {
                self.consume_unpin(consumed);
                return Ok(output);
            }

            self.fill_buf().await?;

            // An empty buffer after a refill means the source has no more
            // data, so stop instead of retrying forever on the same input.
            // NOTE(review): a stream that ends mid-token leaves a non-empty
            // buffer; whether that case terminates depends on
            // `async_bufreader::BufReader::fill_buf` - confirm.
            if self.buffer().is_empty() {
                return Err(Error::Eof);
            }
        }
    }
}