1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
use bincode;
use byteorder::{NetworkEndian, WriteBytesExt};
use futures_core::ready;
use serde::Serialize;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio;
use tokio::prelude::*;

/// A wrapper around an asynchronous sink that accepts, serializes, and sends bincode-encoded
/// values.
///
/// To use, provide a writer that implements `futures::AsyncWrite`, and then use `futures::Sink`
/// to send values.
///
/// Note that an `AsyncBincodeWriter` must be of the type [`AsyncDestination`] in order to be
/// compatible with an [`AsyncBincodeReader`] on the remote end (recall that it requires the
/// serialized size prefixed to the serialized data). The default is [`SyncDestination`], but these
/// can be easily toggled between using [`AsyncBincodeWriter::for_async`].
#[derive(Debug)]
pub struct AsyncBincodeWriter<W, T, D> {
    writer: W,
    pub(crate) written: usize,
    pub(crate) buffer: Vec<u8>,
    from: PhantomData<T>,
    dest: PhantomData<D>,
}

impl<W, T, D> Unpin for AsyncBincodeWriter<W, T, D> where W: Unpin {}

impl<W, T> Default for AsyncBincodeWriter<W, T, SyncDestination>
where
    W: Default,
{
    fn default() -> Self {
        Self::from(W::default())
    }
}

impl<W, T, D> AsyncBincodeWriter<W, T, D> {
    /// Gets a reference to the underlying writer.
    ///
    /// It is inadvisable to directly write to the underlying writer.
    pub fn get_ref(&self) -> &W {
        &self.writer
    }

    /// Gets a mutable reference to the underlying writer.
    ///
    /// It is inadvisable to directly write to the underlying writer.
    pub fn get_mut(&mut self) -> &mut W {
        &mut self.writer
    }

    /// Unwraps this `AsyncBincodeWriter`, returning the underlying writer.
    ///
    /// Note that any leftover serialized data that has not yet been sent is lost.
    pub fn into_inner(self) -> W {
        self.writer
    }
}

impl<W, T> From<W> for AsyncBincodeWriter<W, T, SyncDestination> {
    fn from(writer: W) -> Self {
        AsyncBincodeWriter {
            buffer: Vec::new(),
            writer,
            written: 0,
            from: PhantomData,
            dest: PhantomData,
        }
    }
}

impl<W, T> AsyncBincodeWriter<W, T, SyncDestination> {
    /// Make this writer include the serialized data's size before each serialized value.
    ///
    /// This is necessary for compatability with [`AsyncBincodeReader`].
    pub fn for_async(self) -> AsyncBincodeWriter<W, T, AsyncDestination> {
        self.make_for()
    }
}

impl<W, T, D> AsyncBincodeWriter<W, T, D> {
    pub(crate) fn make_for<D2>(self) -> AsyncBincodeWriter<W, T, D2> {
        AsyncBincodeWriter {
            buffer: self.buffer,
            writer: self.writer,
            written: self.written,
            from: self.from,
            dest: PhantomData,
        }
    }
}

impl<W, T> AsyncBincodeWriter<W, T, AsyncDestination> {
    /// Make this writer only send bincode-encoded values.
    ///
    /// This is necessary for compatability with stock `bincode` receivers.
    pub fn for_sync(self) -> AsyncBincodeWriter<W, T, SyncDestination> {
        self.make_for()
    }
}

/// A marker that indicates that the wrapping type is compatible with `AsyncBincodeReader`.
#[derive(Debug)]
pub struct AsyncDestination;

/// A marker that indicates that the wrapping type is compatible with stock `bincode` receivers.
#[derive(Debug)]
pub struct SyncDestination;

#[doc(hidden)]
pub trait BincodeWriterFor<T> {
    fn append(&mut self, item: T) -> Result<(), bincode::Error>;
}

impl<W, T> BincodeWriterFor<T> for AsyncBincodeWriter<W, T, AsyncDestination>
where
    T: Serialize,
{
    fn append(&mut self, item: T) -> Result<(), bincode::Error> {
        let mut c = bincode::config();
        let c = c.limit(u32::max_value() as u64);
        let size = c.serialized_size(&item)? as u32;
        self.buffer.write_u32::<NetworkEndian>(size)?;
        c.serialize_into(&mut self.buffer, &item)
    }
}

impl<W, T> BincodeWriterFor<T> for AsyncBincodeWriter<W, T, SyncDestination>
where
    T: Serialize,
{
    fn append(&mut self, item: T) -> Result<(), bincode::Error> {
        bincode::serialize_into(&mut self.buffer, &item)
    }
}

impl<W, T, D> Sink<T> for AsyncBincodeWriter<W, T, D>
where
    T: Serialize,
    W: AsyncWrite + Unpin,
    Self: BincodeWriterFor<T>,
{
    type Error = bincode::Error;

    fn poll_ready(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
        if self.buffer.is_empty() {
            // NOTE: in theory we could have a short-circuit here that tries to have bincode write
            // directly into self.writer. this would be way more efficient in the common case as we
            // don't have to do the extra buffering. the idea would be to serialize fist, and *if*
            // it errors, see how many bytes were written, serialize again into a Vec, and then
            // keep only the bytes following the number that were written in our buffer.
            // unfortunately, bincode will not tell us that number at the moment, and instead just
            // fail.
        }

        self.append(item)?;
        Ok(())
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
        // allow us to borrow fields separately
        let this = self.get_mut();

        // write stuff out if we need to
        while this.written != this.buffer.len() {
            let n =
                ready!(Pin::new(&mut this.writer).poll_write(cx, &this.buffer[this.written..]))?;
            this.written += n;
        }

        // we have to flush before we're really done
        this.buffer.clear();
        this.written = 0;
        Pin::new(&mut this.writer)
            .poll_flush(cx)
            .map_err(bincode::Error::from)
    }

    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
        ready!(self.as_mut().poll_flush(cx))?;
        Pin::new(&mut self.writer)
            .poll_shutdown(cx)
            .map_err(bincode::Error::from)
    }
}