use std::future::Future;
use std::pin::Pin;
use std::result;
use std::task::{Context, Poll};
cfg_if::cfg_if! {
if #[cfg(feature = "tokio")] {
use tokio::io::{self, AsyncBufRead, AsyncSeekExt};
use tokio_stream::Stream;
} else {
use futures::io::{self, AsyncBufRead, AsyncSeekExt};
use futures::stream::Stream;
}}
use csv_core::ReaderBuilder as CoreReaderBuilder;
use csv_core::Reader as CoreReader;
#[cfg(feature = "with_serde")]
use serde::de::DeserializeOwned;
use crate::{Terminator, Trim};
use crate::byte_record::{ByteRecord, Position};
use crate::error::{Error, ErrorKind, Result, Utf8Error};
use crate::string_record::StringRecord;
cfg_if::cfg_if! {
if #[cfg(feature = "tokio")] {
pub mod ardr_tokio;
} else {
pub mod ardr_futures;
}}
#[cfg(all(feature = "with_serde", not(feature = "tokio")))]
pub mod ades_futures;
#[cfg(all(feature = "with_serde", feature = "tokio"))]
pub mod ades_tokio;
#[derive(Debug)]
pub struct AsyncReaderBuilder {
capacity: usize,
flexible: bool,
has_headers: bool,
trim: Trim,
end_on_io_error: bool,
builder: Box<CoreReaderBuilder>,
}
impl Default for AsyncReaderBuilder {
fn default() -> AsyncReaderBuilder {
AsyncReaderBuilder {
capacity: 8 * (1 << 10),
flexible: false,
has_headers: true,
trim: Trim::default(),
end_on_io_error: true,
builder: Box::<CoreReaderBuilder>::default(),
}
}
}
impl AsyncReaderBuilder {
pub fn new() -> AsyncReaderBuilder {
AsyncReaderBuilder::default()
}
pub fn delimiter(&mut self, delimiter: u8) -> &mut AsyncReaderBuilder {
self.builder.delimiter(delimiter);
self
}
pub fn has_headers(&mut self, yes: bool) -> &mut AsyncReaderBuilder {
self.has_headers = yes;
self
}
pub fn flexible(&mut self, yes: bool) -> &mut AsyncReaderBuilder {
self.flexible = yes;
self
}
pub fn end_on_io_error(&mut self, yes: bool) -> &mut AsyncReaderBuilder {
self.end_on_io_error = yes;
self
}
pub fn trim(&mut self, trim: Trim) -> &mut AsyncReaderBuilder {
self.trim = trim;
self
}
pub fn terminator(&mut self, term: Terminator) -> &mut AsyncReaderBuilder {
self.builder.terminator(term.to_core());
self
}
pub fn quote(&mut self, quote: u8) -> &mut AsyncReaderBuilder {
self.builder.quote(quote);
self
}
pub fn escape(&mut self, escape: Option<u8>) -> &mut AsyncReaderBuilder {
self.builder.escape(escape);
self
}
pub fn double_quote(&mut self, yes: bool) -> &mut AsyncReaderBuilder {
self.builder.double_quote(yes);
self
}
pub fn quoting(&mut self, yes: bool) -> &mut AsyncReaderBuilder {
self.builder.quoting(yes);
self
}
pub fn comment(&mut self, comment: Option<u8>) -> &mut AsyncReaderBuilder {
self.builder.comment(comment);
self
}
pub fn ascii(&mut self) -> &mut AsyncReaderBuilder {
self.builder.ascii();
self
}
pub fn buffer_capacity(&mut self, capacity: usize) -> &mut AsyncReaderBuilder {
self.capacity = capacity;
self
}
#[doc(hidden)]
pub fn nfa(&mut self, yes: bool) -> &mut AsyncReaderBuilder {
self.builder.nfa(yes);
self
}
}
#[derive(Debug)]
pub struct ReaderState {
headers: Option<Headers>,
has_headers: bool,
flexible: bool,
trim: Trim,
first_field_count: Option<u64>,
cur_pos: Position,
first: bool,
seeked: bool,
end_on_io_error: bool,
eof: ReaderEofState,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ReaderEofState {
NotEof,
Eof,
IOError,
}
#[derive(Debug)]
struct Headers {
byte_record: ByteRecord,
string_record: result::Result<StringRecord, Utf8Error>,
}
impl ReaderState {
#[inline(always)]
fn add_record(&mut self, record: &ByteRecord) -> Result<()> {
let i = self.cur_pos.record();
self.cur_pos.set_record(i.checked_add(1).unwrap());
if !self.flexible {
match self.first_field_count {
None => self.first_field_count = Some(record.len() as u64),
Some(expected) => {
if record.len() as u64 != expected {
return Err(Error::new(ErrorKind::UnequalLengths {
pos: record.position().map(Clone::clone),
expected_len: expected,
len: record.len() as u64,
}));
}
}
}
}
Ok(())
}
}
#[derive(Debug)]
pub struct AsyncReaderImpl<R> {
core: Box<CoreReader>,
rdr: io::BufReader<R>,
state: ReaderState,
}
#[must_use = "futures do nothing unless you `.await` or poll them"]
struct FillBuf<'a, R: AsyncBufRead + ?Sized> {
reader: &'a mut R,
}
impl<R: AsyncBufRead + ?Sized + Unpin> Unpin for FillBuf<'_, R> {}
impl<'a, R: AsyncBufRead + ?Sized + Unpin> FillBuf<'a, R> {
pub fn new(reader: &'a mut R) -> Self {
Self { reader }
}
}
impl<R: AsyncBufRead + ?Sized + Unpin> Future for FillBuf<'_, R> {
type Output = io::Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match Pin::new(&mut *self.reader).poll_fill_buf(cx) {
Poll::Ready(res) => {
match res {
Ok(res) => Poll::Ready(Ok(res.len())),
Err(e) => Poll::Ready(Err(e))
}
},
Poll::Pending => Poll::Pending
}
}
}
impl<'r, R> AsyncReaderImpl<R>
where
R: io::AsyncRead + Unpin + 'r,
{
fn new(builder: &AsyncReaderBuilder, rdr: R) -> AsyncReaderImpl<R> {
AsyncReaderImpl {
core: Box::new(builder.builder.build()),
rdr: io::BufReader::with_capacity(builder.capacity, rdr),
state: ReaderState {
headers: None,
has_headers: builder.has_headers,
flexible: builder.flexible,
trim: builder.trim,
end_on_io_error: builder.end_on_io_error,
first_field_count: None,
cur_pos: Position::new(),
first: false,
seeked: false,
eof: ReaderEofState::NotEof,
},
}
}
pub async fn headers(&mut self) -> Result<&StringRecord> {
if self.state.headers.is_none() {
let mut record = ByteRecord::new();
self.read_byte_record_impl(&mut record).await?;
self.set_headers_impl(Err(record));
}
let headers = self.state.headers.as_ref().unwrap();
match headers.string_record {
Ok(ref record) => Ok(record),
Err(ref err) => Err(Error::new(ErrorKind::Utf8 {
pos: headers.byte_record.position().map(Clone::clone),
err: err.clone(),
})),
}
}
pub async fn byte_headers(&mut self) -> Result<&ByteRecord> {
if self.state.headers.is_none() {
let mut record = ByteRecord::new();
self.read_byte_record_impl(&mut record).await?;
self.set_headers_impl(Err(record));
}
Ok(&self.state.headers.as_ref().unwrap().byte_record)
}
pub fn set_headers(&mut self, headers: StringRecord) {
self.set_headers_impl(Ok(headers));
}
pub fn set_byte_headers(&mut self, headers: ByteRecord) {
self.set_headers_impl(Err(headers));
}
fn set_headers_impl(
&mut self,
headers: result::Result<StringRecord, ByteRecord>,
) {
let (mut str_headers, mut byte_headers) = match headers {
Ok(string) => {
let bytes = string.clone().into_byte_record();
(Ok(string), bytes)
}
Err(bytes) => {
match StringRecord::from_byte_record(bytes.clone()) {
Ok(str_headers) => (Ok(str_headers), bytes),
Err(err) => (Err(err.utf8_error().clone()), bytes),
}
}
};
if self.state.trim.should_trim_headers() {
if let Ok(ref mut str_headers) = str_headers.as_mut() {
str_headers.trim();
}
byte_headers.trim();
}
self.state.headers = Some(Headers {
byte_record: byte_headers,
string_record: str_headers,
});
}
pub async fn read_record(&mut self, record: &mut StringRecord) -> Result<bool> {
let result = record.read(self).await;
if self.state.trim.should_trim_fields() {
record.trim();
}
result
}
pub async fn read_byte_record(
&mut self,
record: &mut ByteRecord,
) -> Result<bool> {
if !self.state.seeked && !self.state.has_headers && !self.state.first {
if let Some(ref headers) = self.state.headers {
self.state.first = true;
record.clone_from(&headers.byte_record);
if self.state.trim.should_trim_fields() {
record.trim();
}
return Ok(!record.is_empty());
}
}
let ok = self.read_byte_record_impl(record).await?;
self.state.first = true;
if !self.state.seeked && self.state.headers.is_none() {
self.set_headers_impl(Err(record.clone()));
if self.state.has_headers {
let result = self.read_byte_record_impl(record).await;
if self.state.trim.should_trim_fields() {
record.trim();
}
return result;
}
} else if self.state.trim.should_trim_fields() {
record.trim();
}
Ok(ok)
}
#[inline(always)]
async fn read_byte_record_impl(
&mut self,
record: &mut ByteRecord,
) -> Result<bool> {
use csv_core::ReadRecordResult::*;
record.clear();
record.set_position(Some(self.state.cur_pos.clone()));
match self.state.eof {
ReaderEofState::Eof => return Ok(false),
ReaderEofState::IOError => {
if self.state.end_on_io_error { return Ok(false) }
},
ReaderEofState::NotEof => {}
}
let (mut outlen, mut endlen) = (0, 0);
loop {
let (res, nin, nout, nend) = {
if let Err(err) = FillBuf::new(&mut self.rdr).await {
self.state.eof = ReaderEofState::IOError;
return Err(err.into());
}
let (fields, ends) = record.as_parts();
self.core.read_record(
self.rdr.buffer(),
&mut fields[outlen..],
&mut ends[endlen..],
)
};
Pin::new(&mut self.rdr).consume(nin);
let byte = self.state.cur_pos.byte();
self.state
.cur_pos
.set_byte(byte + nin as u64)
.set_line(self.core.line());
outlen += nout;
endlen += nend;
match res {
InputEmpty => continue,
OutputFull => {
record.expand_fields();
continue;
}
OutputEndsFull => {
record.expand_ends();
continue;
}
Record => {
record.set_len(endlen);
self.state.add_record(record)?;
return Ok(true);
}
End => {
self.state.eof = ReaderEofState::Eof;
return Ok(false);
}
}
}
}
#[inline]
pub fn position(&self) -> &Position {
&self.state.cur_pos
}
pub fn is_done(&self) -> bool {
self.state.eof != ReaderEofState::NotEof
}
pub fn has_headers(&self) -> bool {
self.state.has_headers
}
pub fn get_ref(&self) -> &R {
self.rdr.get_ref()
}
pub fn get_mut(&mut self) -> &mut R {
self.rdr.get_mut()
}
pub fn into_inner(self) -> R {
self.rdr.into_inner()
}
}
impl<R: io::AsyncRead + io::AsyncSeek + Unpin> AsyncReaderImpl<R> {
pub async fn seek(&mut self, pos: Position) -> Result<()> {
self.byte_headers().await?;
self.state.seeked = true;
if pos.byte() == self.state.cur_pos.byte() {
return Ok(());
}
self.rdr.seek(io::SeekFrom::Start(pos.byte())).await?;
self.core.reset();
self.core.set_line(pos.line());
self.state.cur_pos = pos;
self.state.eof = ReaderEofState::NotEof;
Ok(())
}
pub async fn seek_raw(
&mut self,
seek_from: io::SeekFrom,
pos: Position,
) -> Result<()> {
self.byte_headers().await?;
self.state.seeked = true;
self.rdr.seek(seek_from).await?;
self.core.reset();
self.core.set_line(pos.line());
self.state.cur_pos = pos;
self.state.eof = ReaderEofState::NotEof;
Ok(())
}
#[cfg(feature = "tokio")]
pub async fn rewind(&mut self) -> Result<()> {
self.byte_headers().await?;
self.state.seeked = false;
self.state.headers = None;
self.state.first = false;
if self.state.cur_pos.byte() == 0 {
return Ok(());
}
self.rdr.rewind().await?;
self.core.reset();
self.core.set_line(1);
self.state.cur_pos.set_byte(0).set_line(1).set_record(0);
self.state.eof = ReaderEofState::NotEof;
Ok(())
}
#[cfg(not(feature = "tokio"))]
pub async fn rewind(&mut self) -> Result<()> {
self.byte_headers().await?;
self.state.seeked = false;
self.state.headers = None;
self.state.first = false;
if self.state.cur_pos.byte() == 0 {
return Ok(());
}
self.rdr.seek(io::SeekFrom::Start(0)).await?;
self.core.reset();
self.core.set_line(1);
self.state.cur_pos.set_byte(0).set_line(1).set_record(0);
self.state.eof = ReaderEofState::NotEof;
Ok(())
}
}
async fn read_record_borrowed<R>(
rdr: &mut AsyncReaderImpl<R>,
mut rec: StringRecord,
) -> (Option<Result<StringRecord>>, &mut AsyncReaderImpl<R>, StringRecord)
where
R: io::AsyncRead + Unpin
{
let result = match rdr.read_record(&mut rec).await {
Err(err) => Some(Err(err)),
Ok(true) => Some(Ok(rec.clone())),
Ok(false) => None,
};
(result, rdr, rec)
}
#[allow(clippy::type_complexity)]
pub struct StringRecordsStream<'r, R>
where
R: io::AsyncRead + Unpin + Send
{
fut: Option<
Pin<
Box<
dyn Future<
Output = (
Option<Result<StringRecord>>,
&'r mut AsyncReaderImpl<R>,
StringRecord,
),
> + Send + 'r,
>,
>,
>,
}
impl<'r, R> StringRecordsStream<'r, R>
where
R: io::AsyncRead + Unpin + Send
{
fn new(rdr: &'r mut AsyncReaderImpl<R>) -> Self {
Self {
fut: Some(Pin::from(Box::new(read_record_borrowed(
rdr,
StringRecord::new(),
)))),
}
}
}
impl<'r, R> Stream for StringRecordsStream<'r, R>
where
R: io::AsyncRead + Unpin + Send
{
type Item = Result<StringRecord>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Option<Self::Item>> {
if let Some(fut) = self.fut.as_mut() {
match fut.as_mut().poll(cx) {
Poll::Ready((result, rdr, rec)) => {
if result.is_some() {
self.fut =
Some(Pin::from(Box::new(read_record_borrowed(rdr, rec))));
} else {
self.fut = None;
}
Poll::Ready(result)
}
Poll::Pending => Poll::Pending,
}
} else {
Poll::Ready(None)
}
}
}
async fn read_record<R>(
mut rdr: AsyncReaderImpl<R>,
mut rec: StringRecord,
) -> (Option<Result<StringRecord>>, AsyncReaderImpl<R>, StringRecord)
where
R: io::AsyncRead + Unpin
{
let result = match rdr.read_record(&mut rec).await {
Err(err) => Some(Err(err)),
Ok(true) => Some(Ok(rec.clone())),
Ok(false) => None,
};
(result, rdr, rec)
}
#[allow(clippy::type_complexity)]
pub struct StringRecordsIntoStream<'r, R>
where
R: io::AsyncRead + Unpin + Send
{
fut: Option<
Pin<
Box<
dyn Future<
Output = (
Option<Result<StringRecord>>,
AsyncReaderImpl<R>,
StringRecord,
),
> + Send + 'r,
>,
>,
>,
}
impl<'r, R> StringRecordsIntoStream<'r, R>
where
R: io::AsyncRead + Unpin + Send + 'r
{
fn new(rdr: AsyncReaderImpl<R>) -> Self {
Self {
fut: Some(Pin::from(Box::new(read_record(
rdr,
StringRecord::new(),
)))),
}
}
}
impl<'r, R> Stream for StringRecordsIntoStream<'r, R>
where
R: io::AsyncRead + Unpin + Send + 'r
{
type Item = Result<StringRecord>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Option<Self::Item>> {
if let Some(fut) = self.fut.as_mut() {
match fut.as_mut().poll(cx) {
Poll::Ready((result, rdr, rec)) => {
if result.is_some() {
self.fut =
Some(Pin::from(Box::new(read_record(rdr, rec))));
} else {
self.fut = None;
}
Poll::Ready(result)
}
Poll::Pending => Poll::Pending,
}
} else {
Poll::Ready(None)
}
}
}
async fn read_byte_record_borrowed<R>(
rdr: &mut AsyncReaderImpl<R>,
mut rec: ByteRecord,
) -> (Option<Result<ByteRecord>>, &mut AsyncReaderImpl<R>, ByteRecord)
where
R: io::AsyncRead + Unpin,
{
let result = match rdr.read_byte_record(&mut rec).await {
Err(err) => Some(Err(err)),
Ok(true) => Some(Ok(rec.clone())),
Ok(false) => None,
};
(result, rdr, rec)
}
#[allow(clippy::type_complexity)]
pub struct ByteRecordsStream<'r, R>
where
R: io::AsyncRead + Unpin + Send,
{
fut: Option<
Pin<
Box<
dyn Future<
Output = (
Option<Result<ByteRecord>>,
&'r mut AsyncReaderImpl<R>,
ByteRecord,
),
> + Send + 'r,
>,
>,
>,
}
impl<'r, R> ByteRecordsStream<'r, R>
where
R: io::AsyncRead + Unpin + Send + 'r,
{
fn new(rdr: &'r mut AsyncReaderImpl<R>) -> Self {
Self {
fut: Some(Pin::from(Box::new(read_byte_record_borrowed(
rdr,
ByteRecord::new(),
)))),
}
}
}
impl<'r, R> Stream for ByteRecordsStream<'r, R>
where
R: io::AsyncRead + Send + Unpin,
{
type Item = Result<ByteRecord>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Option<Self::Item>> {
if let Some(fut) = self.fut.as_mut() {
match fut.as_mut().poll(cx) {
Poll::Ready((result, rdr, rec)) => {
if result.is_some() {
self.fut =
Some(Pin::from(Box::new(read_byte_record_borrowed(rdr, rec))));
} else {
self.fut = None;
}
Poll::Ready(result)
}
Poll::Pending => Poll::Pending,
}
} else {
Poll::Ready(None)
}
}
}
async fn read_byte_record<R>(
mut rdr: AsyncReaderImpl<R>,
mut rec: ByteRecord,
) -> (Option<Result<ByteRecord>>, AsyncReaderImpl<R>, ByteRecord)
where
R: io::AsyncRead + Unpin
{
let result = match rdr.read_byte_record(&mut rec).await {
Err(err) => Some(Err(err)),
Ok(true) => Some(Ok(rec.clone())),
Ok(false) => None,
};
(result, rdr, rec)
}
#[allow(clippy::type_complexity)]
pub struct ByteRecordsIntoStream<'r, R>
where
R: io::AsyncRead + Unpin + Send
{
fut: Option<
Pin<
Box<
dyn Future<
Output = (
Option<Result<ByteRecord>>,
AsyncReaderImpl<R>,
ByteRecord,
),
> + Send + 'r,
>,
>,
>,
}
impl<'r, R> ByteRecordsIntoStream<'r, R>
where
R: io::AsyncRead + Send + Unpin + 'r
{
fn new(rdr: AsyncReaderImpl<R>) -> Self {
Self {
fut: Some(Pin::from(Box::new(read_byte_record(
rdr,
ByteRecord::new(),
)))),
}
}
}
impl<'r, R> Stream for ByteRecordsIntoStream<'r, R>
where
R: io::AsyncRead + Send + Unpin + 'r
{
type Item = Result<ByteRecord>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Option<Self::Item>> {
if let Some(fut) = self.fut.as_mut() {
match fut.as_mut().poll(cx) {
Poll::Ready((result, rdr, rec)) => {
if result.is_some() {
self.fut =
Some(Pin::from(Box::new(read_byte_record(rdr, rec))));
} else {
self.fut = None;
}
Poll::Ready(result)
}
Poll::Pending => Poll::Pending,
}
} else {
Poll::Ready(None)
}
}
}
cfg_if::cfg_if! {
if #[cfg(feature = "with_serde")] {
async fn deserialize_record_borrowed<R, D: DeserializeOwned>(
rdr: &mut AsyncReaderImpl<R>,
headers: Option<StringRecord>,
mut rec: StringRecord,
) -> (Option<Result<D>>, &mut AsyncReaderImpl<R>, Option<StringRecord>, StringRecord)
where
R: io::AsyncRead + Unpin
{
let result = match rdr.read_record(&mut rec).await {
Err(err) => Some(Err(err)),
Ok(true) => Some(rec.deserialize(headers.as_ref())),
Ok(false) => None,
};
(result, rdr, headers, rec)
}
#[allow(clippy::type_complexity)]
pub struct DeserializeRecordsStream<'r, R, D>
where
R: io::AsyncRead + Unpin + Send
{
header_fut: Option<
Pin<
Box<
dyn Future<
Output = (
Result<StringRecord>,
&'r mut AsyncReaderImpl<R>,
)
> + Send + 'r,
>,
>,
>,
rec_fut: Option<
Pin<
Box<
dyn Future<
Output = (
Option<Result<D>>,
&'r mut AsyncReaderImpl<R>,
Option<StringRecord>,
StringRecord,
)
> + Send + 'r,
>,
>,
>,
}
impl<'r, R, D: DeserializeOwned + 'r> DeserializeRecordsStream<'r, R, D>
where
R: io::AsyncRead + Unpin + Send
{
fn new(rdr: &'r mut AsyncReaderImpl<R>) -> Self {
let has_headers = rdr.has_headers();
if has_headers {
Self {
header_fut: Some(Pin::from(Box::new(
async{ (rdr.headers().await.cloned(), rdr) }
))),
rec_fut: None,
}
} else {
Self {
header_fut: None,
rec_fut: Some(Pin::from(Box::new(
deserialize_record_borrowed(rdr, None, StringRecord::new())
))),
}
}
}
}
impl<'r, R, D: DeserializeOwned + 'r> Stream for DeserializeRecordsStream<'r, R, D>
where
R: io::AsyncRead + Unpin + Send
{
type Item = Result<D>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Option<Self::Item>> {
if let Some(header_fut) = &mut self.header_fut {
match header_fut.as_mut().poll(cx) {
Poll::Ready((Ok(headers), rdr)) => {
self.header_fut = None;
self.rec_fut = Some(Pin::from(Box::new(
deserialize_record_borrowed(rdr, Some(headers), StringRecord::new()),
)));
cx.waker().wake_by_ref();
Poll::Pending
},
Poll::Ready((Err(err), rdr)) => {
self.header_fut = None;
self.rec_fut = Some(Pin::from(Box::new(
deserialize_record_borrowed(rdr, None, StringRecord::new()),
)));
Poll::Ready(Some(Err(err)))
},
Poll::Pending => Poll::Pending,
}
} else if let Some(fut) = self.rec_fut.as_mut() {
match fut.as_mut().poll(cx) {
Poll::Ready((result, rdr, headers, rec)) => {
if result.is_some() {
self.rec_fut = Some(Pin::from(Box::new(
deserialize_record_borrowed(rdr, headers, rec),
)));
} else {
self.rec_fut = None;
}
Poll::Ready(result)
}
Poll::Pending => Poll::Pending,
}
} else {
Poll::Ready(None)
}
}
}
async fn deserialize_record_with_pos_borrowed<R, D: DeserializeOwned>(
rdr: &mut AsyncReaderImpl<R>,
headers: Option<StringRecord>,
mut rec: StringRecord,
) -> (Option<Result<D>>, Position, &mut AsyncReaderImpl<R>, Option<StringRecord>, StringRecord)
where
R: io::AsyncRead + Unpin
{
let pos = rdr.position().clone();
let result = match rdr.read_record(&mut rec).await {
Err(err) => Some(Err(err)),
Ok(true) => Some(rec.deserialize(headers.as_ref())),
Ok(false) => None,
};
(result, pos, rdr, headers, rec)
}
#[allow(clippy::type_complexity)]
pub struct DeserializeRecordsStreamPos<'r, R, D>
where
R: io::AsyncRead + Unpin + Send
{
header_fut: Option<
Pin<
Box<
dyn Future<
Output = (
Result<StringRecord>,
&'r mut AsyncReaderImpl<R>,
)
> + Send + 'r,
>,
>,
>,
rec_fut: Option<
Pin<
Box<
dyn Future<
Output = (
Option<Result<D>>,
Position,
&'r mut AsyncReaderImpl<R>,
Option<StringRecord>,
StringRecord,
)
> + Send + 'r,
>,
>,
>,
}
impl<'r, R, D: DeserializeOwned + 'r> DeserializeRecordsStreamPos<'r, R, D>
where
R: io::AsyncRead + Unpin + Send
{
fn new(rdr: &'r mut AsyncReaderImpl<R>) -> Self {
let has_headers = rdr.has_headers();
if has_headers {
Self {
header_fut: Some(Pin::from(Box::new(
async{ (rdr.headers().await.cloned(), rdr) }
))),
rec_fut: None,
}
} else {
Self {
header_fut: None,
rec_fut: Some(Pin::from(Box::new(
deserialize_record_with_pos_borrowed(rdr, None, StringRecord::new())
))),
}
}
}
}
impl<'r, R, D: DeserializeOwned + 'r> Stream for DeserializeRecordsStreamPos<'r, R, D>
where
R: io::AsyncRead + Unpin + Send
{
type Item = (Result<D>, Position);
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Option<Self::Item>> {
if let Some(header_fut) = &mut self.header_fut {
match header_fut.as_mut().poll(cx) {
Poll::Ready((Ok(headers), rdr)) => {
self.header_fut = None;
self.rec_fut = Some(Pin::from(Box::new(
deserialize_record_with_pos_borrowed(rdr, Some(headers), StringRecord::new()),
)));
cx.waker().wake_by_ref();
Poll::Pending
},
Poll::Ready((Err(err), rdr)) => {
self.header_fut = None;
let pos = rdr.position().clone();
self.rec_fut = Some(Pin::from(Box::new(
deserialize_record_with_pos_borrowed(rdr, None, StringRecord::new()),
)));
Poll::Ready(Some((Err(err), pos)))
},
Poll::Pending => Poll::Pending,
}
} else if let Some(fut) = self.rec_fut.as_mut() {
match fut.as_mut().poll(cx) {
Poll::Ready((result, pos, rdr, headers, rec)) => {
if let Some(result) = result {
self.rec_fut = Some(Pin::from(Box::new(
deserialize_record_with_pos_borrowed(rdr, headers, rec),
)));
Poll::Ready(Some((result, pos)))
} else {
self.rec_fut = None;
Poll::Ready(None)
}
}
Poll::Pending => Poll::Pending,
}
} else {
Poll::Ready(None)
}
}
}
async fn deserialize_record<R, D: DeserializeOwned>(
mut rdr: AsyncReaderImpl<R>,
headers: Option<StringRecord>,
mut rec: StringRecord,
) -> (Option<Result<D>>, AsyncReaderImpl<R>, Option<StringRecord>, StringRecord)
where
R: io::AsyncRead + Unpin
{
let result = match rdr.read_record(&mut rec).await {
Err(err) => Some(Err(err)),
Ok(true) => Some(rec.deserialize(headers.as_ref())),
Ok(false) => None,
};
(result, rdr, headers, rec)
}
#[allow(clippy::type_complexity)]
pub struct DeserializeRecordsIntoStream<'r, R, D>
where
R: io::AsyncRead + Unpin + Send
{
header_fut: Option<
Pin<
Box<
dyn Future<
Output = (
Result<StringRecord>,
AsyncReaderImpl<R>,
)
> + Send + 'r,
>,
>,
>,
rec_fut: Option<
Pin<
Box<
dyn Future<
Output = (
Option<Result<D>>,
AsyncReaderImpl<R>,
Option<StringRecord>,
StringRecord,
)
> + Send + 'r,
>,
>,
>,
}
impl<'r, R, D: DeserializeOwned + 'r> DeserializeRecordsIntoStream<'r, R, D>
where
R: io::AsyncRead + Unpin + Send + 'r
{
fn new(mut rdr: AsyncReaderImpl<R>) -> Self {
let has_headers = rdr.has_headers();
if has_headers {
Self {
header_fut: Some(Pin::from(Box::new(
async{ (rdr.headers().await.cloned(), rdr) }
))),
rec_fut: None,
}
} else {
Self {
header_fut: None,
rec_fut: Some(Pin::from(Box::new(
deserialize_record(rdr, None, StringRecord::new())
))),
}
}
}
}
impl<'r, R, D: DeserializeOwned + 'r> Stream for DeserializeRecordsIntoStream<'r, R, D>
where
R: io::AsyncRead + Unpin + Send + 'r
{
type Item = Result<D>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Option<Self::Item>> {
if let Some(header_fut) = &mut self.header_fut {
match header_fut.as_mut().poll(cx) {
Poll::Ready((Ok(headers), rdr)) => {
self.header_fut = None;
self.rec_fut = Some(Pin::from(Box::new(
deserialize_record(rdr, Some(headers), StringRecord::new()),
)));
cx.waker().wake_by_ref();
Poll::Pending
},
Poll::Ready((Err(err), rdr)) => {
self.header_fut = None;
self.rec_fut = Some(Pin::from(Box::new(
deserialize_record(rdr, None, StringRecord::new()),
)));
Poll::Ready(Some(Err(err)))
},
Poll::Pending => Poll::Pending,
}
} else if let Some(fut) = self.rec_fut.as_mut() {
match fut.as_mut().poll(cx) {
Poll::Ready((result, rdr, headers, rec)) => {
if result.is_some() {
self.rec_fut = Some(Pin::from(Box::new(
deserialize_record(rdr, headers, rec),
)));
} else {
self.rec_fut = None;
}
Poll::Ready(result)
}
Poll::Pending => Poll::Pending,
}
} else {
Poll::Ready(None)
}
}
}
async fn deserialize_record_with_pos<R, D: DeserializeOwned>(
mut rdr: AsyncReaderImpl<R>,
headers: Option<StringRecord>,
mut rec: StringRecord,
) -> (Option<Result<D>>, Position, AsyncReaderImpl<R>, Option<StringRecord>, StringRecord)
where
R: io::AsyncRead + Unpin
{
let pos = rdr.position().clone();
let result = match rdr.read_record(&mut rec).await {
Err(err) => Some(Err(err)),
Ok(true) => Some(rec.deserialize(headers.as_ref())),
Ok(false) => None,
};
(result, pos, rdr, headers, rec)
}
#[allow(clippy::type_complexity)]
pub struct DeserializeRecordsIntoStreamPos<'r, R, D>
where
R: io::AsyncRead + Unpin + Send
{
header_fut: Option<
Pin<
Box<
dyn Future<
Output = (
Result<StringRecord>,
AsyncReaderImpl<R>,
)
> + Send + 'r,
>,
>,
>,
rec_fut: Option<
Pin<
Box<
dyn Future<
Output = (
Option<Result<D>>,
Position,
AsyncReaderImpl<R>,
Option<StringRecord>,
StringRecord,
)
> + Send + 'r,
>,
>,
>,
}
impl<'r, R, D: DeserializeOwned + 'r> DeserializeRecordsIntoStreamPos<'r, R, D>
where
R: io::AsyncRead + Unpin + Send + 'r
{
fn new(mut rdr: AsyncReaderImpl<R>) -> Self {
let has_headers = rdr.has_headers();
if has_headers {
Self {
header_fut: Some(Pin::from(Box::new(
async{ (rdr.headers().await.cloned(), rdr) }
))),
rec_fut: None,
}
} else {
Self {
header_fut: None,
rec_fut: Some(Pin::from(Box::new(
deserialize_record_with_pos(rdr, None, StringRecord::new())
))),
}
}
}
}
impl<'r, R, D: DeserializeOwned + 'r> Stream for DeserializeRecordsIntoStreamPos<'r, R, D>
where
R: io::AsyncRead + Unpin + Send + 'r
{
type Item = (Result<D>, Position);
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Option<Self::Item>> {
if let Some(header_fut) = &mut self.header_fut {
match header_fut.as_mut().poll(cx) {
Poll::Ready((Ok(headers), rdr)) => {
self.header_fut = None;
self.rec_fut = Some(Pin::from(Box::new(
deserialize_record_with_pos(rdr, Some(headers), StringRecord::new()),
)));
cx.waker().wake_by_ref();
Poll::Pending
},
Poll::Ready((Err(err), rdr)) => {
self.header_fut = None;
let pos = rdr.position().clone();
self.rec_fut = Some(Pin::from(Box::new(
deserialize_record_with_pos(rdr, None, StringRecord::new()),
)));
Poll::Ready(Some((Err(err), pos)))
},
Poll::Pending => Poll::Pending,
}
} else if let Some(fut) = self.rec_fut.as_mut() {
match fut.as_mut().poll(cx) {
Poll::Ready((result, pos, rdr, headers, rec)) => {
if let Some(result) = result {
self.rec_fut = Some(Pin::from(Box::new(
deserialize_record_with_pos(rdr, headers, rec),
)));
Poll::Ready(Some((result, pos)))
} else {
self.rec_fut = None;
Poll::Ready(None)
}
}
Poll::Pending => Poll::Pending,
}
} else {
Poll::Ready(None)
}
}
}
}}