use tokio::io;
use crate::AsyncReaderBuilder;
use crate::byte_record::{ByteRecord, Position};
use crate::error::Result;
use crate::string_record::StringRecord;
use super::{
AsyncReaderImpl,
StringRecordsStream, StringRecordsIntoStream,
ByteRecordsStream, ByteRecordsIntoStream,
};
impl AsyncReaderBuilder {
    /// Builds an [`AsyncReader`] over `reader` using this builder's configuration.
    ///
    /// The builder is only borrowed, so one configured builder can be used to
    /// create any number of readers.
    pub fn create_reader<R>(&self, reader: R) -> AsyncReader<R>
    where
        R: io::AsyncRead + Unpin + Send,
    {
        AsyncReader::new(self, reader)
    }
}
/// Asynchronous CSV reader wrapping a tokio [`io::AsyncRead`] source.
///
/// This is a thin newtype around the crate-internal `AsyncReaderImpl`; its
/// public methods forward to that inner value. Construct one with
/// [`AsyncReader::from_reader`] for the default configuration, or with
/// [`AsyncReaderBuilder::create_reader`] for a custom one.
#[derive(Debug)]
pub struct AsyncReader<R>(AsyncReaderImpl<R>);
impl<'r, R> AsyncReader<R>
where
    R: io::AsyncRead + Unpin + Send + 'r,
{
    /// Wraps `reader` in a new parser configured by `builder`.
    fn new(builder: &AsyncReaderBuilder, reader: R) -> Self {
        let inner = AsyncReaderImpl::new(builder, reader);
        Self(inner)
    }

    /// Creates a reader over `rdr` with the default [`AsyncReaderBuilder`]
    /// configuration, under which the first row is treated as a header row.
    #[inline]
    pub fn from_reader(rdr: R) -> Self {
        let builder = AsyncReaderBuilder::new();
        builder.create_reader(rdr)
    }

    /// Returns a stream over the remaining records as string records.
    ///
    /// The stream mutably borrows this reader, which can be used again once the
    /// stream is dropped.
    #[inline]
    pub fn records(&mut self) -> StringRecordsStream<R> {
        let Self(inner) = self;
        StringRecordsStream::new(inner)
    }

    /// Consumes this reader and returns an owning stream of string records.
    ///
    /// Each item is a `Result`. If the underlying source fails, the stream
    /// yields that I/O error and then ends rather than retrying forever.
    #[inline]
    pub fn into_records(self) -> StringRecordsIntoStream<'r, R> {
        let Self(inner) = self;
        StringRecordsIntoStream::new(inner)
    }

    /// Returns a stream over the remaining records as raw byte records.
    ///
    /// The stream mutably borrows this reader.
    #[inline]
    pub fn byte_records(&mut self) -> ByteRecordsStream<R> {
        let Self(inner) = self;
        ByteRecordsStream::new(inner)
    }

    /// Consumes this reader and returns an owning stream of raw byte records.
    #[inline]
    pub fn into_byte_records(self) -> ByteRecordsIntoStream<'r, R> {
        let Self(inner) = self;
        ByteRecordsIntoStream::new(inner)
    }

    /// Returns the header row as a [`StringRecord`]; by default this is the first
    /// row of the data.
    ///
    /// If the header row has not been read yet, it is read on demand. It remains
    /// available after all records have been consumed or after seeking past it.
    /// This also works for readers built with `has_headers(false)`, in which case
    /// the first row is *additionally* yielded as an ordinary record. Empty input
    /// yields a record with no fields.
    ///
    /// # Errors
    ///
    /// Returns an `ErrorKind::Utf8` error when the header row is not valid
    /// UTF-8. Use [`byte_headers`](Self::byte_headers) to get the raw bytes in
    /// that case.
    #[inline]
    pub async fn headers(&mut self) -> Result<&StringRecord> {
        let Self(inner) = self;
        inner.headers().await
    }

    /// Returns the header row as a raw [`ByteRecord`].
    ///
    /// Unlike [`headers`](Self::headers), this succeeds even when the header row
    /// is not valid UTF-8. Empty input yields a record with no fields.
    #[inline]
    pub async fn byte_headers(&mut self) -> Result<&ByteRecord> {
        let Self(inner) = self;
        inner.byte_headers().await
    }

    /// Sets the headers reported by this reader to the given string record.
    #[inline]
    pub fn set_headers(&mut self, headers: StringRecord) {
        let Self(inner) = self;
        inner.set_headers(headers);
    }

    /// Sets the headers reported by this reader to the given byte record.
    #[inline]
    pub fn set_byte_headers(&mut self, headers: ByteRecord) {
        let Self(inner) = self;
        inner.set_byte_headers(headers);
    }

    /// Reads the next record into the caller-supplied `record`.
    ///
    /// Returns `Ok(true)` when a record was read and `Ok(false)` once the input
    /// is exhausted. When the reader treats the first row as headers, that row is
    /// skipped rather than returned.
    ///
    /// # Errors
    ///
    /// For example, `ErrorKind::UnequalLengths` when a record's field count
    /// differs from earlier records and the reader is not `flexible`. Reading
    /// may continue after such an error.
    #[inline]
    pub async fn read_record(&mut self, record: &mut StringRecord) -> Result<bool> {
        let Self(inner) = self;
        inner.read_record(record).await
    }

    /// Like [`read_record`](Self::read_record), but fills a [`ByteRecord`]
    /// holding the raw field bytes.
    #[inline]
    pub async fn read_byte_record(&mut self, record: &mut ByteRecord) -> Result<bool> {
        let Self(inner) = self;
        inner.read_byte_record(record).await
    }

    /// Returns the parser's current position: the byte offset, line number and
    /// record index at which the next record starts.
    #[inline]
    pub fn position(&self) -> &Position {
        let Self(inner) = self;
        inner.position()
    }

    /// Reports whether the inner reader considers itself finished; presumably
    /// `true` once the end of input has been reached.
    #[inline]
    pub fn is_done(&self) -> bool {
        let Self(inner) = self;
        inner.is_done()
    }

    /// Reports whether the first row is treated as a header row, as configured
    /// via `AsyncReaderBuilder::has_headers`.
    #[inline]
    pub fn has_headers(&self) -> bool {
        let Self(inner) = self;
        inner.has_headers()
    }

    /// Returns a shared reference to the underlying source.
    #[inline]
    pub fn get_ref(&self) -> &R {
        let Self(inner) = self;
        inner.get_ref()
    }

    /// Returns a mutable reference to the underlying source.
    ///
    /// NOTE(review): reading from or seeking the source directly presumably
    /// desynchronizes the parser's buffered state and position. Confirm before
    /// relying on it.
    #[inline]
    pub fn get_mut(&mut self) -> &mut R {
        let Self(inner) = self;
        inner.get_mut()
    }

    /// Consumes this reader and returns the underlying source.
    ///
    /// NOTE(review): any input already buffered by the parser is presumably
    /// discarded. Confirm against `AsyncReaderImpl::into_inner`.
    #[inline]
    pub fn into_inner(self) -> R {
        let Self(inner) = self;
        inner.into_inner()
    }
}
impl<R> AsyncReader<R>
where
    R: io::AsyncRead + io::AsyncSeek + Unpin,
{
    /// Seeks to `pos`, typically a position previously reported by this reader.
    ///
    /// Afterwards [`position`](Self::position) reports `pos`, and the next read
    /// starts there. The header row stays available through `headers` even when
    /// seeking past it.
    ///
    /// NOTE(review): `pos` is presumably expected to mark the start of a record.
    /// Confirm how a mid-record position is handled.
    #[inline]
    pub async fn seek(&mut self, pos: Position) -> Result<()> {
        let Self(inner) = self;
        inner.seek(pos).await
    }

    /// Seeks the underlying source with `seek_from` and presumably records `pos`
    /// as the parser's new position. The caller appears responsible for keeping
    /// the two consistent; confirm against `AsyncReaderImpl::seek_raw`.
    #[inline]
    pub async fn seek_raw(&mut self, seek_from: io::SeekFrom, pos: Position) -> Result<()> {
        let Self(inner) = self;
        inner.seek_raw(seek_from, pos).await
    }

    /// Returns to the beginning of the data so records can be read again.
    ///
    /// When the reader has headers, the header row is skipped again on the next
    /// read.
    #[inline]
    pub async fn rewind(&mut self) -> Result<()> {
        let Self(inner) = self;
        inner.rewind().await
    }
}
#[cfg(test)]
mod tests {
    //! Unit tests for `AsyncReader`, run on the tokio runtime.
    //!
    //! Most tests read from an in-memory `&[u8]`. The seek/rewind tests wrap the
    //! data in `std::io::Cursor` because those methods also require `AsyncSeek`.
    use std::pin::Pin;
    use std::task::{Context, Poll};
    use tokio::io;
    use tokio_stream::StreamExt;
    use tokio::runtime::Runtime;
    use crate::byte_record::ByteRecord;
    use crate::error::ErrorKind;
    use crate::string_record::StringRecord;
    use crate::Trim;
    use super::{Position, AsyncReaderBuilder, AsyncReader};

    /// Views a string literal as raw bytes, the input type the readers consume.
    fn b(s: &str) -> &[u8] {
        s.as_bytes()
    }

    /// Decodes a field as UTF-8 for comparison; panics if it is not valid UTF-8.
    fn s(b: &[u8]) -> &str {
        ::std::str::from_utf8(b).unwrap()
    }

    /// Builds a `Position` from a byte offset, line number and record index.
    fn newpos(byte: u64, line: u64, record: u64) -> Position {
        let mut p = Position::new();
        p.set_byte(byte).set_line(line).set_record(record);
        p
    }

    /// Drains `stream` and returns the number of items it yielded.
    async fn count(stream: impl StreamExt) -> usize {
        stream.fold(0, |acc, _| acc + 1 ).await
    }

    // A quoted field may contain the delimiter. `Ok(false)` signals end of input.
    #[tokio::test]
    async fn read_byte_record() {
        let data = b("foo,\"b,ar\",baz\nabc,mno,xyz");
        let mut rdr =
            AsyncReaderBuilder::new().has_headers(false).create_reader(data);
        let mut rec = ByteRecord::new();
        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("foo", s(&rec[0]));
        assert_eq!("b,ar", s(&rec[1]));
        assert_eq!("baz", s(&rec[2]));
        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("abc", s(&rec[0]));
        assert_eq!("mno", s(&rec[1]));
        assert_eq!("xyz", s(&rec[2]));
        assert!(!rdr.read_byte_record(&mut rec).await.unwrap());
    }

    // `Trim::All` strips spaces and tabs from both header and record fields,
    // for byte and string records alike.
    #[tokio::test]
    async fn read_trimmed_records_and_headers() {
        let data = b("foo, bar,\tbaz\n 1, 2, 3\n1\t,\t,3\t\t");
        let mut rdr = AsyncReaderBuilder::new()
            .has_headers(true)
            .trim(Trim::All)
            .create_reader(data);
        let mut rec = ByteRecord::new();
        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
        assert_eq!("1", s(&rec[0]));
        assert_eq!("2", s(&rec[1]));
        assert_eq!("3", s(&rec[2]));
        let mut rec = StringRecord::new();
        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!("1", &rec[0]);
        assert_eq!("", &rec[1]);
        assert_eq!("3", &rec[2]);
        {
            let headers = rdr.headers().await.unwrap();
            assert_eq!(3, headers.len());
            assert_eq!("foo", &headers[0]);
            assert_eq!("bar", &headers[1]);
            assert_eq!("baz", &headers[2]);
        }
    }

    // `Trim::Headers` trims only the header row; record fields keep their
    // leading spaces.
    #[tokio::test]
    async fn read_trimmed_header() {
        let data = b("foo, bar,\tbaz\n 1, 2, 3\n1\t,\t,3\t\t");
        let mut rdr = AsyncReaderBuilder::new()
            .has_headers(true)
            .trim(Trim::Headers)
            .create_reader(data);
        let mut rec = ByteRecord::new();
        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
        assert_eq!(" 1", s(&rec[0]));
        assert_eq!(" 2", s(&rec[1]));
        assert_eq!(" 3", s(&rec[2]));
        {
            let headers = rdr.headers().await.unwrap();
            assert_eq!(3, headers.len());
            assert_eq!("foo", &headers[0]);
            assert_eq!("bar", &headers[1]);
            assert_eq!("baz", &headers[2]);
        }
    }

    // With `Trim::Headers` and invalid UTF-8 in the header row, the byte headers
    // are still trimmed and returned, while `headers()` reports a Utf8 error in
    // field 1 at the header's position.
    // NOTE(review): the test name misspells "trimmed".
    // NOTE(review): with a single space before `b`, the 0xFF byte sits at offset
    // 2 of the untrimmed field (offset 1 once trimmed), yet `valid_up_to()` is
    // expected to be 3. Confirm the test data, e.g. whether it should contain
    // two spaces.
    #[tokio::test]
    async fn read_trimed_header_invalid_utf8() {
        let data = &b"foo, b\xFFar,\tbaz\na,b,c\nd,e,f"[..];
        let mut rdr = AsyncReaderBuilder::new()
            .has_headers(true)
            .trim(Trim::Headers)
            .create_reader(data);
        let mut rec = StringRecord::new();
        // Only used to make the reader consume the header row; the result is
        // deliberately ignored.
        let _ = rdr.read_record(&mut rec).await;
        // Scoped so the borrow returned by `byte_headers()` ends before `rdr`
        // is used again below.
        {
            let headers = rdr.byte_headers().await.unwrap();
            assert_eq!(3, headers.len());
            assert_eq!(b"foo", &headers[0]);
            assert_eq!(b"b\xFFar", &headers[1]);
            assert_eq!(b"baz", &headers[2]);
        }
        match *rdr.headers().await.unwrap_err().kind() {
            ErrorKind::Utf8 { pos: Some(ref pos), ref err } => {
                assert_eq!(pos, &newpos(0, 1, 0));
                assert_eq!(err.field(), 1);
                assert_eq!(err.valid_up_to(), 3);
            }
            ref err => panic!("match failed, got {:?}", err),
        }
    }

    // `Trim::Fields` trims record fields but leaves the header row untouched.
    #[tokio::test]
    async fn read_trimmed_records() {
        let data = b("foo, bar,\tbaz\n 1, 2, 3\n1\t,\t,3\t\t");
        let mut rdr = AsyncReaderBuilder::new()
            .has_headers(true)
            .trim(Trim::Fields)
            .create_reader(data);
        let mut rec = ByteRecord::new();
        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
        assert_eq!("1", s(&rec[0]));
        assert_eq!("2", s(&rec[1]));
        assert_eq!("3", s(&rec[2]));
        {
            let headers = rdr.headers().await.unwrap();
            assert_eq!(3, headers.len());
            assert_eq!("foo", &headers[0]);
            assert_eq!(" bar", &headers[1]);
            assert_eq!("\tbaz", &headers[2]);
        }
    }

    // Without `flexible`, a record whose field count differs from the first
    // record fails with `UnequalLengths`, carrying the offending record's
    // position.
    #[tokio::test]
    async fn read_record_unequal_fails() {
        let data = b("foo\nbar,baz");
        let mut rdr =
            AsyncReaderBuilder::new().has_headers(false).create_reader(data);
        let mut rec = ByteRecord::new();
        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
        assert_eq!(1, rec.len());
        assert_eq!("foo", s(&rec[0]));
        match rdr.read_byte_record(&mut rec).await {
            Err(err) => match *err.kind() {
                ErrorKind::UnequalLengths {
                    expected_len: 1,
                    ref pos,
                    len: 2,
                } => {
                    assert_eq!(pos, &Some(newpos(4, 2, 1)));
                }
                ref wrong => panic!("match failed, got {:?}", wrong),
            },
            wrong => panic!("match failed, got {:?}", wrong),
        }
    }

    // With `flexible(true)`, records may have differing field counts.
    #[tokio::test]
    async fn read_record_unequal_ok() {
        let data = b("foo\nbar,baz");
        let mut rdr = AsyncReaderBuilder::new()
            .has_headers(false)
            .flexible(true)
            .create_reader(data);
        let mut rec = ByteRecord::new();
        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
        assert_eq!(1, rec.len());
        assert_eq!("foo", s(&rec[0]));
        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
        assert_eq!(2, rec.len());
        assert_eq!("bar", s(&rec[0]));
        assert_eq!("baz", s(&rec[1]));
        assert!(!rdr.read_byte_record(&mut rec).await.unwrap());
    }

    // After an `UnequalLengths` error, reading continues with the next record.
    #[tokio::test]
    async fn read_record_unequal_continue() {
        let data = b("foo\nbar,baz\nquux");
        let mut rdr =
            AsyncReaderBuilder::new().has_headers(false).create_reader(data);
        let mut rec = ByteRecord::new();
        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
        assert_eq!(1, rec.len());
        assert_eq!("foo", s(&rec[0]));
        match rdr.read_byte_record(&mut rec).await {
            Err(err) => match err.kind() {
                &ErrorKind::UnequalLengths {
                    expected_len: 1,
                    ref pos,
                    len: 2,
                } => {
                    assert_eq!(pos, &Some(newpos(4, 2, 1)));
                }
                wrong => panic!("match failed, got {:?}", wrong),
            },
            wrong => panic!("match failed, got {:?}", wrong),
        }
        assert!(rdr.read_byte_record(&mut rec).await.unwrap());
        assert_eq!(1, rec.len());
        assert_eq!("quux", s(&rec[0]));
        assert!(!rdr.read_byte_record(&mut rec).await.unwrap());
    }

    // With `has_headers(true)`, the header row is not returned as a record, and
    // headers remain available in both byte and string form after end of input.
    #[tokio::test]
    async fn read_record_headers() {
        let data = b("foo,bar,baz\na,b,c\nd,e,f");
        let mut rdr = AsyncReaderBuilder::new().has_headers(true).create_reader(data);
        let mut rec = StringRecord::new();
        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("a", &rec[0]);
        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("d", &rec[0]);
        assert!(!rdr.read_record(&mut rec).await.unwrap());
        {
            let headers = rdr.byte_headers().await.unwrap();
            assert_eq!(3, headers.len());
            assert_eq!(b"foo", &headers[0]);
            assert_eq!(b"bar", &headers[1]);
            assert_eq!(b"baz", &headers[2]);
        }
        {
            let headers = rdr.headers().await.unwrap();
            assert_eq!(3, headers.len());
            assert_eq!("foo", &headers[0]);
            assert_eq!("bar", &headers[1]);
            assert_eq!("baz", &headers[2]);
        }
    }

    // Invalid UTF-8 in the header row does not prevent reading records.
    // `byte_headers()` succeeds while `headers()` returns a Utf8 error.
    #[tokio::test]
    async fn read_record_headers_invalid_utf8() {
        let data = &b"foo,b\xFFar,baz\na,b,c\nd,e,f"[..];
        let mut rdr = AsyncReaderBuilder::new().has_headers(true).create_reader(data);
        let mut rec = StringRecord::new();
        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("a", &rec[0]);
        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("d", &rec[0]);
        assert!(!rdr.read_record(&mut rec).await.unwrap());
        // Scoped so the borrow of `rdr` ends before `headers()` is called.
        {
            let headers = rdr.byte_headers().await.unwrap();
            assert_eq!(3, headers.len());
            assert_eq!(b"foo", &headers[0]);
            assert_eq!(b"b\xFFar", &headers[1]);
            assert_eq!(b"baz", &headers[2]);
        }
        match *rdr.headers().await.unwrap_err().kind() {
            ErrorKind::Utf8 { pos: Some(ref pos), ref err } => {
                assert_eq!(pos, &newpos(0, 1, 0));
                assert_eq!(err.field(), 1);
                assert_eq!(err.valid_up_to(), 1);
            }
            ref err => panic!("match failed, got {:?}", err),
        }
    }

    // With `has_headers(false)`, `headers()` still returns the first row, and
    // that row is also yielded as the first record.
    #[tokio::test]
    async fn read_record_no_headers_before() {
        let data = b("foo,bar,baz\na,b,c\nd,e,f");
        let mut rdr =
            AsyncReaderBuilder::new().has_headers(false).create_reader(data);
        let mut rec = StringRecord::new();
        {
            let headers = rdr.headers().await.unwrap();
            assert_eq!(3, headers.len());
            assert_eq!("foo", &headers[0]);
            assert_eq!("bar", &headers[1]);
            assert_eq!("baz", &headers[2]);
        }
        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("foo", &rec[0]);
        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("a", &rec[0]);
        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("d", &rec[0]);
        assert!(!rdr.read_record(&mut rec).await.unwrap());
    }

    // Same as above, but `headers()` is requested only after all records have
    // been consumed.
    #[tokio::test]
    async fn read_record_no_headers_after() {
        let data = b("foo,bar,baz\na,b,c\nd,e,f");
        let mut rdr =
            AsyncReaderBuilder::new().has_headers(false).create_reader(data);
        let mut rec = StringRecord::new();
        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("foo", &rec[0]);
        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("a", &rec[0]);
        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("d", &rec[0]);
        assert!(!rdr.read_record(&mut rec).await.unwrap());
        let headers = rdr.headers().await.unwrap();
        assert_eq!(3, headers.len());
        assert_eq!("foo", &headers[0]);
        assert_eq!("bar", &headers[1]);
        assert_eq!("baz", &headers[2]);
    }

    // Seeking to the start of line 3 (byte 18 = len("foo,bar,baz\n") +
    // len("a,b,c\n"), record index 2) resumes reading at "d,e,f". Afterwards
    // `position()` points at the start of the following record.
    #[tokio::test]
    async fn seek() {
        let data = b("foo,bar,baz\na,b,c\nd,e,f\ng,h,i");
        let mut rdr = AsyncReaderBuilder::new().create_reader(std::io::Cursor::new(data));
        rdr.seek(newpos(18, 3, 2)).await.unwrap();
        let mut rec = StringRecord::new();
        assert_eq!(18, rdr.position().byte());
        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("d", &rec[0]);
        assert_eq!(24, rdr.position().byte());
        assert_eq!(4, rdr.position().line());
        assert_eq!(3, rdr.position().record());
        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("g", &rec[0]);
        assert!(!rdr.read_record(&mut rec).await.unwrap());
    }

    // Headers are still available when the first action is a seek past them.
    #[tokio::test]
    async fn seek_headers_after() {
        let data = b("foo,bar,baz\na,b,c\nd,e,f\ng,h,i");
        let mut rdr = AsyncReaderBuilder::new().create_reader(std::io::Cursor::new(data));
        rdr.seek(newpos(18, 3, 2)).await.unwrap();
        assert_eq!(rdr.headers().await.unwrap(), vec!["foo", "bar", "baz"]);
    }

    // Headers read before a seek are unchanged after it.
    #[tokio::test]
    async fn seek_headers_before_after() {
        let data = b("foo,bar,baz\na,b,c\nd,e,f\ng,h,i");
        let mut rdr = AsyncReaderBuilder::new().create_reader(std::io::Cursor::new(data));
        let headers = rdr.headers().await.unwrap().clone();
        rdr.seek(newpos(18, 3, 2)).await.unwrap();
        assert_eq!(&headers, rdr.headers().await.unwrap());
    }

    // Seeking to the initial position before reading anything still yields
    // the headers.
    #[tokio::test]
    async fn seek_headers_no_actual_seek() {
        let data = b("foo,bar,baz\na,b,c\nd,e,f\ng,h,i");
        let mut rdr = AsyncReaderBuilder::new().create_reader(std::io::Cursor::new(data));
        rdr.seek(Position::new()).await.unwrap();
        assert_eq!("foo", &rdr.headers().await.unwrap()[0]);
    }

    // After `rewind()`, reading starts over, and the header row is skipped again.
    #[tokio::test]
    async fn rewind() {
        let data = b("foo,bar,baz\na,b,c\nd,e,f\ng,h,i");
        let mut rdr = AsyncReaderBuilder::new().create_reader(std::io::Cursor::new(data));
        let mut rec = StringRecord::new();
        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("a", &rec[0]);
        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("d", &rec[0]);
        rdr.rewind().await.unwrap();
        assert!(rdr.read_record(&mut rec).await.unwrap());
        assert_eq!(3, rec.len());
        assert_eq!("a", &rec[0]);
    }

    // Records yielded by the stream carry their start position. Once
    // exhausted, the stream keeps returning `None`.
    #[tokio::test]
    async fn positions_no_headers() {
        let mut rdr = AsyncReaderBuilder::new()
            .has_headers(false)
            .create_reader("a,b,c\nx,y,z".as_bytes())
            .into_records();
        let pos = rdr.next().await.unwrap().unwrap().position().unwrap().clone();
        assert_eq!(pos.byte(), 0);
        assert_eq!(pos.line(), 1);
        assert_eq!(pos.record(), 0);
        let pos = rdr.next().await.unwrap().unwrap().position().unwrap().clone();
        assert_eq!(pos.byte(), 6);
        assert_eq!(pos.line(), 2);
        assert_eq!(pos.record(), 1);
        assert!(rdr.next().await.is_none());
        assert!(rdr.next().await.is_none());
    }

    // With headers, the first yielded record is record index 1 because the
    // header row counts as record 0.
    #[tokio::test]
    async fn positions_headers() {
        let mut rdr = AsyncReaderBuilder::new()
            .has_headers(true)
            .create_reader("a,b,c\nx,y,z".as_bytes())
            .into_records();
        let pos = rdr.next().await.unwrap().unwrap().position().unwrap().clone();
        assert_eq!(pos.byte(), 6);
        assert_eq!(pos.line(), 2);
        assert_eq!(pos.record(), 1);
    }

    // Empty input produces empty headers rather than an error.
    #[tokio::test]
    async fn headers_on_empty_data() {
        let mut rdr = AsyncReaderBuilder::new().create_reader("".as_bytes());
        let r = rdr.byte_headers().await.unwrap();
        assert_eq!(r.len(), 0);
    }

    // Empty input without headers yields no records.
    #[tokio::test]
    async fn no_headers_on_empty_data() {
        let mut rdr =
            AsyncReaderBuilder::new().has_headers(false).create_reader("".as_bytes());
        assert_eq!(count(rdr.records()).await, 0);
    }

    // Requesting headers first does not make empty input yield a record.
    #[tokio::test]
    async fn no_headers_on_empty_data_after_headers() {
        let mut rdr =
            AsyncReaderBuilder::new().has_headers(false).create_reader("".as_bytes());
        assert_eq!(rdr.headers().await.unwrap().len(), 0);
        assert_eq!(count(rdr.records()).await, 0);
    }

    // A source whose every read fails must produce exactly one Io error and
    // then end the stream, instead of retrying forever. This test drives an
    // explicit runtime rather than using `#[tokio::test]`.
    #[test]
    fn no_infinite_loop_on_io_errors() {
        struct FailingRead;
        impl io::AsyncRead for FailingRead {
            fn poll_read(
                self: Pin<&mut Self>,
                _cx: &mut Context,
                _buf: &mut tokio::io::ReadBuf
            ) -> Poll<Result<(), io::Error>> {
                Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "Broken reader")))
            }
        }
        // NOTE(review): likely redundant, since a field-less struct is already
        // `Unpin` via the auto trait.
        impl Unpin for FailingRead {}
        Runtime::new().unwrap().block_on(async {
            let mut record_results = AsyncReader::from_reader(FailingRead).into_records();
            let first_result = record_results.next().await;
            assert!(
                matches!(&first_result, Some(Err(e)) if matches!(e.kind(), crate::ErrorKind::Io(_)))
            );
            assert!(record_results.next().await.is_none());
        });
    }
}