io_stream/coroutines/
write.rs

1use log::{debug, trace};
2
3use crate::{Io, Output};
4
5/// I/O-free coroutine for writing bytes into a stream.
6#[derive(Debug, Default)]
7pub struct Write {
8    bytes: Option<Vec<u8>>,
9}
10
11impl Write {
12    /// Creates a new coroutine to write the given bytes.
13    pub fn new(bytes: impl IntoIterator<Item = u8>) -> Self {
14        let bytes: Vec<u8> = bytes.into_iter().collect();
15        let n = bytes.len();
16        trace!("prepare {n} bytes to be written");
17        let bytes = Some(bytes);
18        Self { bytes }
19    }
20
21    /// Replaces the inner bytes with the given one.
22    pub fn replace(&mut self, bytes: impl IntoIterator<Item = u8>) {
23        *self = Self::new(bytes);
24    }
25
26    /// Adds the given bytes the to inner buffer.
27    pub fn extend(&mut self, more_bytes: impl IntoIterator<Item = u8>) {
28        match &mut self.bytes {
29            Some(bytes) => {
30                let prev_len = bytes.len();
31                bytes.extend(more_bytes);
32                let next_len = bytes.len();
33                let n = next_len - prev_len;
34                trace!("prepare {prev_len}+{n} additional bytes to be written");
35            }
36            None => self.replace(more_bytes),
37        }
38    }
39
40    /// Makes the write progress.
41    pub fn resume(&mut self, arg: Option<Io>) -> Result<Output, Io> {
42        let Some(arg) = arg else {
43            let Some(bytes) = self.bytes.take() else {
44                return Err(Io::err("Write bytes not ready"));
45            };
46
47            trace!("break: need I/O to write bytes");
48            return Err(Io::Write(Err(bytes)));
49        };
50
51        trace!("resume after writting bytes");
52
53        let Io::Write(Ok(output)) = arg else {
54            let msg = format!("Expected write output, got {arg:?}");
55            return Err(Io::err(msg));
56        };
57
58        let n = output.bytes_count;
59        debug!("wrote {n} bytes");
60
61        Ok(output)
62    }
63}