use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};
use std::io;
use tokio::io::AsyncBufRead;
#[cfg(feature = "stream")]
use futures_core::stream::Stream;
use crate::error::EndOrError;
use crate::parser::{Options, Parse, Parser, RawParser, WithOptions};
use crate::Error;
use pin_project_lite::pin_project;
pub struct ReadEvent<'x, T, P: Parse> {
inner: Pin<&'x mut GenericAsyncReader<T, P>>,
}
impl<T: AsyncBufRead + Unpin, P: Parse> Future for ReadEvent<'_, T, P> {
type Output = io::Result<Option<P::Output>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.inner.as_mut().poll_read(cx)
}
}
pub struct ReadAll<'x, T, P: Parse, F> {
cb: F,
inner: Pin<&'x mut GenericAsyncReader<T, P>>,
}
impl<P: Parse, T: AsyncBufRead + Unpin, F: FnMut(P::Output) + Send + Unpin> Future
for ReadAll<'_, T, P, F>
{
type Output = io::Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
loop {
match self.inner.as_mut().poll_read(cx) {
Poll::Ready(Ok(Some(ev))) => {
(self.cb)(ev);
}
Poll::Ready(Ok(None)) => return Poll::Ready(Ok(())),
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
Poll::Pending => return Poll::Pending,
}
}
}
}
#[cfg(feature = "stream")]
#[cfg_attr(docsrs, doc(cfg(all(feature = "stream", feature = "tokio"))))]
impl<T: AsyncBufRead, P: Parse> Stream for GenericAsyncReader<T, P> {
type Item = io::Result<P::Output>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.poll_read(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(Some(v))) => Poll::Ready(Some(Ok(v))),
Poll::Ready(Ok(None)) => Poll::Ready(None),
Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
}
}
}
pin_project! {
#[project = AsyncReaderProj]
pub struct GenericAsyncReader<T, P: Parse = Parser>{
#[pin]
inner: T,
parser: P,
}
}
fn parse_step<P: Parse>(
parser: &mut P,
buf: &mut &[u8],
may_eof: bool,
) -> (usize, Poll<Result<Option<P::Output>, Error>>) {
let old_len = buf.len();
let at_eof = may_eof && buf.is_empty();
let result = parser.parse(buf, at_eof);
let new_len = buf.len();
assert!(new_len <= old_len);
let read = old_len - new_len;
match result {
Ok(v) => (read, Poll::Ready(Ok(v))),
Err(EndOrError::NeedMoreData) => (read, Poll::Pending),
Err(EndOrError::Error(e)) => (read, Poll::Ready(Err(e))),
}
}
pub fn poll_parse_from<P: Parse, Reader: AsyncBufRead>(
mut r: Pin<&mut Reader>,
parser: &mut P,
cx: &mut Context<'_>,
) -> Poll<io::Result<Option<P::Output>>> {
loop {
let mut buf = match r.as_mut().poll_fill_buf(cx) {
Poll::Pending => {
return parse_step(parser, &mut &[][..], false)
.1
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e));
}
Poll::Ready(Ok(buf)) => buf,
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
};
let (consumed, result) = parse_step(parser, &mut buf, true);
r.as_mut().consume(consumed);
match result {
Poll::Pending => continue,
Poll::Ready(v) => {
return Poll::Ready(v.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e)));
}
}
}
}
impl<T, P: Parse + Default> GenericAsyncReader<T, P> {
pub fn new(inner: T) -> Self {
Self::wrap(inner, P::default())
}
}
impl<T, P: Parse + WithOptions> GenericAsyncReader<T, P> {
pub fn with_options(inner: T, options: Options) -> Self {
Self::wrap(inner, P::with_options(options))
}
}
impl<T, P: Parse> GenericAsyncReader<T, P> {
pub fn wrap(inner: T, parser: P) -> Self {
Self { inner, parser }
}
pub fn into_inner(self) -> (T, P) {
(self.inner, self.parser)
}
pub fn inner(&self) -> &T {
&self.inner
}
pub fn inner_mut(&mut self) -> &mut T {
&mut self.inner
}
pub fn inner_pinned(self: Pin<&mut Self>) -> Pin<&mut T> {
let this = self.project();
this.inner
}
pub fn parser(&self) -> &P {
&self.parser
}
pub fn parser_mut(&mut self) -> &mut P {
&mut self.parser
}
pub fn parser_pinned(self: Pin<&mut Self>) -> &mut P {
let this = self.project();
this.parser
}
#[inline(always)]
#[doc(hidden)]
#[deprecated(
since = "0.12.0",
note = "use .parser_mut().release_temporaries() / .parser_pinned().release_temporaries()"
)]
pub fn release_temporaries(&mut self) {
self.parser.release_temporaries();
}
}
impl<T: AsyncBufRead, P: Parse> GenericAsyncReader<T, P> {
pub fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<Option<P::Output>>> {
let this = self.project();
poll_parse_from(this.inner, this.parser, cx)
}
}
impl<T: AsyncBufRead + Unpin, P: Parse> GenericAsyncReader<T, P> {
pub fn read(&mut self) -> ReadEvent<'_, T, P> {
ReadEvent {
inner: Pin::new(self),
}
}
pub fn read_all<F>(&mut self, cb: F) -> ReadAll<'_, T, P, F> {
ReadAll {
inner: Pin::new(self),
cb,
}
}
}
pub type AsyncReader<T> = GenericAsyncReader<T, Parser>;
pub type AsyncRawReader<T> = GenericAsyncReader<T, RawParser>;