io_stream/coroutines/write.rs
1//! I/O-free coroutine to write bytes into a stream.
2
3use log::{debug, trace};
4use thiserror::Error;
5
6use crate::io::{StreamIo, StreamOutput};
7
8/// Errors that can occur during the coroutine progression.
9#[derive(Clone, Debug, Error)]
10pub enum WriteStreamError {
11 /// The coroutine received an invalid argument.
12 ///
13 /// Occurs when the coroutine receives an I/O response from
14 /// another coroutine, which should not happen if the runtime maps
15 /// correctly the arguments.
16 #[error("Invalid argument: expected {0}, got {1:?}")]
17 InvalidArgument(&'static str, StreamIo),
18}
19
20/// Output emitted after a coroutine finishes its progression.
21#[derive(Clone, Debug)]
22pub enum WriteStreamResult {
23 /// The coroutine has successfully terminated its progression.
24 Ok(StreamOutput),
25
26 /// A stream I/O needs to be performed to make the coroutine
27 /// progress.
28 Io(StreamIo),
29
30 /// The coroutine reached the End Of File.
31 ///
32 /// Only the consumer can determine if its an error or not.
33 Eof,
34
35 /// An error occured during the coroutine progression.
36 Err(WriteStreamError),
37}
38
/// Coroutine that writes bytes into a stream without performing any
/// I/O itself.
///
/// The write is requested from the caller through
/// [`WriteStreamResult::Io`].
#[derive(Debug, Default)]
pub struct WriteStream {
    /// Bytes waiting to be handed over for writing.
    bytes: Vec<u8>,
}
44
45impl WriteStream {
46 /// Creates a new coroutine to write the given bytes.
47 pub fn new(bytes: Vec<u8>) -> Self {
48 trace!("init coroutine for writing {} bytes", bytes.len());
49 Self { bytes }
50 }
51
52 // /// Replaces the inner bytes with the given one.
53 // pub fn replace(&mut self, bytes: impl IntoIterator<Item = u8>) {
54 // *self = Self::new(bytes.into_iter()collect());
55 // }
56
57 // /// Adds the given bytes the to inner buffer.
58 // pub fn extend(&mut self, more_bytes: impl IntoIterator<Item = u8>) {
59 // match &mut self.bytes {
60 // Some(bytes) => {
61 // let prev_len = bytes.len();
62 // bytes.extend(more_bytes);
63 // let next_len = bytes.len();
64 // let n = next_len - prev_len;
65 // trace!("prepare {prev_len}+{n} additional bytes to be written");
66 // }
67 // None => self.replace(more_bytes),
68 // }
69 // }
70
71 /// Makes the write progress.
72 pub fn resume(&mut self, arg: Option<StreamIo>) -> WriteStreamResult {
73 let Some(arg) = arg else {
74 let bytes = self.bytes.drain(..).collect();
75 trace!("wants I/O to write bytes");
76 return WriteStreamResult::Io(StreamIo::Write(Err(bytes)));
77 };
78
79 trace!("resume after writing bytes");
80
81 let StreamIo::Write(io) = arg else {
82 return WriteStreamResult::Err(WriteStreamError::InvalidArgument("write output", arg));
83 };
84
85 let output = match io {
86 Ok(output) => output,
87 Err(bytes) => return WriteStreamResult::Io(StreamIo::Write(Err(bytes))),
88 };
89
90 match output.bytes_count {
91 0 => WriteStreamResult::Eof,
92 n => {
93 debug!("wrote {n} bytes");
94 WriteStreamResult::Ok(output)
95 }
96 }
97 }
98}