extern crate std;
use crate::{
prelude::{vec, Bytes, String, Vec},
StdioConfig, StdioError, StdioSystem, System,
};
use protoflow_core::{Block, BlockResult, BlockRuntime, InputPort, OutputPort};
use protoflow_derive::Block;
use simple_mermaid::mermaid;
/// A block that reads files and emits their contents as bytes.
///
/// For every path string received on `path`, the whole file is read and sent
/// as a single `Bytes` message on `output`.
///
#[doc = mermaid!("../../../doc/sys/read_file.mmd")]
#[doc = mermaid!("../../../doc/sys/read_file.seq.mmd" framed)]
#[derive(Block, Clone)]
pub struct ReadFile {
/// Input port carrying the path of each file to read.
#[input]
pub path: InputPort<String>,
/// Output port carrying the contents of each file that was read.
#[output]
pub output: OutputPort<Bytes>,
}
impl ReadFile {
pub fn new(path: InputPort<String>, output: OutputPort<Bytes>) -> Self {
Self { path, output }
}
pub fn with_system(system: &System) -> Self {
use crate::SystemBuilding;
Self::new(system.input(), system.output())
}
}
impl Block for ReadFile {
    /// Reads every file named on the `path` port and forwards its contents.
    ///
    /// Loops until `self.path.recv()` yields `None`. For each received path
    /// the entire file is read into memory and sent on `output` as a single
    /// `Bytes` message.
    ///
    /// # Errors
    ///
    /// Returns early with an error if receiving a path, reading the file, or
    /// sending the contents fails; remaining paths are not processed.
    fn execute(&mut self, _runtime: &dyn BlockRuntime) -> BlockResult {
        while let Some(path) = self.path.recv()? {
            // `std::fs::read` opens the file read-only and sizes the buffer
            // from the file's metadata, avoiding the repeated reallocation of
            // `read_to_end` into an empty `Vec`.
            let contents = std::fs::read(path)?;
            let bytes = Bytes::from(contents);
            self.output.send(&bytes)?;
        }
        Ok(())
    }
}
#[cfg(feature = "std")]
impl StdioSystem for ReadFile {
    /// Builds the standalone system for this block from `config`.
    ///
    /// Calls `config.allow_only` with `path` as the only listed parameter
    /// name and propagates any error it returns.
    ///
    /// NOTE: the system definition is still a `todo!()` stub, so the closure
    /// handed to `System::build` panics whenever it is invoked.
    fn build_system(config: StdioConfig) -> Result<System, StdioError> {
        let allowed_params = vec!["path"];
        config.allow_only(allowed_params)?;

        let system = System::build(|_s| todo!());
        Ok(system)
    }
}
// Unit tests for the `ReadFile` block.
#[cfg(test)]
mod tests {
extern crate std;
use super::ReadFile;
use crate::{System, SystemBuilding, SystemExecution};
// Checks that a `ReadFile` block can be constructed and registered inside
// `System::build`.
#[test]
fn instantiate_block() {
let _ = System::build(|s| {
let _ = s.block(ReadFile::new(s.input(), s.output()));
});
}
// End-to-end check: sends the path of a temporary file to the block, expects
// the file's contents back, then closes the path port and expects `Ok(None)`.
#[test]
fn run_block() {
use protoflow_core::{
runtimes::StdRuntime as Runtime, transports::MpscTransport as Transport,
};
use std::io::Write;
// Fixture: a named temporary file holding known content.
let mut temp_file = tempfile::NamedTempFile::new().unwrap();
let test_content = "Hello, World!\n";
temp_file.write_all(test_content.as_bytes()).unwrap();
let mut system = System::new(&Runtime::new(Transport::new()).unwrap());
let read_file = system.block(ReadFile::with_system(&system));
// Test-side ports: `path` feeds the block's input, `output` reads its output.
let mut path = system.output();
let output = system.input();
system.connect(&path, &read_file.path);
system.connect(&read_file.output, &output);
// Execute the system on a separate thread so this thread can drive the ports.
let thrd = std::thread::spawn(move || system.execute().and_then(|p| p.join()).unwrap());
path.send(&temp_file.path().to_string_lossy().into())
.unwrap();
// The block should emit exactly the bytes that were written to the file.
assert_eq!(
output
.recv()
.expect("should receive output")
.expect("output shouldn't be None"),
test_content
);
// Closing the path port should end the block's loop and close its output.
path.close().unwrap();
assert_eq!(
output.recv(),
Ok(None),
"want EOS signal after path port is closed"
);
// Surface any panic raised on the execution thread.
thrd.join().unwrap()
}
}