use std::result;
use futures::io::{self, AsyncWrite};
use crate::AsyncWriterBuilder;
use crate::byte_record::ByteRecord;
use crate::error::Result;
use super::AsyncWriterImpl;
impl AsyncWriterBuilder {
    /// Builds an [`AsyncWriter`] that writes to `wtr`, configured with the
    /// settings currently held by this builder.
    ///
    /// The builder is only borrowed, so it can be reused to create further
    /// writers afterwards.
    pub fn create_writer<W: AsyncWrite + Unpin>(&self, wtr: W) -> AsyncWriter<W> {
        AsyncWriter::new(self, wtr)
    }

    /// Deprecated alias of [`AsyncWriterBuilder::create_writer`]; kept so
    /// existing callers continue to compile.
    #[deprecated(
        since = "1.0.1",
        note = "Please use AsyncWriterBuilder::create_writer function instead"
    )]
    pub fn from_writer<W: AsyncWrite + Unpin>(&self, wtr: W) -> AsyncWriter<W> {
        // Forward to the replacement so both entry points share one code path.
        self.create_writer(wtr)
    }
}
/// Asynchronous record writer over any `AsyncWrite + Unpin` sink.
///
/// This is a thin newtype around `AsyncWriterImpl<W>`; every method on it
/// forwards to that wrapped implementation.
#[derive(Debug)]
pub struct AsyncWriter<W: AsyncWrite + Unpin>(AsyncWriterImpl<W>);
impl<W: AsyncWrite + Unpin> AsyncWriter<W> {
    /// Creates a writer around `wtr` using the settings held by `builder`.
    fn new(builder: &AsyncWriterBuilder, wtr: W) -> AsyncWriter<W> {
        AsyncWriter(AsyncWriterImpl::new(builder, wtr))
    }

    /// Creates a writer around `wtr` using the configuration produced by
    /// `AsyncWriterBuilder::new()`.
    ///
    /// Use [`AsyncWriterBuilder::create_writer`] when non-default settings
    /// are needed.
    pub fn from_writer(wtr: W) -> AsyncWriter<W> {
        AsyncWriterBuilder::new().create_writer(wtr)
    }

    /// Writes a single record whose fields are the items yielded by `record`.
    ///
    /// Any iterable of byte-like values (`AsRef<[u8]>`) is accepted, e.g. an
    /// array of `&str`, a `StringRecord` or a `ByteRecord`.
    ///
    /// # Errors
    ///
    /// Returns the error reported by the wrapped implementation. With the
    /// default configuration this includes a record whose number of fields
    /// differs from the previously written one.
    #[inline]
    pub async fn write_record<I, T>(&mut self, record: I) -> Result<()>
    where
        I: IntoIterator<Item = T>,
        T: AsRef<[u8]>,
    {
        self.0.write_record(record).await
    }

    /// Writes a single record taken from an already assembled [`ByteRecord`].
    ///
    /// # Errors
    ///
    /// Returns the error reported by the wrapped implementation. With the
    /// default configuration this includes a record whose number of fields
    /// differs from the previously written one.
    #[inline]
    pub async fn write_byte_record(&mut self, record: &ByteRecord) -> Result<()> {
        self.0.write_byte_record(record).await
    }

    /// Writes a single field.
    ///
    /// NOTE(review): how a record built field by field gets terminated is
    /// decided by `AsyncWriterImpl::write_field`, which is not visible from
    /// this file; confirm there before relying on it.
    ///
    /// # Errors
    ///
    /// Returns the error reported by the wrapped implementation.
    #[inline]
    pub async fn write_field<T: AsRef<[u8]>>(&mut self, field: T) -> Result<()> {
        self.0.write_field(field).await
    }

    /// Flushes this writer by delegating to the wrapped implementation.
    ///
    /// # Errors
    ///
    /// Returns the I/O error reported while flushing.
    #[inline]
    pub async fn flush(&mut self) -> io::Result<()> {
        self.0.flush().await
    }

    /// Consumes this writer and returns the underlying `W`.
    ///
    /// Data still held in the internal buffer is written out and flushed to
    /// the underlying writer before it is handed back.
    ///
    /// # Errors
    ///
    /// If the wrapped implementation fails to release the writer, its error
    /// is reduced to the plain `io::Error` via `into_error()`.
    pub async fn into_inner(
        self,
    ) -> result::Result<W, io::Error> {
        self.0.into_inner().await.map_err(|err| err.into_error())
    }
}
#[cfg(test)]
mod tests {
    use std::pin::Pin;
    use std::task::{Context, Poll};

    use futures::io;
    use async_std::task;

    use crate::byte_record::ByteRecord;
    use crate::error::ErrorKind;
    use crate::string_record::StringRecord;

    use super::{AsyncWriter, AsyncWriterBuilder};

    /// Finishes `wtr` and returns everything it wrote, decoded as UTF-8.
    async fn wtr_as_string(wtr: AsyncWriter<Vec<u8>>) -> String {
        String::from_utf8(wtr.into_inner().await.unwrap()).unwrap()
    }

    /// A record given as a plain array of string slices.
    #[test]
    fn one_record() {
        task::block_on(async {
            let mut wtr = AsyncWriter::from_writer(vec![]);
            wtr.write_record(&["a", "b", "c"]).await.unwrap();

            assert_eq!(wtr_as_string(wtr).await, "a,b,c\n");
        });
    }

    /// A `StringRecord` passed through the generic `write_record`.
    #[test]
    fn one_string_record() {
        task::block_on(async {
            let mut wtr = AsyncWriter::from_writer(vec![]);
            wtr.write_record(&StringRecord::from(vec!["a", "b", "c"])).await.unwrap();

            assert_eq!(wtr_as_string(wtr).await, "a,b,c\n");
        });
    }

    /// A `ByteRecord` passed through the generic `write_record`.
    #[test]
    fn one_byte_record() {
        task::block_on(async {
            let mut wtr = AsyncWriter::from_writer(vec![]);
            wtr.write_record(&ByteRecord::from(vec!["a", "b", "c"])).await.unwrap();

            assert_eq!(wtr_as_string(wtr).await, "a,b,c\n");
        });
    }

    /// A `ByteRecord` passed through the dedicated `write_byte_record`.
    #[test]
    fn raw_one_byte_record() {
        task::block_on(async {
            let mut wtr = AsyncWriter::from_writer(vec![]);
            wtr.write_byte_record(&ByteRecord::from(vec!["a", "b", "c"])).await.unwrap();

            assert_eq!(wtr_as_string(wtr).await, "a,b,c\n");
        });
    }

    /// A record holding one empty field is written as a quoted empty string.
    #[test]
    fn one_empty_record() {
        task::block_on(async {
            let mut wtr = AsyncWriter::from_writer(vec![]);
            wtr.write_record(&[""]).await.unwrap();

            assert_eq!(wtr_as_string(wtr).await, "\"\"\n");
        });
    }

    /// Same as `one_empty_record`, via `write_byte_record`.
    #[test]
    fn raw_one_empty_record() {
        task::block_on(async {
            let mut wtr = AsyncWriter::from_writer(vec![]);
            wtr.write_byte_record(&ByteRecord::from(vec![""])).await.unwrap();

            assert_eq!(wtr_as_string(wtr).await, "\"\"\n");
        });
    }

    /// Two consecutive records that each hold one empty field.
    #[test]
    fn two_empty_records() {
        task::block_on(async {
            let mut wtr = AsyncWriter::from_writer(vec![]);
            wtr.write_record(&[""]).await.unwrap();
            wtr.write_record(&[""]).await.unwrap();

            assert_eq!(wtr_as_string(wtr).await, "\"\"\n\"\"\n");
        });
    }

    /// Same as `two_empty_records`, via `write_byte_record`.
    #[test]
    fn raw_two_empty_records() {
        task::block_on(async {
            let mut wtr = AsyncWriter::from_writer(vec![]);
            wtr.write_byte_record(&ByteRecord::from(vec![""])).await.unwrap();
            wtr.write_byte_record(&ByteRecord::from(vec![""])).await.unwrap();

            assert_eq!(wtr_as_string(wtr).await, "\"\"\n\"\"\n");
        });
    }

    /// With the default configuration a shorter second record is rejected.
    #[test]
    fn unequal_records_bad() {
        task::block_on(async {
            let mut wtr = AsyncWriter::from_writer(vec![]);
            wtr.write_record(&ByteRecord::from(vec!["a", "b", "c"])).await.unwrap();
            let err = wtr.write_record(&ByteRecord::from(vec!["a"])).await.unwrap_err();

            match err.kind() {
                ErrorKind::UnequalLengths { pos, expected_len, len } => {
                    assert!(pos.is_none());
                    assert_eq!(*expected_len, 3);
                    assert_eq!(*len, 1);
                }
                x => {
                    panic!("expected UnequalLengths error, but got '{:?}'", x);
                }
            }
        });
    }

    /// Same as `unequal_records_bad`, via `write_byte_record`.
    #[test]
    fn raw_unequal_records_bad() {
        task::block_on(async {
            let mut wtr = AsyncWriter::from_writer(vec![]);
            wtr.write_byte_record(&ByteRecord::from(vec!["a", "b", "c"])).await.unwrap();
            let err =
                wtr.write_byte_record(&ByteRecord::from(vec!["a"])).await.unwrap_err();

            match err.kind() {
                ErrorKind::UnequalLengths { pos, expected_len, len } => {
                    assert!(pos.is_none());
                    assert_eq!(*expected_len, 3);
                    assert_eq!(*len, 1);
                }
                x => {
                    panic!("expected UnequalLengths error, but got '{:?}'", x);
                }
            }
        });
    }

    /// A writer built with `flexible(true)` accepts records of varying length.
    #[test]
    fn unequal_records_ok() {
        task::block_on(async {
            let mut wtr = AsyncWriterBuilder::new().flexible(true).create_writer(vec![]);
            wtr.write_record(&ByteRecord::from(vec!["a", "b", "c"])).await.unwrap();
            wtr.write_record(&ByteRecord::from(vec!["a"])).await.unwrap();

            assert_eq!(wtr_as_string(wtr).await, "a,b,c\na\n");
        });
    }

    /// Same as `unequal_records_ok`, via `write_byte_record`.
    #[test]
    fn raw_unequal_records_ok() {
        task::block_on(async {
            let mut wtr = AsyncWriterBuilder::new().flexible(true).create_writer(vec![]);
            wtr.write_byte_record(&ByteRecord::from(vec!["a", "b", "c"])).await.unwrap();
            wtr.write_byte_record(&ByteRecord::from(vec!["a"])).await.unwrap();

            assert_eq!(wtr_as_string(wtr).await, "a,b,c\na\n");
        });
    }

    /// Filling the internal buffer must hand data to the underlying writer
    /// without also flushing it; only explicit flushes (and `into_inner`)
    /// reach `poll_flush`.
    #[test]
    fn full_buffer_should_not_flush_underlying() {
        task::block_on(async {
            /// Sink that records every call it receives: each write is
            /// wrapped as `>…<` and each flush is logged as `!`.
            #[derive(Debug)]
            struct MarkWriteAndFlush(Vec<u8>);

            impl MarkWriteAndFlush {
                /// Consumes the sink and returns its call log as a string.
                fn into_string(self) -> String {
                    String::from_utf8(self.0).unwrap()
                }
            }

            impl io::AsyncWrite for MarkWriteAndFlush {
                fn poll_write(
                    mut self: Pin<&mut Self>,
                    _: &mut Context<'_>,
                    buf: &[u8],
                ) -> Poll<io::Result<usize>> {
                    // Appending to a `Vec` cannot fail or be partial, so the
                    // whole of `buf` is always reported as written.
                    self.0.push(b'>');
                    self.0.extend_from_slice(buf);
                    self.0.push(b'<');
                    Poll::Ready(Ok(buf.len()))
                }

                fn poll_flush(
                    mut self: Pin<&mut Self>,
                    _: &mut Context<'_>,
                ) -> Poll<io::Result<()>> {
                    self.0.push(b'!');
                    Poll::Ready(Ok(()))
                }

                fn poll_close(
                    self: Pin<&mut Self>,
                    cx: &mut Context<'_>,
                ) -> Poll<io::Result<()>> {
                    self.poll_flush(cx)
                }
            }

            let underlying = MarkWriteAndFlush(vec![]);
            // A capacity of 4 bytes holds exactly one "x,y\n" record.
            let mut wtr =
                AsyncWriterBuilder::new().buffer_capacity(4).create_writer(underlying);

            wtr.write_byte_record(&ByteRecord::from(vec!["a", "b"])).await.unwrap();
            wtr.write_byte_record(&ByteRecord::from(vec!["c", "d"])).await.unwrap();
            wtr.flush().await.unwrap();
            wtr.write_byte_record(&ByteRecord::from(vec!["e", "f"])).await.unwrap();

            let got = wtr.into_inner().await.unwrap().into_string();
            assert_eq!(got, ">a,b\n<>c,d\n<!>e,f\n<!");
        });
    }
}