use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{self, AsyncBufRead, AsyncBufReadExt, AsyncRead, ReadBuf};
use crate::errors::{Error, IllFormedError, Result, SyntaxError};
use crate::events::{BytesRef, Event};
use crate::name::{QName, ResolveResult};
use crate::parser::{ElementParser, Parser, PiParser};
use crate::reader::buffered_reader::impl_buffered_source;
use crate::reader::{
BangType, BinaryStream, NsReader, ParseState, ReadRefResult, ReadTextResult, Reader, Span,
};
use crate::utils::is_whitespace;
/// Newtype over a mutable borrow of a reader `R`.
///
/// Elsewhere in this file it is constructed as `TokioAdapter(&mut self.reader)`
/// and handed to the event-reading macros as the byte source.
struct TokioAdapter<'a, R>(&'a mut R);
// The inherent methods are only available when `R` is an `Unpin` tokio
// `AsyncBufRead`.
impl<'a, R: AsyncBufRead + Unpin> TokioAdapter<'a, R> {
// All methods are generated by `impl_buffered_source!` (imported from
// `crate::reader::buffered_reader`). The `async` / `await` tokens are passed
// through to the macro - presumably so it emits the asynchronous flavour of
// the buffered-source routines; confirm against the macro definition.
impl_buffered_source!('b, 0, async, await);
}
/// Lets a [`BinaryStream`] be read through tokio's [`AsyncRead`] while keeping
/// the stream's `offset` counter in step with the bytes handed to the caller.
impl<'r, R> AsyncRead for BinaryStream<'r, R>
where
    R: AsyncRead + Unpin,
{
    /// Forwards the read to the wrapped reader and, when the read completes
    /// successfully, advances `offset` by the number of bytes that were
    /// written into `buf`.
    ///
    /// Pending and failed polls leave `offset` untouched. The poll result of
    /// the wrapped reader is returned as is.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        // Free space before the read; the difference afterwards is the
        // amount of data the inner reader produced.
        let before = buf.remaining();
        let stream = self.get_mut();
        let outcome = Pin::new(&mut *stream.inner).poll_read(cx, buf);

        if matches!(outcome, Poll::Ready(Ok(()))) {
            let filled = before - buf.remaining();
            *stream.offset += filled as u64;
        }

        outcome
    }
}
impl<'r, R> AsyncBufRead for BinaryStream<'r, R>
where
R: AsyncBufRead + Unpin,
{
#[inline]
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
Pin::new(&mut *self.get_mut().inner).poll_fill_buf(cx)
}
#[inline]
fn consume(self: Pin<&mut Self>, amt: usize) {
let this = self.get_mut();
this.inner.consume(amt);
*this.offset += amt as u64;
}
}
/// Asynchronous reading methods, available when the underlying reader is an
/// `Unpin` tokio `AsyncBufRead`.
impl<R: AsyncBufRead + Unpin> Reader<R> {
/// Asynchronously reads an event, using `buf` as storage; the returned
/// [`Event`] borrows from `buf` for the lifetime `'b`.
///
/// The body is the expansion of `read_event_impl!`, fed with a
/// [`TokioAdapter`] over the underlying reader and the name of the private
/// `read_until_close_async` helper, in `await` mode.
pub async fn read_event_into_async<'b>(
&mut self,
// The binding is declared `mut` for the benefit of the macro expansion
// below - presumably the macro re-assigns it; confirm in the macro.
mut buf: &'b mut Vec<u8>,
) -> Result<Event<'b>> {
read_event_impl!(
self,
buf,
TokioAdapter(&mut self.reader),
read_until_close_async,
await
)
}
/// Asynchronous wrapper around the `read_to_end!` macro for the element
/// named `end`; on success the value produced by the macro is returned as a
/// [`Span`].
///
/// Events are obtained through [`Self::read_event_into_async`], with `buf`
/// as the scratch buffer.
pub async fn read_to_end_into_async<'n>(
&mut self,
end: QName<'n>,
buf: &mut Vec<u8>,
) -> Result<Span> {
Ok(read_to_end!(
self,
end,
buf,
read_event_into_async,
// Block handed to the macro: empties the scratch buffer. Presumably it
// is run for every skipped event so the buffer does not accumulate
// data - confirm in the macro definition.
{
buf.clear();
},
await
))
}
/// Private helper whose name is passed to `read_event_impl!` above. It
/// expands `read_until_close!` over a [`TokioAdapter`] wrapping the
/// underlying reader, in `await` mode.
async fn read_until_close_async<'b>(&mut self, buf: &'b mut Vec<u8>) -> Result<Event<'b>> {
read_until_close!(self, buf, TokioAdapter(&mut self.reader), await)
}
}
/// Asynchronous, namespace-aware reading methods, available when the
/// underlying reader is an `Unpin` tokio `AsyncBufRead`.
impl<R: AsyncBufRead + Unpin> NsReader<R> {
    /// Asynchronously reads the next event into `buf` and runs it through the
    /// namespace bookkeeping of this reader.
    ///
    /// The returned [`Event`] borrows from `buf`. Errors of the inner reader
    /// are handed to `process_event` and surface through its result.
    pub async fn read_event_into_async<'b>(&mut self, buf: &'b mut Vec<u8>) -> Result<Event<'b>> {
        // Presumably finalizes the namespace scope of the previously returned
        // event before the next one is read.
        self.pop();
        let event = self.reader.read_event_into_async(buf).await;
        self.process_event(event)
    }

    /// Asynchronously skips everything up to and including the end tag that
    /// matches `end`, returning the [`Span`] reported by the inner reader.
    ///
    /// `buf` is used as a scratch buffer. On error the namespace state is
    /// left untouched and the error is returned.
    pub async fn read_to_end_into_async<'n>(
        &mut self,
        end: QName<'n>,
        buf: &mut Vec<u8>,
    ) -> Result<Span> {
        let span = self.reader.read_to_end_into_async(end, buf).await?;
        // The inner reader consumed the closing tag itself, so that tag never
        // went through `process_event`. Nobody can observe the skipped content
        // any more, therefore the namespace scope opened by the corresponding
        // start tag is closed right here; otherwise its prefix bindings would
        // leak into the elements that follow.
        self.ns_resolver.pop();
        Ok(span)
    }

    /// Asynchronously reads the next event and resolves it against the
    /// namespace resolver of this reader.
    ///
    /// Returns the resolution result (borrowing from `self` for `'ns`)
    /// together with the event (borrowing from `buf` for `'b`).
    pub async fn read_resolved_event_into_async<'ns, 'b>(
        // `'ns` is named so that it can be used in the `ResolveResult` of the
        // return type.
        &'ns mut self,
        buf: &'b mut Vec<u8>,
    ) -> Result<(ResolveResult<'ns>, Event<'b>)> {
        let event = self.read_event_into_async(buf).await?;
        Ok(self.resolver().resolve_event(event))
    }
}
#[cfg(test)]
mod test {
    use super::TokioAdapter;
    use crate::reader::test::check;

    // Instantiates the shared reader test-suite for the tokio-based source.
    check!(
        #[tokio::test]
        read_event_into_async,
        read_until_close_async,
        TokioAdapter,
        &mut Vec::new(),
        async,
        await
    );

    /// Compile-time guard: the future returned by `read_event_into_async`
    /// must be `Send`. Nothing is polled, so there are no runtime checks.
    #[test]
    fn test_future_is_send() {
        use super::*;
        use tokio::io::BufReader;

        /// Accepts any value whose type is `Send`; fails to compile otherwise.
        fn assert_send<F>(_future: F)
        where
            F: Send,
        {
        }

        let source: Vec<u8> = Vec::new();
        let mut event_buf: Vec<u8> = Vec::new();
        let mut reader = Reader::from_reader(BufReader::new(source.as_slice()));

        let future = reader.read_event_into_async(&mut event_buf);
        assert_send(future);
    }
}