minicbor_io/async_writer.rs
1use crate::Error;
2use futures_io::AsyncWrite;
3use futures_util::AsyncWriteExt;
4use minicbor::Encode;
5use std::io;
6
7/// Wraps an [`AsyncWrite`] and writes length-delimited CBOR values.
8///
9/// *Requires cargo feature* `"async-io"`.
10#[derive(Debug)]
11pub struct AsyncWriter<W> {
12 writer: W,
13 buffer: Vec<u8>,
14 max_len: usize,
15 state: State
16}
17
18/// Write state.
19#[derive(Debug)]
20enum State {
21 /// Nothing is written at the moment.
22 None,
23 /// Writing buffer from offset.
24 WriteFrom(usize)
25}
26
27impl<W> AsyncWriter<W> {
28 /// Create a new writer with a max. buffer size of 512KiB.
29 pub fn new(writer: W) -> Self {
30 Self::with_buffer(writer, Vec::new())
31 }
32
33 /// Create a new writer with a max. buffer size of 512KiB.
34 pub fn with_buffer(writer: W, buffer: Vec<u8>) -> Self {
35 Self { writer, buffer, max_len: 512 * 1024, state: State::None }
36 }
37
38 /// Set the max. buffer size in bytes.
39 ///
40 /// If length values greater than this are encoded, an
41 /// [`Error::InvalidLen`] will be returned.
42 pub fn set_max_len(&mut self, val: u32) {
43 self.max_len = val as usize
44 }
45
46 /// Get a reference to the inner writer.
47 pub fn writer(&self) -> &W {
48 &self.writer
49 }
50
51 /// Get a mutable reference to the inner writer.
52 pub fn writer_mut(&mut self) -> &mut W {
53 &mut self.writer
54 }
55
56 /// Deconstruct this writer into the inner writer and the buffer.
57 pub fn into_parts(self) -> (W, Vec<u8>) {
58 (self.writer, self.buffer)
59 }
60}
61
62impl<W: AsyncWrite + Unpin> AsyncWriter<W> {
63 /// Encode and write a CBOR value and return its size in bytes.
64 ///
65 /// The value will be preceded by a `u32` (4 bytes in network byte order),
66 /// denoting the length of bytes constituting the serialised value.
67 ///
68 /// # Cancellation
69 ///
70 /// If the future returned by `AsyncWriter::write` is dropped while still
71 /// pending, subsequent calls to `AsyncWriter::write` will discard any
72 /// buffered data and instead encode, buffer and commit the new value.
73 /// Cancelling a future thus cancels the transfer. However, it is also
74 /// possible to resume the transfer by calling [`AsyncWriter::sync`]
75 /// after cancellation, which is normally called implicitly by this method.
76 pub async fn write<T: Encode<()>>(&mut self, val: T) -> Result<usize, Error> {
77 self.write_with(val, &mut ()).await
78 }
79
80 /// Like [`AsyncWriter::write`] but accepting a user provided encoding context.
81 pub async fn write_with<C, T: Encode<C>>(&mut self, val: T, ctx: &mut C) -> Result<usize, Error> {
82 self.buffer.resize(4, 0u8);
83 minicbor::encode_with(val, &mut self.buffer, ctx)?;
84 if self.buffer.len() - 4 > self.max_len {
85 return Err(Error::InvalidLen)
86 }
87 let prefix = (self.buffer.len() as u32 - 4).to_be_bytes();
88 self.buffer[.. 4].copy_from_slice(&prefix);
89 self.state = State::WriteFrom(0);
90
91 self.sync().await?;
92
93 Ok(self.buffer.len() - 4)
94 }
95
96 /// Commit any buffered data to the inner `AsyncWrite`.
97 ///
98 /// This method is implicitly called by [`AsyncWriter::write`]. The only
99 /// reason to call it explicitly is to resume the write operation started
100 /// by a previously unfinished, i.e. cancelled, `AsyncWriter::write` call.
101 pub async fn sync(&mut self) -> Result<(), Error> {
102 loop {
103 match self.state {
104 State::None => {
105 return Ok(())
106 }
107 State::WriteFrom(o) if o >= self.buffer.len() => {
108 self.state = State::None;
109 return Ok(())
110 }
111 State::WriteFrom(ref mut o) => {
112 let n = self.writer.write(&self.buffer[*o ..]).await?;
113 if n == 0 {
114 return Err(Error::Io(io::ErrorKind::WriteZero.into()))
115 }
116 *o += n
117 }
118 }
119 }
120 }
121
122 /// Flush the inner `AsyncWrite`.
123 pub async fn flush(&mut self) -> Result<(), Error> {
124 self.writer.flush().await?;
125 Ok(())
126 }
127}