use byteorder::{NetworkEndian, WriteBytesExt};
use futures_core::ready;
use futures_sink::Sink;
use serde::Serialize;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio_io::AsyncWrite;
#[derive(Debug)]
pub struct AsyncBincodeWriter<W, T, D> {
writer: W,
pub(crate) written: usize,
pub(crate) buffer: Vec<u8>,
from: PhantomData<T>,
dest: PhantomData<D>,
}
impl<W, T, D> Unpin for AsyncBincodeWriter<W, T, D> where W: Unpin {}
impl<W, T> Default for AsyncBincodeWriter<W, T, SyncDestination>
where
W: Default,
{
fn default() -> Self {
Self::from(W::default())
}
}
impl<W, T, D> AsyncBincodeWriter<W, T, D> {
pub fn get_ref(&self) -> &W {
&self.writer
}
pub fn get_mut(&mut self) -> &mut W {
&mut self.writer
}
pub fn into_inner(self) -> W {
self.writer
}
}
impl<W, T> From<W> for AsyncBincodeWriter<W, T, SyncDestination> {
fn from(writer: W) -> Self {
AsyncBincodeWriter {
buffer: Vec::new(),
writer,
written: 0,
from: PhantomData,
dest: PhantomData,
}
}
}
impl<W, T> AsyncBincodeWriter<W, T, SyncDestination> {
pub fn for_async(self) -> AsyncBincodeWriter<W, T, AsyncDestination> {
self.make_for()
}
}
impl<W, T, D> AsyncBincodeWriter<W, T, D> {
pub(crate) fn make_for<D2>(self) -> AsyncBincodeWriter<W, T, D2> {
AsyncBincodeWriter {
buffer: self.buffer,
writer: self.writer,
written: self.written,
from: self.from,
dest: PhantomData,
}
}
}
impl<W, T> AsyncBincodeWriter<W, T, AsyncDestination> {
pub fn for_sync(self) -> AsyncBincodeWriter<W, T, SyncDestination> {
self.make_for()
}
}
#[derive(Debug)]
pub struct AsyncDestination;
#[derive(Debug)]
pub struct SyncDestination;
#[doc(hidden)]
pub trait BincodeWriterFor<T> {
fn append(&mut self, item: T) -> Result<(), bincode::Error>;
}
impl<W, T> BincodeWriterFor<T> for AsyncBincodeWriter<W, T, AsyncDestination>
where
T: Serialize,
{
fn append(&mut self, item: T) -> Result<(), bincode::Error> {
let mut c = bincode::config();
let c = c.limit(u32::max_value() as u64);
let size = c.serialized_size(&item)? as u32;
self.buffer.write_u32::<NetworkEndian>(size)?;
c.serialize_into(&mut self.buffer, &item)
}
}
impl<W, T> BincodeWriterFor<T> for AsyncBincodeWriter<W, T, SyncDestination>
where
T: Serialize,
{
fn append(&mut self, item: T) -> Result<(), bincode::Error> {
bincode::serialize_into(&mut self.buffer, &item)
}
}
impl<W, T, D> Sink<T> for AsyncBincodeWriter<W, T, D>
where
T: Serialize,
W: AsyncWrite + Unpin,
Self: BincodeWriterFor<T>,
{
type Error = bincode::Error;
fn poll_ready(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
if self.buffer.is_empty() {
}
self.append(item)?;
Ok(())
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
let this = self.get_mut();
while this.written != this.buffer.len() {
let n =
ready!(Pin::new(&mut this.writer).poll_write(cx, &this.buffer[this.written..]))?;
this.written += n;
}
this.buffer.clear();
this.written = 0;
Pin::new(&mut this.writer)
.poll_flush(cx)
.map_err(bincode::Error::from)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
ready!(self.as_mut().poll_flush(cx))?;
Pin::new(&mut self.writer)
.poll_shutdown(cx)
.map_err(bincode::Error::from)
}
}