nom_bufreader_rp/
lib.rs

1//! This crate provides a `BufReader` alternative that can be used with
2//! [nom parsers](http://docs.rs/nom)
3//!
//! It hides nom's [Incomplete](https://docs.rs/nom/7.0.0/nom/enum.Err.html#variant.Incomplete) handling for streaming parsers: whenever a parser reports that it needs more input, the buffer is refilled and the parser is retried automatically.
5//!
//! For synchronous I/O, use `bufreader::BufReader`; for asynchronous I/O, use
//! `async_bufreader::BufReader` (only compiled when the `async` feature is enabled).
8//!
9//! # Examples
10//!
11//! ## sync
12//!
13//! ```rust,ignore
14//! use nom_bufreader::bufreader::BufReader;
15//! use nom_bufreader::{Error, Parse};
16//! use std::{net::TcpListener, str::from_utf8};
17//!
18//! fn main() -> Result<(), Error<()>> {
19//!     let listener = TcpListener::bind("127.0.0.1:8080")?;
20//!     let mut i = BufReader::new(listener.incoming().next().unwrap()?);
21//!
22//!     // method, space and path are nom parsers
23//!     let m = i.parse(method)?;
24//!     let _ = i.parse(space)?;
25//!     let p = i.parse(path)?;
26//!     println!("got method {}, path {}", m, p);
27//!     Ok(())
28//! }
29//! ```
30//!
31//! ### async
32//!
33//! #### tokio
34//!
35//! ```rust,ignore
36//! use nom_bufreader::async_bufreader::BufReader;
37//! use nom_bufreader::{AsyncParse, Error};
38//! use std::str::from_utf8;
39//! use tokio_util::compat::TokioAsyncReadCompatExt;
40//! use tokio::net::TcpListener;
41//!
42//! #[tokio::main]
43//! async fn main() -> Result<(), Error<()>> {
44//!     let listener = TcpListener::bind("127.0.0.1:8080").await?;
45//!     let mut i = BufReader::new(listener.accept().await?.0.compat());
46//!
47//!     let m = i.parse(method).await?;
48//!     let _ = i.parse(space).await?;
49//!     let p = i.parse(path).await?;
50//!     println!("got method {}, path {}", m, p);
51//!     Ok(())
52//! }
53//! ```
54//!
55//! #### async-std
56//!
57//! ```rust,ignore
58//! use nom_bufreader::async_bufreader::BufReader;
59//! use nom_bufreader::{AsyncParse, Error};
60//! use std::str::from_utf8;
61//! use async_std::net::TcpListener;
62//!
63//! #[async_std::main]
64//! async fn main() -> Result<(), Error<()>> {
65//!     let listener = TcpListener::bind("127.0.0.1:8080").await?;
66//!     let mut i = BufReader::new(listener.accept().await?.0);
67//!
68//!     let m = i.parse(method).await?;
69//!     let _ = i.parse(space).await?;
70//!     let p = i.parse(path).await?;
71//!     println!("got method {}, path {}", m, p);
72//!     Ok(())
73//! }
74//! ```
75use nom::{Err, Offset, Parser};
76use std::io::{self, BufRead, Read};
77
78#[cfg(feature = "async")]
79use async_trait::async_trait;
80#[cfg(feature = "async")]
81use futures::{
82    io::{AsyncBufReadExt, BufReader},
83    AsyncRead,
84};
85
86#[cfg(feature = "async")]
87pub mod async_bufreader;
88pub mod bufreader;
89
/// Error returned when running a nom parser against a buffered reader.
///
/// `E` is the error type produced by the nom parser itself.
#[derive(Debug)]
pub enum Error<E> {
    /// The parser reported a recoverable error (`nom::Err::Error`).
    Error(E),
    /// The parser reported an unrecoverable error (`nom::Err::Failure`).
    Failure(E),
    /// Refilling the buffer from the underlying reader failed.
    Io(io::Error),
    /// The parser still needed more input, but no more could be obtained.
    Eof,
}

impl<E: std::fmt::Display> std::fmt::Display for Error<E> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Error::Error(e) => write!(f, "parsing error: {}", e),
            Error::Failure(e) => write!(f, "parsing failure: {}", e),
            Error::Io(e) => write!(f, "I/O error: {}", e),
            Error::Eof => f.write_str("unexpected end of input"),
        }
    }
}

// Implementing the standard error trait lets callers box this type as
// `Box<dyn std::error::Error>` and walk the cause chain of I/O failures.
impl<E: std::fmt::Debug + std::fmt::Display> std::error::Error for Error<E> {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Error::Io(e) => Some(e),
            // The parser's error type is not required to implement
            // `std::error::Error`, so it cannot be exposed as a source.
            Error::Error(_) | Error::Failure(_) | Error::Eof => None,
        }
    }
}
97
98impl<E> From<io::Error> for Error<E> {
99    fn from(e: io::Error) -> Self {
100        Error::Io(e)
101    }
102}
103
104pub trait Parse<O, E, P> {
105    fn parse(&mut self, p: P) -> Result<O, Error<E>>
106    where
107        for<'a> P: Parser<&'a [u8], O, E>;
108}
109
110impl<R: Read, O, E, P> Parse<O, E, P> for std::io::BufReader<R> {
111    fn parse(&mut self, mut p: P) -> Result<O, Error<E>>
112    where
113        for<'a> P: Parser<&'a [u8], O, E>,
114    {
115        loop {
116            let opt =
117                    //match p(input.buffer()) {
118                    match p.parse(self.buffer()) {
119                        Err(Err::Error(e)) => return Err(Error::Error(e)),
120                        Err(Err::Failure(e)) => return Err(Error::Failure(e)),
121                        Err(Err::Incomplete(_)) => {
122                            None
123                        },
124                        Ok((i, o)) => {
125                            let offset = self.buffer().offset(i);
126                            Some((offset, o))
127                        },
128                };
129
130            match opt {
131                Some((sz, o)) => {
132                    self.consume(sz);
133                    return Ok(o);
134                }
135                None => {
136                    self.fill_buf()?;
137                }
138            }
139        }
140    }
141}
142
143impl<R: Read, O, E, P> Parse<O, E, P> for bufreader::BufReader<R> {
144    fn parse(&mut self, mut p: P) -> Result<O, Error<E>>
145    where
146        for<'a> P: Parser<&'a [u8], O, E>,
147    {
148        let mut eof = false;
149        let mut error = None;
150        let mut count = 0;
151
152        loop {
153            let opt =
154                    //match p(input.buffer()) {
155                    match p.parse(self.buffer()) {
156                        Err(Err::Error(e)) => return Err(Error::Error(e)),
157                        Err(Err::Failure(e)) => return Err(Error::Failure(e)),
158                        Err(Err::Incomplete(_)) => {
159                            None
160                        },
161                        Ok((i, o)) => {
162                            let offset = self.buffer().offset(i);
163                            Some((offset, o))
164                        },
165                };
166            match opt {
167                Some((sz, o)) => {
168                    self.consume(sz);
169                    count = 0;
170                    return Ok(o);
171                }
172                None => {
173                    if count == 5 {
174                        eof = true;
175                    }
176                    count +=1;
177
178                    if eof {
179                        return Err(Error::Eof);
180                    }
181
182                    if let Some(e) = error.take() {
183                        return Err(Error::Io(e));
184                    }
185
186                    match self.fill_buf() {
187                        Err(e) => error = Some(e),
188                        Ok(s) => {
189                            if !s.is_empty() {
190                                count = 0;
191                            }
192                            /*
193                            if s.is_empty() {
194                                eof = true;
195                            }
196                            */
197                        }
198                    }
199                }
200            }
201        }
202    }
203}
204
/// Asynchronous entry point: run a nom parser against the bytes held by an
/// async buffered reader. Only available with the `async` feature.
///
/// * `O` - value produced by the parser on success.
/// * `E` - error type produced by the parser.
/// * `P` - the parser, which must accept a byte slice of any lifetime and be
///   `Send`.
#[cfg(feature = "async")]
#[async_trait]
pub trait AsyncParse<O, E, P> {
    /// Applies `p` to the reader's buffered bytes and resolves to its output,
    /// or to an [`Error`] describing why no output could be produced.
    async fn parse(&mut self, p: P) -> Result<O, Error<E>>
    where
        P: for<'a> Parser<&'a [u8], O, E> + Send + 'async_trait;
}
212
/// [`AsyncParse`] implementation for `futures::io::BufReader`.
#[cfg(feature = "async")]
#[async_trait]
impl<R: AsyncRead + Unpin + Send, O: Send, E, P> AsyncParse<O, E, P> for BufReader<R> {
    /// Applies `p` to the buffered bytes, refilling the buffer and retrying
    /// whenever the parser reports `Incomplete`.
    ///
    /// On success the bytes used by the parser are consumed from the reader
    /// and the parser's output is returned.
    ///
    /// # Errors
    ///
    /// * [`Error::Error`] / [`Error::Failure`] - the parser rejected the input.
    /// * [`Error::Io`] - refilling the buffer failed.
    /// * [`Error::Eof`] - the parser needed more input but five refills in a
    ///   row added no bytes.
    async fn parse(&mut self, mut p: P) -> Result<O, Error<E>>
    where
        for<'a> P: Parser<&'a [u8], O, E> + Send + 'async_trait,
    {
        // Number of consecutive refills that may add no bytes before the
        // stream is considered exhausted.
        const MAX_STALLED_REFILLS: u32 = 5;
        let mut stalled = 0;

        loop {
            let parsed = match p.parse(self.buffer()) {
                // Translate the remaining-input slice into the number of
                // bytes the parser used, so the borrow of the buffer ends
                // before it is mutated below.
                Ok((rest, out)) => Some((self.buffer().offset(rest), out)),
                Err(Err::Incomplete(_)) => None,
                Err(Err::Error(e)) => return Err(Error::Error(e)),
                Err(Err::Failure(e)) => return Err(Error::Failure(e)),
            };

            if let Some((used, out)) = parsed {
                self.consume_unpin(used);
                return Ok(out);
            }

            if stalled == MAX_STALLED_REFILLS {
                return Err(Error::Eof);
            }

            // Only refills that deliver no new bytes count towards the limit;
            // a refill that made progress resets it, so input arriving in many
            // small pieces is not mistaken for end of stream.
            let before = self.buffer().len();
            self.fill_buf().await?;
            if self.buffer().len() > before {
                stalled = 0;
            } else {
                stalled += 1;
            }
        }
    }
}
259
/// [`AsyncParse`] implementation for this crate's asynchronous `BufReader`.
#[cfg(feature = "async")]
#[async_trait]
impl<R: AsyncRead + Unpin + Send, O: Send, E, P> AsyncParse<O, E, P>
    for async_bufreader::BufReader<R>
{
    /// Applies `p` to the buffered bytes, refilling the buffer and retrying
    /// whenever the parser reports `Incomplete`.
    ///
    /// On success the bytes used by the parser are consumed from the reader
    /// and the parser's output is returned.
    ///
    /// # Errors
    ///
    /// * [`Error::Error`] / [`Error::Failure`] - the parser rejected the input.
    /// * [`Error::Io`] - refilling the buffer failed.
    /// * [`Error::Eof`] - the parser needed more input but five refills in a
    ///   row added no bytes.
    async fn parse(&mut self, mut p: P) -> Result<O, Error<E>>
    where
        for<'a> P: Parser<&'a [u8], O, E> + Send + 'async_trait,
    {
        // Number of consecutive refills that may add no bytes before the
        // stream is considered exhausted.
        const MAX_STALLED_REFILLS: u32 = 5;
        let mut stalled = 0;

        loop {
            let parsed = match p.parse(self.buffer()) {
                // Translate the remaining-input slice into the number of
                // bytes the parser used, so the borrow of the buffer ends
                // before it is mutated below.
                Ok((rest, out)) => Some((self.buffer().offset(rest), out)),
                Err(Err::Incomplete(_)) => None,
                Err(Err::Error(e)) => return Err(Error::Error(e)),
                Err(Err::Failure(e)) => return Err(Error::Failure(e)),
            };

            if let Some((used, out)) = parsed {
                self.consume_unpin(used);
                return Ok(out);
            }

            if stalled == MAX_STALLED_REFILLS {
                return Err(Error::Eof);
            }

            // Only refills that deliver no new bytes count towards the limit;
            // a refill that made progress resets it, so input arriving in many
            // small pieces is not mistaken for end of stream.
            // NOTE(review): assumes `buffer()` exposes all unconsumed bytes —
            // confirm against `async_bufreader::BufReader`.
            let before = self.buffer().len();
            self.fill_buf().await?;
            if self.buffer().len() > before {
                stalled = 0;
            } else {
                stalled += 1;
            }
        }
    }
}