use tokio::io;
use serde::de::DeserializeOwned;
use crate::AsyncReaderBuilder;
use crate::byte_record::{ByteRecord, Position};
use crate::error::Result;
use crate::string_record::StringRecord;
use super::{
AsyncReaderImpl,
DeserializeRecordsStream, DeserializeRecordsIntoStream,
DeserializeRecordsStreamPos, DeserializeRecordsIntoStreamPos,
};
impl AsyncReaderBuilder {
    /// Creates an [`AsyncDeserializer`] over `rdr`, handing this builder to the
    /// deserializer's constructor so the new reader is set up from it.
    pub fn create_deserializer<R>(&self, rdr: R) -> AsyncDeserializer<R>
    where
        R: io::AsyncRead + Unpin + Send,
    {
        let builder = self;
        AsyncDeserializer::new(builder, rdr)
    }
}
/// Asynchronous reader for CSV-style data that can hand out serde-deserialized records.
///
/// This is a newtype around `AsyncReaderImpl<R>`; all of its methods delegate to
/// that wrapped value.
#[derive(Debug)]
pub struct AsyncDeserializer<R>(AsyncReaderImpl<R>);
/// Constructors, record-stream adapters and pass-through accessors.
///
/// Every method here forwards to the wrapped `AsyncReaderImpl`; this type keeps
/// no additional state of its own.
impl<'r, R> AsyncDeserializer<R>
where
    R: io::AsyncRead + Unpin + Send + 'r,
{
    /// Wraps a fresh `AsyncReaderImpl` built from `builder` and `rdr`.
    fn new(builder: &AsyncReaderBuilder, rdr: R) -> Self {
        let inner = AsyncReaderImpl::new(builder, rdr);
        Self(inner)
    }

    /// Creates a deserializer over `rdr` using a freshly constructed
    /// `AsyncReaderBuilder`, i.e. with no builder options applied.
    #[inline]
    pub fn from_reader(rdr: R) -> Self {
        let builder = AsyncReaderBuilder::new();
        builder.create_deserializer(rdr)
    }

    /// Mutably borrows this deserializer and returns a
    /// `DeserializeRecordsStream` over it, typed for records of type `D`.
    #[inline]
    pub fn deserialize<D>(&'r mut self) -> DeserializeRecordsStream<'r, R, D>
    where
        D: DeserializeOwned + 'r,
    {
        let inner = &mut self.0;
        DeserializeRecordsStream::new(inner)
    }

    /// Mutably borrows this deserializer and returns a
    /// `DeserializeRecordsStreamPos` over it, typed for records of type `D`.
    ///
    /// NOTE(review): presumably pairs each item with its `Position`, as the
    /// owning variant does in this file's tests; confirm against the stream type.
    #[inline]
    pub fn deserialize_with_pos<D>(&'r mut self) -> DeserializeRecordsStreamPos<'r, R, D>
    where
        D: DeserializeOwned + 'r,
    {
        let inner = &mut self.0;
        DeserializeRecordsStreamPos::new(inner)
    }

    /// Consumes this deserializer and returns a `DeserializeRecordsIntoStream`
    /// that owns the wrapped reader, typed for records of type `D`.
    #[inline]
    pub fn into_deserialize<D>(self) -> DeserializeRecordsIntoStream<'r, R, D>
    where
        D: DeserializeOwned + 'r,
    {
        let Self(inner) = self;
        DeserializeRecordsIntoStream::new(inner)
    }

    /// Consumes this deserializer and returns a
    /// `DeserializeRecordsIntoStreamPos` that owns the wrapped reader.
    ///
    /// The tests in this file destructure each stream item as a pair whose
    /// second element exposes `byte()`, `line()` and `record()`.
    #[inline]
    pub fn into_deserialize_with_pos<D>(self) -> DeserializeRecordsIntoStreamPos<'r, R, D>
    where
        D: DeserializeOwned + 'r,
    {
        let Self(inner) = self;
        DeserializeRecordsIntoStreamPos::new(inner)
    }

    /// Asynchronously obtains the header row as a `StringRecord` from the
    /// wrapped reader.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the wrapped reader reports; the tests in this
    /// file show an `ErrorKind::Utf8` error for headers that are not valid UTF-8.
    #[inline]
    pub async fn headers(&mut self) -> Result<&StringRecord> {
        let inner = &mut self.0;
        inner.headers().await
    }

    /// Asynchronously obtains the header row as a raw `ByteRecord` from the
    /// wrapped reader.
    #[inline]
    pub async fn byte_headers(&mut self) -> Result<&ByteRecord> {
        let inner = &mut self.0;
        inner.byte_headers().await
    }

    /// Hands `headers` to the wrapped reader's `set_headers`.
    #[inline]
    pub fn set_headers(&mut self, headers: StringRecord) {
        let inner = &mut self.0;
        inner.set_headers(headers);
    }

    /// Hands `headers` to the wrapped reader's `set_byte_headers`.
    #[inline]
    pub fn set_byte_headers(&mut self, headers: ByteRecord) {
        let inner = &mut self.0;
        inner.set_byte_headers(headers);
    }

    /// Reads the next record into `record` as a `StringRecord`.
    ///
    /// As exercised by the tests in this file, `Ok(true)` means a record was
    /// read and `Ok(false)` means the input is exhausted.
    #[inline]
    pub async fn read_record(&mut self, record: &mut StringRecord) -> Result<bool> {
        let inner = &mut self.0;
        inner.read_record(record).await
    }

    /// Reads the next record into `record` as a raw `ByteRecord`.
    ///
    /// As exercised by the tests in this file, `Ok(true)` means a record was
    /// read and `Ok(false)` means the input is exhausted.
    #[inline]
    pub async fn read_byte_record(&mut self, record: &mut ByteRecord) -> Result<bool> {
        let inner = &mut self.0;
        inner.read_byte_record(record).await
    }

    /// Returns the `Position` reported by the wrapped reader.
    #[inline]
    pub fn position(&self) -> &Position {
        let Self(inner) = self;
        inner.position()
    }

    /// Returns the wrapped reader's `is_done` flag.
    #[inline]
    pub fn is_done(&self) -> bool {
        let Self(inner) = self;
        inner.is_done()
    }

    /// Returns the wrapped reader's `has_headers` flag.
    #[inline]
    pub fn has_headers(&self) -> bool {
        let Self(inner) = self;
        inner.has_headers()
    }

    /// Returns a shared reference to the underlying reader `R`.
    #[inline]
    pub fn get_ref(&self) -> &R {
        let Self(inner) = self;
        inner.get_ref()
    }

    /// Returns a mutable reference to the underlying reader `R`.
    #[inline]
    pub fn get_mut(&mut self) -> &mut R {
        let inner = &mut self.0;
        inner.get_mut()
    }

    /// Consumes this deserializer and returns the underlying reader `R`.
    #[inline]
    pub fn into_inner(self) -> R {
        let Self(inner) = self;
        inner.into_inner()
    }
}
#[cfg(test)]
mod tests {
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io;
use tokio_stream::StreamExt;
use serde::Deserialize;
use tokio::runtime::Runtime;
use crate::byte_record::ByteRecord;
use crate::error::ErrorKind;
use crate::string_record::StringRecord;
use crate::Trim;
use super::{Position, AsyncReaderBuilder, AsyncDeserializer};
/// Test shorthand: views a string slice as its underlying bytes.
fn b(s: &str) -> &[u8] {
    str::as_bytes(s)
}
/// Test shorthand: views bytes as a string slice, panicking if they are not
/// valid UTF-8.
fn s(b: &[u8]) -> &str {
    let decoded = ::std::str::from_utf8(b);
    decoded.unwrap()
}
/// Test shorthand: builds a `Position` with the given byte offset, line number
/// and record index.
fn newpos(byte: u64, line: u64, record: u64) -> Position {
    let mut pos = Position::new();
    pos.set_byte(byte);
    pos.set_line(line);
    pos.set_record(record);
    pos
}
/// Test shorthand: drains `stream` and reports how many items it produced.
async fn count(stream: impl StreamExt) -> usize {
    let tally = stream.fold(0, |seen, _item| seen + 1);
    tally.await
}
// Reads two three-field rows as byte records (one field is quoted and contains
// a comma) and then checks that a further read reports end of input.
#[tokio::test]
async fn read_byte_record() {
    let input = b("foo,\"b,ar\",baz\nabc,mno,xyz");
    let mut reader = AsyncReaderBuilder::new()
        .has_headers(false)
        .create_deserializer(input);
    let mut record = ByteRecord::new();

    for expected in &[["foo", "b,ar", "baz"], ["abc", "mno", "xyz"]] {
        assert!(reader.read_byte_record(&mut record).await.unwrap());
        assert_eq!(3, record.len());
        for (idx, field) in expected.iter().enumerate() {
            assert_eq!(*field, s(&record[idx]));
        }
    }

    // Input exhausted: the call succeeds but reports that nothing was read.
    assert!(!reader.read_byte_record(&mut record).await.unwrap());
}
// With `Trim::All`, both the record fields and the header fields come back
// without their surrounding whitespace.
#[tokio::test]
async fn read_trimmed_records_and_headers() {
    let input = b("foo, bar,\tbaz\n 1, 2, 3\n1\t,\t,3\t\t");
    let mut reader = AsyncReaderBuilder::new()
        .has_headers(true)
        .trim(Trim::All)
        .create_deserializer(input);

    // First data row, read as raw bytes.
    let mut bytes = ByteRecord::new();
    assert!(reader.read_byte_record(&mut bytes).await.unwrap());
    for (idx, want) in ["1", "2", "3"].iter().enumerate() {
        assert_eq!(*want, s(&bytes[idx]));
    }

    // Second data row, read as strings.
    let mut text = StringRecord::new();
    assert!(reader.read_record(&mut text).await.unwrap());
    for (idx, want) in ["1", "", "3"].iter().enumerate() {
        assert_eq!(*want, &text[idx]);
    }

    let headers = reader.headers().await.unwrap();
    assert_eq!(3, headers.len());
    for (idx, want) in ["foo", "bar", "baz"].iter().enumerate() {
        assert_eq!(*want, &headers[idx]);
    }
}
// With `Trim::Headers`, only the header fields are trimmed; record fields keep
// their leading whitespace.
#[tokio::test]
async fn read_trimmed_header() {
    let input = b("foo, bar,\tbaz\n 1, 2, 3\n1\t,\t,3\t\t");
    let mut reader = AsyncReaderBuilder::new()
        .has_headers(true)
        .trim(Trim::Headers)
        .create_deserializer(input);

    let mut record = ByteRecord::new();
    assert!(reader.read_byte_record(&mut record).await.unwrap());
    for (idx, want) in [" 1", " 2", " 3"].iter().enumerate() {
        assert_eq!(*want, s(&record[idx]));
    }

    let headers = reader.headers().await.unwrap();
    assert_eq!(3, headers.len());
    for (idx, want) in ["foo", "bar", "baz"].iter().enumerate() {
        assert_eq!(*want, &headers[idx]);
    }
}
// Headers containing an invalid UTF-8 byte are still available (trimmed) as
// bytes, while asking for them as strings yields a UTF-8 error.
#[tokio::test]
async fn read_trimed_header_invalid_utf8() {
    let input = &b"foo, b\xFFar,\tbaz\na,b,c\nd,e,f"[..];
    let mut reader = AsyncReaderBuilder::new()
        .has_headers(true)
        .trim(Trim::Headers)
        .create_deserializer(input);

    // The outcome of this first read is deliberately ignored.
    let mut scratch = StringRecord::new();
    let _ = reader.read_record(&mut scratch).await;

    let raw_headers = reader.byte_headers().await.unwrap();
    assert_eq!(3, raw_headers.len());
    let expected: [&[u8]; 3] = [b"foo", b"b\xFFar", b"baz"];
    for (idx, want) in expected.iter().enumerate() {
        assert_eq!(*want, &raw_headers[idx]);
    }

    let failure = reader.headers().await.unwrap_err();
    match failure.kind() {
        ErrorKind::Utf8 { pos: Some(pos), err } => {
            assert_eq!(pos, &newpos(0, 1, 0));
            assert_eq!(err.field(), 1);
            assert_eq!(err.valid_up_to(), 3);
        }
        err => panic!("match failed, got {:?}", err),
    }
}
// With `Trim::Fields`, record fields are trimmed but header fields keep their
// leading whitespace.
#[tokio::test]
async fn read_trimmed_records() {
    let input = b("foo, bar,\tbaz\n 1, 2, 3\n1\t,\t,3\t\t");
    let mut reader = AsyncReaderBuilder::new()
        .has_headers(true)
        .trim(Trim::Fields)
        .create_deserializer(input);

    let mut record = ByteRecord::new();
    assert!(reader.read_byte_record(&mut record).await.unwrap());
    for (idx, want) in ["1", "2", "3"].iter().enumerate() {
        assert_eq!(*want, s(&record[idx]));
    }

    let headers = reader.headers().await.unwrap();
    assert_eq!(3, headers.len());
    for (idx, want) in ["foo", " bar", "\tbaz"].iter().enumerate() {
        assert_eq!(*want, &headers[idx]);
    }
}
// Without `flexible`, a second row with a different field count is rejected
// with `UnequalLengths`, carrying the position of the offending record.
#[tokio::test]
async fn read_record_unequal_fails() {
    let input = b("foo\nbar,baz");
    let mut reader = AsyncReaderBuilder::new()
        .has_headers(false)
        .create_deserializer(input);
    let mut record = ByteRecord::new();

    assert!(reader.read_byte_record(&mut record).await.unwrap());
    assert_eq!(1, record.len());
    assert_eq!("foo", s(&record[0]));

    let outcome = reader.read_byte_record(&mut record).await;
    let err = match outcome {
        Err(err) => err,
        wrong => panic!("match failed, got {:?}", wrong),
    };
    match err.kind() {
        ErrorKind::UnequalLengths {
            expected_len: 1,
            pos,
            len: 2,
        } => assert_eq!(pos, &Some(newpos(4, 2, 1))),
        wrong => panic!("match failed, got {:?}", wrong),
    }
}
// With `flexible(true)`, rows of differing field counts are both accepted.
#[tokio::test]
async fn read_record_unequal_ok() {
    let input = b("foo\nbar,baz");
    let mut reader = AsyncReaderBuilder::new()
        .has_headers(false)
        .flexible(true)
        .create_deserializer(input);
    let mut record = ByteRecord::new();

    let short_row: &[&str] = &["foo"];
    let long_row: &[&str] = &["bar", "baz"];
    for expected in &[short_row, long_row] {
        assert!(reader.read_byte_record(&mut record).await.unwrap());
        assert_eq!(expected.len(), record.len());
        for (idx, field) in expected.iter().enumerate() {
            assert_eq!(*field, s(&record[idx]));
        }
    }

    assert!(!reader.read_byte_record(&mut record).await.unwrap());
}
// After an `UnequalLengths` error on the second row, reading continues with
// the third row and then reports end of input.
#[tokio::test]
async fn read_record_unequal_continue() {
    let input = b("foo\nbar,baz\nquux");
    let mut reader = AsyncReaderBuilder::new()
        .has_headers(false)
        .create_deserializer(input);
    let mut record = ByteRecord::new();

    assert!(reader.read_byte_record(&mut record).await.unwrap());
    assert_eq!(1, record.len());
    assert_eq!("foo", s(&record[0]));

    let second = reader.read_byte_record(&mut record).await;
    match second {
        Err(err) => {
            if let ErrorKind::UnequalLengths {
                expected_len: 1,
                pos,
                len: 2,
            } = err.kind()
            {
                assert_eq!(pos, &Some(newpos(4, 2, 1)));
            } else {
                panic!("match failed, got {:?}", err.kind());
            }
        }
        wrong => panic!("match failed, got {:?}", wrong),
    }

    assert!(reader.read_byte_record(&mut record).await.unwrap());
    assert_eq!(1, record.len());
    assert_eq!("quux", s(&record[0]));

    assert!(!reader.read_byte_record(&mut record).await.unwrap());
}
// With headers enabled, the first row is not yielded as a record, and the
// headers remain available (as bytes and as strings) after all records are read.
#[tokio::test]
async fn read_record_headers() {
    let input = b("foo,bar,baz\na,b,c\nd,e,f");
    let mut reader = AsyncReaderBuilder::new()
        .has_headers(true)
        .create_deserializer(input);
    let mut record = StringRecord::new();

    for first_field in &["a", "d"] {
        assert!(reader.read_record(&mut record).await.unwrap());
        assert_eq!(3, record.len());
        assert_eq!(*first_field, &record[0]);
    }
    assert!(!reader.read_record(&mut record).await.unwrap());

    let raw_headers = reader.byte_headers().await.unwrap();
    assert_eq!(3, raw_headers.len());
    let expected_raw: [&[u8]; 3] = [b"foo", b"bar", b"baz"];
    for (idx, want) in expected_raw.iter().enumerate() {
        assert_eq!(*want, &raw_headers[idx]);
    }

    let text_headers = reader.headers().await.unwrap();
    assert_eq!(3, text_headers.len());
    for (idx, want) in ["foo", "bar", "baz"].iter().enumerate() {
        assert_eq!(*want, &text_headers[idx]);
    }
}
// Invalid UTF-8 in the header row does not prevent reading records or byte
// headers; only the string view of the headers fails, with a UTF-8 error.
#[tokio::test]
async fn read_record_headers_invalid_utf8() {
    let input = &b"foo,b\xFFar,baz\na,b,c\nd,e,f"[..];
    let mut reader = AsyncReaderBuilder::new()
        .has_headers(true)
        .create_deserializer(input);
    let mut record = StringRecord::new();

    for first_field in &["a", "d"] {
        assert!(reader.read_record(&mut record).await.unwrap());
        assert_eq!(3, record.len());
        assert_eq!(*first_field, &record[0]);
    }
    assert!(!reader.read_record(&mut record).await.unwrap());

    let raw_headers = reader.byte_headers().await.unwrap();
    assert_eq!(3, raw_headers.len());
    let expected_raw: [&[u8]; 3] = [b"foo", b"b\xFFar", b"baz"];
    for (idx, want) in expected_raw.iter().enumerate() {
        assert_eq!(*want, &raw_headers[idx]);
    }

    let failure = reader.headers().await.unwrap_err();
    match failure.kind() {
        ErrorKind::Utf8 { pos: Some(pos), err } => {
            assert_eq!(pos, &newpos(0, 1, 0));
            assert_eq!(err.field(), 1);
            assert_eq!(err.valid_up_to(), 1);
        }
        err => panic!("match failed, got {:?}", err),
    }
}
// With `has_headers(false)`, asking for headers first returns the first row,
// and that same row is still yielded afterwards as the first record.
#[tokio::test]
async fn read_record_no_headers_before() {
    let input = b("foo,bar,baz\na,b,c\nd,e,f");
    let mut reader = AsyncReaderBuilder::new()
        .has_headers(false)
        .create_deserializer(input);
    let mut record = StringRecord::new();

    let headers = reader.headers().await.unwrap();
    assert_eq!(3, headers.len());
    for (idx, want) in ["foo", "bar", "baz"].iter().enumerate() {
        assert_eq!(*want, &headers[idx]);
    }

    for first_field in &["foo", "a", "d"] {
        assert!(reader.read_record(&mut record).await.unwrap());
        assert_eq!(3, record.len());
        assert_eq!(*first_field, &record[0]);
    }
    assert!(!reader.read_record(&mut record).await.unwrap());
}
// With `has_headers(false)`, all three rows are yielded as records, and asking
// for headers afterwards still returns the first row.
#[tokio::test]
async fn read_record_no_headers_after() {
    let input = b("foo,bar,baz\na,b,c\nd,e,f");
    let mut reader = AsyncReaderBuilder::new()
        .has_headers(false)
        .create_deserializer(input);
    let mut record = StringRecord::new();

    for first_field in &["foo", "a", "d"] {
        assert!(reader.read_record(&mut record).await.unwrap());
        assert_eq!(3, record.len());
        assert_eq!(*first_field, &record[0]);
    }
    assert!(!reader.read_record(&mut record).await.unwrap());

    let headers = reader.headers().await.unwrap();
    assert_eq!(3, headers.len());
    for (idx, want) in ["foo", "bar", "baz"].iter().enumerate() {
        assert_eq!(*want, &headers[idx]);
    }
}
// Deserialization target for the position and empty-input tests below:
// a tuple struct wrapping an array of three owned strings.
#[derive(Debug, Deserialize, Eq, PartialEq)]
struct Row1([String; 3]);
// Without headers, the owning position stream reports the byte/line/record
// position of each of the two rows and then stays exhausted.
#[tokio::test]
async fn positions_no_headers() {
    let mut rows = AsyncReaderBuilder::new()
        .has_headers(false)
        .create_deserializer("a,b,c\nx,y,z".as_bytes())
        .into_deserialize_with_pos::<Row1>();

    let (_, first) = rows.next().await.unwrap();
    assert_eq!((first.byte(), first.line(), first.record()), (0, 1, 0));

    let (_, second) = rows.next().await.unwrap();
    assert_eq!((second.byte(), second.line(), second.record()), (6, 2, 1));

    // Polling past the end keeps returning `None`.
    for _ in 0..2 {
        assert!(rows.next().await.is_none());
    }
}
// With headers enabled, the first item yielded carries the position of the
// second input row.
#[tokio::test]
async fn positions_headers() {
    let mut rows = AsyncReaderBuilder::new()
        .has_headers(true)
        .create_deserializer("a,b,c\nx,y,z".as_bytes())
        .into_deserialize_with_pos::<Row1>();

    let (_, first) = rows.next().await.unwrap();
    assert_eq!((first.byte(), first.line(), first.record()), (6, 2, 1));
}
// Requesting byte headers from empty input succeeds with an empty record.
#[tokio::test]
async fn headers_on_empty_data() {
    let builder = AsyncReaderBuilder::new();
    let mut reader = builder.create_deserializer("".as_bytes());
    let header_count = reader.byte_headers().await.unwrap().len();
    assert_eq!(header_count, 0);
}
// Deserializing empty input without headers yields no items at all.
#[tokio::test]
async fn no_headers_on_empty_data() {
    let mut reader = AsyncReaderBuilder::new()
        .has_headers(false)
        .create_deserializer("".as_bytes());
    let total = count(reader.deserialize::<Row1>()).await;
    assert_eq!(total, 0);
}
// Asking for headers on empty input first (they are empty) does not make the
// record stream produce any items afterwards.
#[tokio::test]
async fn no_headers_on_empty_data_after_headers() {
    let mut reader = AsyncReaderBuilder::new()
        .has_headers(false)
        .create_deserializer("".as_bytes());

    let header_count = reader.headers().await.unwrap().len();
    assert_eq!(header_count, 0);

    let total = count(reader.deserialize::<Row1>()).await;
    assert_eq!(total, 0);
}
// A reader whose every poll fails must surface exactly one I/O error from the
// record stream, after which the stream ends instead of repeating the error.
#[test]
fn no_infinite_loop_on_io_errors() {
    // Reader that reports an I/O error on every poll.
    struct FailingRead;

    impl io::AsyncRead for FailingRead {
        fn poll_read(
            self: Pin<&mut Self>,
            _cx: &mut Context,
            _buf: &mut tokio::io::ReadBuf
        ) -> Poll<Result<(), io::Error>> {
            let broken = io::Error::new(io::ErrorKind::Other, "Broken reader");
            Poll::Ready(Err(broken))
        }
    }

    impl Unpin for FailingRead {}

    #[derive(Deserialize)]
    struct Fake;

    let runtime = Runtime::new().unwrap();
    runtime.block_on(async {
        let deserializer = AsyncDeserializer::from_reader(FailingRead);
        let mut record_results = deserializer.into_deserialize::<Fake>();

        let first_result = record_results.next().await;
        assert!(
            matches!(&first_result, Some(Err(e)) if matches!(e.kind(), crate::ErrorKind::Io(_)))
        );

        let second_result = record_results.next().await;
        assert!(second_result.is_none());
    });
}
}