io_stream/coroutines/
write.rs

1//! I/O-free coroutine to write bytes into a stream.
2
3use log::{debug, trace};
4use thiserror::Error;
5
6use crate::io::{StreamIo, StreamOutput};
7
8/// Errors that can occur during the coroutine progression.
9#[derive(Clone, Debug, Error)]
10pub enum WriteStreamError {
11    /// The coroutine received an invalid argument.
12    ///
13    /// Occurs when the coroutine receives an I/O response from
14    /// another coroutine, which should not happen if the runtime maps
15    /// correctly the arguments.
16    #[error("Invalid argument: expected {0}, got {1:?}")]
17    InvalidArgument(&'static str, StreamIo),
18}
19
20/// Output emitted after a coroutine finishes its progression.
21#[derive(Clone, Debug)]
22pub enum WriteStreamResult {
23    /// The coroutine has successfully terminated its progression.
24    Ok(StreamOutput),
25
26    /// A stream I/O needs to be performed to make the coroutine
27    /// progress.
28    Io(StreamIo),
29
30    /// The coroutine reached the End Of File.
31    ///
32    /// Only the consumer can determine if its an error or not.
33    Eof,
34
35    /// An error occured during the coroutine progression.
36    Err(WriteStreamError),
37}
38
/// I/O-free coroutine to write bytes into a stream.
///
/// The coroutine never performs the write itself: it only holds the
/// bytes until they are handed over for writing.
#[derive(Debug, Default)]
pub struct WriteStream {
    /// Bytes waiting to be handed over for writing.
    bytes: Vec<u8>,
}
44
45impl WriteStream {
46    /// Creates a new coroutine to write the given bytes.
47    pub fn new(bytes: Vec<u8>) -> Self {
48        trace!("init coroutine for writing {} bytes", bytes.len());
49        Self { bytes }
50    }
51
52    // /// Replaces the inner bytes with the given one.
53    // pub fn replace(&mut self, bytes: impl IntoIterator<Item = u8>) {
54    //     *self = Self::new(bytes.into_iter()collect());
55    // }
56
57    // /// Adds the given bytes the to inner buffer.
58    // pub fn extend(&mut self, more_bytes: impl IntoIterator<Item = u8>) {
59    //     match &mut self.bytes {
60    //         Some(bytes) => {
61    //             let prev_len = bytes.len();
62    //             bytes.extend(more_bytes);
63    //             let next_len = bytes.len();
64    //             let n = next_len - prev_len;
65    //             trace!("prepare {prev_len}+{n} additional bytes to be written");
66    //         }
67    //         None => self.replace(more_bytes),
68    //     }
69    // }
70
71    /// Makes the write progress.
72    pub fn resume(&mut self, arg: Option<StreamIo>) -> WriteStreamResult {
73        let Some(arg) = arg else {
74            let bytes = self.bytes.drain(..).collect();
75            trace!("wants I/O to write bytes");
76            return WriteStreamResult::Io(StreamIo::Write(Err(bytes)));
77        };
78
79        trace!("resume after writing bytes");
80
81        let StreamIo::Write(io) = arg else {
82            return WriteStreamResult::Err(WriteStreamError::InvalidArgument("write output", arg));
83        };
84
85        let output = match io {
86            Ok(output) => output,
87            Err(bytes) => return WriteStreamResult::Io(StreamIo::Write(Err(bytes))),
88        };
89
90        match output.bytes_count {
91            0 => WriteStreamResult::Eof,
92            n => {
93                debug!("wrote {n} bytes");
94                WriteStreamResult::Ok(output)
95            }
96        }
97    }
98}