use std::result;
use tokio::io::{self, AsyncWrite, AsyncWriteExt};
use serde::Serialize;
use crate::AsyncWriterBuilder;
use crate::error::{IntoInnerError, Result};
use super::mwtr_serde::MemWriter;
impl AsyncWriterBuilder {
/// Creates an [`AsyncSerializer`] that writes serialized records to `wtr`.
///
/// This builder is passed on to `AsyncSerializer::new`, which uses it to
/// construct the serializer's internal `MemWriter`.
pub fn create_serializer<W: AsyncWrite + Unpin>(&self, wtr: W) -> AsyncSerializer<W> {
AsyncSerializer::new(self, wtr)
}
}
/// Serializes `serde::Serialize` records and writes them to an asynchronous
/// writer.
///
/// Each record is first serialized into `ser_wtr`; the bytes it exposes via
/// `data()` are then written to `asy_wtr`, after which `ser_wtr` is cleared.
#[derive(Debug)]
pub struct AsyncSerializer<W: AsyncWrite + Unpin> {
// Synchronous writer that records are serialized into before being
// forwarded to `asy_wtr`.
ser_wtr: MemWriter,
// Destination writer. Set to `Some` on construction; `into_inner` takes it
// out, leaving `None` for the subsequent drop.
asy_wtr: Option<W>,
}
impl<W: AsyncWrite + Unpin> Drop for AsyncSerializer<W> {
    /// Makes a best-effort attempt to flush the underlying writer.
    ///
    /// The flush result is discarded because `drop` cannot report errors.
    /// Callers that care about flush failures should call `flush` (or
    /// `into_inner`) explicitly before the serializer goes out of scope.
    fn drop(&mut self) {
        // After `into_inner` the writer has been taken and `flush` would be a
        // no-op. Skip the blocking executor entirely in that case: it is
        // wasted work, and entering `futures::executor::block_on` while the
        // current thread is already inside a `futures` executor panics.
        if self.asy_wtr.is_some() {
            let _ = futures::executor::block_on(self.flush());
        }
    }
}
impl<W: AsyncWrite + Unpin> AsyncSerializer<W> {
    /// Builds a serializer that writes to `wtr`, constructing the internal
    /// `MemWriter` from `builder`.
    fn new(builder: &AsyncWriterBuilder, wtr: W) -> Self {
        let ser_wtr = MemWriter::new(builder);
        let asy_wtr = Some(wtr);
        AsyncSerializer { ser_wtr, asy_wtr }
    }

    /// Creates a serializer for `wtr` using a builder obtained from
    /// `AsyncWriterBuilder::new()`.
    pub fn from_writer(wtr: W) -> AsyncSerializer<W> {
        let builder = AsyncWriterBuilder::new();
        builder.create_serializer(wtr)
    }

    /// Serializes a single `record` and writes the resulting bytes to the
    /// underlying asynchronous writer.
    ///
    /// # Errors
    ///
    /// Returns an error if serializing the record, flushing the internal
    /// `MemWriter`, or writing to the underlying writer fails. Note that the
    /// internal buffer is only cleared after a successful write; on any error
    /// this method returns before `clear` is reached.
    ///
    /// # Panics
    ///
    /// Panics if the underlying writer is absent. It is only removed by
    /// `into_inner`, which consumes the serializer.
    pub async fn serialize<S: Serialize>(&mut self, record: S) -> Result<()> {
        // Stage the record in the synchronous writer first.
        self.ser_wtr.serialize(record)?;
        self.ser_wtr.flush()?;

        // Forward everything that was staged to the asynchronous sink.
        let sink = self.asy_wtr.as_mut().unwrap();
        let staged = self.ser_wtr.data();
        sink.write_all(staged).await?;

        // The bytes were accepted by the sink; drop them from the staging buffer.
        self.ser_wtr.clear();
        Ok(())
    }

    /// Flushes the underlying asynchronous writer.
    ///
    /// Returns `Ok(())` without doing anything when the writer has already
    /// been taken out of the serializer.
    ///
    /// # Errors
    ///
    /// Propagates any I/O error reported by the underlying writer's flush.
    pub async fn flush(&mut self) -> io::Result<()> {
        match self.asy_wtr.as_mut() {
            Some(sink) => sink.flush().await,
            None => Ok(()),
        }
    }

    /// Flushes the underlying writer and returns it, consuming the serializer.
    ///
    /// # Errors
    ///
    /// If the flush fails, the serializer and the I/O error are handed back
    /// wrapped in an `IntoInnerError`.
    pub async fn into_inner(
        mut self,
    ) -> result::Result<W, IntoInnerError<AsyncSerializer<W>>> {
        if let Err(err) = self.flush().await {
            return Err(IntoInnerError::new(self, err));
        }
        // Taking the writer leaves `None` behind for the drop of `self`.
        Ok(self.asy_wtr.take().unwrap())
    }
}