use std::mem;
use log::{debug, trace};
use thiserror::Error;
use crate::io::{StreamIo, StreamOutput};
#[derive(Clone, Debug, Error)]
pub enum ReadStreamError {
#[error("Invalid argument: expected {0}, got {1:?}")]
InvalidArgument(&'static str, StreamIo),
}
#[derive(Clone, Debug)]
pub enum ReadStreamResult {
Ok(StreamOutput),
Io(StreamIo),
Eof,
Err(ReadStreamError),
}
#[derive(Debug)]
pub struct ReadStream {
buffer: Vec<u8>,
}
impl ReadStream {
pub const DEFAULT_CAPACITY: usize = 8 * 1024;
pub fn new() -> Self {
Self::with_capacity(Self::DEFAULT_CAPACITY)
}
pub fn with_capacity(capacity: usize) -> Self {
trace!("init coroutine to read bytes (capacity: {capacity})");
let buffer = vec![0; capacity];
Self { buffer }
}
pub fn capacity(&self) -> usize {
self.buffer.capacity()
}
pub fn truncate(&mut self, len: usize) {
self.buffer.truncate(len);
self.buffer.shrink_to(len);
}
pub fn replace(&mut self, mut buffer: Vec<u8>) {
buffer.fill(0);
self.buffer = buffer;
}
pub fn resume(&mut self, arg: Option<StreamIo>) -> ReadStreamResult {
let Some(arg) = arg else {
let mut buffer = vec![0; self.buffer.capacity()];
mem::swap(&mut buffer, &mut self.buffer);
trace!("wants I/O to read bytes");
return ReadStreamResult::Io(StreamIo::Read(Err(buffer)));
};
trace!("resume after reading bytes");
let StreamIo::Read(io) = arg else {
return ReadStreamResult::Err(ReadStreamError::InvalidArgument("read output", arg));
};
let output = match io {
Ok(output) => output,
Err(buffer) => return ReadStreamResult::Io(StreamIo::Read(Err(buffer))),
};
match output.bytes_count {
0 => ReadStreamResult::Eof,
n => {
debug!("read {n}/{} bytes", output.buffer.capacity());
ReadStreamResult::Ok(output)
}
}
}
}
impl Default for ReadStream {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use std::io::{BufReader, Read as _};
use crate::{
coroutines::read::ReadStreamResult,
io::{StreamIo, StreamOutput},
};
use super::ReadStream;
#[test]
fn read() {
let _ = env_logger::try_init();
let mut reader = BufReader::new("abcdef".as_bytes());
let mut read = ReadStream::with_capacity(4);
let mut arg = None;
let output = loop {
match read.resume(arg.take()) {
ReadStreamResult::Ok(output) => break output,
ReadStreamResult::Io(StreamIo::Read(Err(mut buffer))) => {
let bytes_count = reader.read(&mut buffer).unwrap();
let output = StreamOutput {
buffer,
bytes_count,
};
arg = Some(StreamIo::Read(Ok(output)))
}
other => unreachable!("Unexpected result: {other:?}"),
}
};
assert_eq!(output.bytes(), b"abcd");
read.replace(output.buffer);
let output = loop {
match read.resume(arg.take()) {
ReadStreamResult::Ok(output) => break output,
ReadStreamResult::Io(StreamIo::Read(Err(mut buffer))) => {
let bytes_count = reader.read(&mut buffer).unwrap();
let output = StreamOutput {
buffer,
bytes_count,
};
arg = Some(StreamIo::Read(Ok(output)))
}
other => unreachable!("Unexpected result: {other:?}"),
}
};
assert_eq!(output.bytes(), b"ef");
read.replace(output.buffer);
loop {
match read.resume(arg.take()) {
ReadStreamResult::Eof => break,
ReadStreamResult::Io(StreamIo::Read(Err(mut buffer))) => {
let bytes_count = reader.read(&mut buffer).unwrap();
let output = StreamOutput {
buffer,
bytes_count,
};
arg = Some(StreamIo::Read(Ok(output)))
}
other => unreachable!("Unexpected result: {other:?}"),
}
}
}
}