use std::{
io,
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
};
use byteorder::{NetworkEndian, WriteBytesExt};
use futures_core::ready;
use futures_sink::Sink;
use prost::Message;
use tokio::io::AsyncWrite;
use crate::{AsyncDestination, AsyncFrameDestination, Framed, SyncDestination};
/// A writer that buffers encoded items in memory and writes them out to `W`.
///
/// `T` is the item type accepted by the writer and `D` is a marker type that
/// selects which `ProstWriterFor` implementation (and thus which on-the-wire
/// layout) is used when an item is appended.
#[derive(Debug)]
pub struct AsyncProstWriter<W, T, D> {
/// The underlying writer that buffered bytes are flushed into.
writer: W,
/// Number of bytes at the front of `buffer` that have already been handed to
/// `writer`; reset to 0 once the whole buffer has been written.
pub(crate) written: usize,
/// Encoded bytes that have been appended but not yet fully written out.
pub(crate) buffer: Vec<u8>,
/// Zero-sized marker tying the writer to its item type `T`.
from: PhantomData<T>,
/// Zero-sized marker tying the writer to its destination mode `D`.
dest: PhantomData<D>,
}
impl<W, T, D> AsyncProstWriter<W, T, D> {
    /// Wraps `writer` in a new `AsyncProstWriter` with an empty buffer.
    pub fn new(writer: W) -> Self {
        let buffer = Vec::new();
        Self {
            from: PhantomData,
            dest: PhantomData,
            written: 0,
            buffer,
            writer,
        }
    }

    /// Returns a shared reference to the underlying writer.
    pub fn get_ref(&self) -> &W {
        let Self { writer, .. } = self;
        writer
    }

    /// Returns a mutable reference to the underlying writer.
    pub fn get_mut(&mut self) -> &mut W {
        let Self { writer, .. } = self;
        writer
    }

    /// Consumes the wrapper and returns the underlying writer.
    ///
    /// Any bytes still held in the internal buffer are dropped.
    pub fn into_inner(self) -> W {
        let Self { writer, .. } = self;
        writer
    }

    /// Re-tags this writer with a different destination marker `D2`.
    ///
    /// The underlying writer, the pending buffer, and the write offset are
    /// all carried over unchanged; only the marker type differs.
    pub(crate) fn make_for<D2>(self) -> AsyncProstWriter<W, T, D2> {
        let Self {
            writer,
            written,
            buffer,
            from,
            dest: _,
        } = self;
        AsyncProstWriter {
            writer,
            written,
            buffer,
            from,
            dest: PhantomData,
        }
    }
}
// `AsyncProstWriter` is declared `Unpin` for every `W`, `T`, and `D`: this impl
// carries no bounds, so it applies even when `W` itself is not `Unpin`.
impl<W, T, D> Unpin for AsyncProstWriter<W, T, D> {}
impl<W, T> Default for AsyncProstWriter<W, T, SyncDestination>
where
    W: Default,
{
    /// Builds a writer around `W::default()`, going through the `From<W>`
    /// conversion for this type.
    fn default() -> Self {
        W::default().into()
    }
}
impl<W, T> From<W> for AsyncProstWriter<W, T, SyncDestination> {
fn from(writer: W) -> Self {
Self::new(writer)
}
}
impl<W, T> AsyncProstWriter<W, T, SyncDestination> {
    /// Re-tags this writer with the `AsyncDestination` marker.
    ///
    /// The underlying writer and any bytes already buffered are kept as-is.
    pub fn for_async(self) -> AsyncProstWriter<W, T, AsyncDestination> {
        self.make_for::<AsyncDestination>()
    }

    /// Re-tags this writer with the `AsyncFrameDestination` marker.
    ///
    /// The underlying writer and any bytes already buffered are kept as-is.
    pub fn for_async_framed(self) -> AsyncProstWriter<W, T, AsyncFrameDestination> {
        self.make_for::<AsyncFrameDestination>()
    }
}
/// Strategy trait describing how an item of type `T` is added to a writer.
///
/// The implementations in this file encode `item` into the writer's in-memory
/// buffer; which layout is used depends on the writer's destination marker.
#[doc(hidden)]
pub trait ProstWriterFor<T> {
/// Appends `item` to the writer, returning an `io::Error` if that fails.
fn append(&mut self, item: T) -> Result<(), io::Error>;
}
impl<W, F: Framed> ProstWriterFor<F> for AsyncProstWriter<W, F, AsyncFrameDestination> {
    /// Appends `item` to the buffer as a length-prefixed frame.
    ///
    /// The value reported by `item.encoded_len()` is written first as a `u32`
    /// in network byte order, followed by the bytes produced by
    /// `item.encode`. Errors from either step are returned to the caller.
    fn append(&mut self, item: F) -> Result<(), io::Error> {
        // Length prefix goes in ahead of the payload.
        self.buffer
            .write_u32::<NetworkEndian>(item.encoded_len())?;
        item.encode(&mut self.buffer)?;
        Ok(())
    }
}
impl<W, T: Message> ProstWriterFor<T> for AsyncProstWriter<W, T, AsyncDestination> {
    /// Appends `item` to the buffer as a length-prefixed message.
    ///
    /// The encoded length is written first as a `u32` in network byte order,
    /// followed by the encoded message itself.
    ///
    /// # Errors
    ///
    /// Returns an `io::ErrorKind::InvalidInput` error, leaving the buffer
    /// untouched, if the encoded length does not fit in the `u32` prefix.
    /// Errors from writing the prefix or encoding the message are propagated.
    fn append(&mut self, item: T) -> Result<(), io::Error> {
        // A plain `as u32` cast would silently truncate oversized lengths and
        // emit a prefix that disagrees with the payload; reject them instead.
        let size = <u32 as std::convert::TryFrom<usize>>::try_from(item.encoded_len()).map_err(
            |_| {
                io::Error::new(
                    io::ErrorKind::InvalidInput,
                    "encoded message length does not fit in the u32 length prefix",
                )
            },
        )?;
        self.buffer.write_u32::<NetworkEndian>(size)?;
        item.encode(&mut self.buffer)?;
        Ok(())
    }
}
impl<W, T> ProstWriterFor<T> for AsyncProstWriter<W, T, SyncDestination>
where
    T: Message,
{
    /// Appends the encoding of `item` to the buffer with no length prefix.
    ///
    /// Any encoding error is converted into an `io::Error` and returned.
    fn append(&mut self, item: T) -> Result<(), io::Error> {
        item.encode(&mut self.buffer).map_err(io::Error::from)
    }
}
impl<W, T, D> Sink<T> for AsyncProstWriter<W, T, D>
where
    W: AsyncWrite + Unpin,
    Self: ProstWriterFor<T>,
{
    type Error = io::Error;

    /// Always reports readiness: sending only appends to the in-memory
    /// buffer, which is drained later by `poll_flush`.
    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    /// Appends `item` to the internal buffer via `ProstWriterFor::append`.
    ///
    /// Nothing is written to the underlying writer here; bytes reach it when
    /// the sink is flushed or closed.
    fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
        self.append(item)
    }

    /// Writes all buffered bytes to the underlying writer, then flushes it.
    ///
    /// Progress is tracked in `written`, so a `Poll::Pending` from the writer
    /// resumes from the same offset on the next poll.
    ///
    /// # Errors
    ///
    /// Propagates errors from the underlying writer. Returns an
    /// `io::ErrorKind::WriteZero` error if the writer accepts zero bytes
    /// while buffered data remains.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let this = self.get_mut();
        while this.written != this.buffer.len() {
            let n =
                ready!(Pin::new(&mut this.writer).poll_write(cx, &this.buffer[this.written..]))?;
            if n == 0 {
                // The slice is non-empty here, so a zero-length write means the
                // writer can take no more data. Without this check `written`
                // would never advance and the loop would spin forever.
                return Poll::Ready(Err(io::Error::new(
                    io::ErrorKind::WriteZero,
                    "failed to write buffered message to the underlying writer",
                )));
            }
            this.written += n;
        }
        // Everything has been handed to the writer; reset for the next batch.
        this.buffer.clear();
        this.written = 0;
        Pin::new(&mut this.writer).poll_flush(cx)
    }

    /// Flushes all buffered bytes, then shuts down the underlying writer.
    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        ready!(self.as_mut().poll_flush(cx))?;
        Pin::new(&mut self.writer).poll_shutdown(cx)
    }
}