use bytes::{Buf, BufMut, Bytes, BytesMut};
use chunked_bytes::ChunkedBytes;
use futures::prelude::*;
use futures::ready;
use futures::Sink;
use pin_project::pin_project;
use tokio::io::{self, AsyncWrite, Error};
use tokio::prelude::*;
use std::pin::Pin;
use std::task::{Context, Poll};
const MESSAGE_MAGIC: &[u8] = b"mess";
pub struct Message {
int_field: u32,
string_field: String,
blob_field: Bytes,
}
fn encode_message(buf: &mut ChunkedBytes, msg: Message) {
buf.put_slice(MESSAGE_MAGIC);
buf.put_u32(1);
buf.put_u32(msg.int_field);
buf.put_u32(2);
buf.put_u64(msg.string_field.len() as u64);
buf.put(msg.string_field.as_bytes());
buf.put_u32(3);
buf.put_u64(msg.blob_field.len() as u64);
buf.put_bytes(msg.blob_field);
}
#[pin_project]
pub struct EncodingWriter<T> {
#[pin]
out: T,
buf: ChunkedBytes,
}
impl<T: AsyncWrite> EncodingWriter<T> {
pub fn new(out: T) -> Self {
EncodingWriter {
buf: ChunkedBytes::new(),
out,
}
}
fn poll_flush_buf(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), Error>> {
let mut this = self.project();
while this.buf.has_remaining() {
ready!(this.out.as_mut().poll_write_buf(cx, this.buf))?;
}
Poll::Ready(Ok(()))
}
}
impl<T: AsyncWrite> Sink<Message> for EncodingWriter<T> {
type Error = Error;
fn poll_ready(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), Error>> {
let mut this = self.project();
while this.buf.remaining() >= this.buf.chunk_size_hint() {
ready!(this.out.as_mut().poll_write_buf(cx, this.buf))?;
}
Poll::Ready(Ok(()))
}
fn start_send(self: Pin<&mut Self>, msg: Message) -> Result<(), Error> {
let this = self.project();
encode_message(this.buf, msg);
Ok(())
}
fn poll_flush(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), Error>> {
ready!(self.as_mut().poll_flush_buf(cx))?;
self.project().out.poll_flush(cx)
}
fn poll_close(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), Error>> {
ready!(self.as_mut().poll_flush_buf(cx))?;
self.project().out.poll_shutdown(cx)
}
}
#[tokio::main]
async fn main() {
let mut blob = BytesMut::with_capacity(8000);
io::repeat(b'\xa5').read_buf(&mut blob).await.unwrap();
let msg = Message {
int_field: 42,
string_field: "Hello, world!".into(),
blob_field: blob.freeze(),
};
let sink = io::sink();
let mut writer = EncodingWriter::new(sink);
writer.send(msg).await.unwrap();
}