minicbor_io/
async_writer.rs

1use crate::Error;
2use futures_io::AsyncWrite;
3use futures_util::AsyncWriteExt;
4use minicbor::Encode;
5use std::io;
6
7/// Wraps an [`AsyncWrite`] and writes length-delimited CBOR values.
8///
9/// *Requires cargo feature* `"async-io"`.
10#[derive(Debug)]
11pub struct AsyncWriter<W> {
12    writer: W,
13    buffer: Vec<u8>,
14    max_len: usize,
15    state: State
16}
17
/// Progress of the buffered write.
#[derive(Debug)]
enum State {
    /// No write is in flight; there is nothing to commit.
    None,
    /// The buffer is being written; the payload is the offset of the first
    /// byte that has not been handed to the inner writer yet.
    WriteFrom(usize),
}
26
27impl<W> AsyncWriter<W> {
28    /// Create a new writer with a max. buffer size of 512KiB.
29    pub fn new(writer: W) -> Self {
30        Self::with_buffer(writer, Vec::new())
31    }
32
33    /// Create a new writer with a max. buffer size of 512KiB.
34    pub fn with_buffer(writer: W, buffer: Vec<u8>) -> Self {
35        Self { writer, buffer, max_len: 512 * 1024, state: State::None }
36    }
37
38    /// Set the max. buffer size in bytes.
39    ///
40    /// If length values greater than this are encoded, an
41    /// [`Error::InvalidLen`] will be returned.
42    pub fn set_max_len(&mut self, val: u32) {
43        self.max_len = val as usize
44    }
45
46    /// Get a reference to the inner writer.
47    pub fn writer(&self) -> &W {
48        &self.writer
49    }
50
51    /// Get a mutable reference to the inner writer.
52    pub fn writer_mut(&mut self) -> &mut W {
53        &mut self.writer
54    }
55
56    /// Deconstruct this writer into the inner writer and the buffer.
57    pub fn into_parts(self) -> (W, Vec<u8>) {
58        (self.writer, self.buffer)
59    }
60}
61
62impl<W: AsyncWrite + Unpin> AsyncWriter<W> {
63    /// Encode and write a CBOR value and return its size in bytes.
64    ///
65    /// The value will be preceded by a `u32` (4 bytes in network byte order),
66    /// denoting the length of bytes constituting the serialised value.
67    ///
68    /// # Cancellation
69    ///
70    /// If the future returned by `AsyncWriter::write` is dropped while still
71    /// pending, subsequent calls to `AsyncWriter::write` will discard any
72    /// buffered data and instead encode, buffer and commit the new value.
73    /// Cancelling a future thus cancels the transfer. However, it is also
74    /// possible to resume the transfer by calling [`AsyncWriter::sync`]
75    /// after cancellation, which is normally called implicitly by this method.
76    pub async fn write<T: Encode<()>>(&mut self, val: T) -> Result<usize, Error> {
77        self.write_with(val, &mut ()).await
78    }
79
80    /// Like [`AsyncWriter::write`] but accepting a user provided encoding context.
81    pub async fn write_with<C, T: Encode<C>>(&mut self, val: T, ctx: &mut C) -> Result<usize, Error> {
82        self.buffer.resize(4, 0u8);
83        minicbor::encode_with(val, &mut self.buffer, ctx)?;
84        if self.buffer.len() - 4 > self.max_len {
85            return Err(Error::InvalidLen)
86        }
87        let prefix = (self.buffer.len() as u32 - 4).to_be_bytes();
88        self.buffer[.. 4].copy_from_slice(&prefix);
89        self.state = State::WriteFrom(0);
90
91        self.sync().await?;
92
93        Ok(self.buffer.len() - 4)
94    }
95
96    /// Commit any buffered data to the inner `AsyncWrite`.
97    ///
98    /// This method is implicitly called by [`AsyncWriter::write`]. The only
99    /// reason to call it explicitly is to resume the write operation started
100    /// by a previously unfinished, i.e. cancelled, `AsyncWriter::write` call.
101    pub async fn sync(&mut self) -> Result<(), Error> {
102        loop {
103            match self.state {
104                State::None => {
105                    return Ok(())
106                }
107                State::WriteFrom(o) if o >= self.buffer.len() => {
108                    self.state = State::None;
109                    return Ok(())
110                }
111                State::WriteFrom(ref mut o) => {
112                    let n = self.writer.write(&self.buffer[*o ..]).await?;
113                    if n == 0 {
114                        return Err(Error::Io(io::ErrorKind::WriteZero.into()))
115                    }
116                    *o += n
117                }
118            }
119        }
120    }
121
122    /// Flush the inner `AsyncWrite`.
123    pub async fn flush(&mut self) -> Result<(), Error> {
124        self.writer.flush().await?;
125        Ok(())
126    }
127}