protoflow_blocks/blocks/sys/
read_file.rs1extern crate std;
4
5use crate::{
6 prelude::{vec, Bytes, String, Vec},
7 StdioConfig, StdioError, StdioSystem, System,
8};
9use protoflow_core::{Block, BlockResult, BlockRuntime, InputPort, OutputPort};
10use protoflow_derive::Block;
11use simple_mermaid::mermaid;
12
13#[doc = mermaid!("../../../doc/sys/read_file.mmd")]
17#[doc = mermaid!("../../../doc/sys/read_file.seq.mmd" framed)]
20#[derive(Block, Clone)]
41pub struct ReadFile {
42 #[input]
44 pub path: InputPort<String>,
45
46 #[output]
48 pub output: OutputPort<Bytes>,
49}
50
51impl ReadFile {
52 pub fn new(path: InputPort<String>, output: OutputPort<Bytes>) -> Self {
53 Self { path, output }
54 }
55
56 pub fn with_system(system: &System) -> Self {
57 use crate::SystemBuilding;
58 Self::new(system.input(), system.output())
59 }
60}
61
62impl Block for ReadFile {
63 fn execute(&mut self, _runtime: &dyn BlockRuntime) -> BlockResult {
64 use std::io::prelude::Read;
65
66 while let Some(path) = self.path.recv()? {
67 let mut file = std::fs::OpenOptions::new().read(true).open(path)?;
68
69 let mut buffer = Vec::new();
70 file.read_to_end(&mut buffer)?;
71 let bytes = Bytes::from(buffer);
72
73 self.output.send(&bytes)?;
74 }
75
76 Ok(())
77 }
78}
79
#[cfg(feature = "std")]
impl StdioSystem for ReadFile {
    /// Builds a system for this block from a standard I/O configuration.
    ///
    /// Only the `path` key is handed to `allow_only`; an error from that
    /// check is returned to the caller unchanged.
    ///
    /// NOTE(review): the system body is still an unimplemented stub, see the
    /// comment on the builder closure below.
    fn build_system(config: StdioConfig) -> Result<System, StdioError> {
        let permitted = vec!["path"];
        config.allow_only(permitted)?;

        // The wiring has not been written yet: `todo!()` panics as soon as
        // this closure is invoked.
        let system = System::build(|_s| todo!());
        Ok(system)
    }
}
90
#[cfg(test)]
mod tests {
    extern crate std;
    use super::ReadFile;
    use crate::{System, SystemBuilding, SystemExecution};

    /// The block can be constructed and registered inside a system builder.
    #[test]
    fn instantiate_block() {
        let _ = System::build(|s| {
            let block = ReadFile::new(s.input(), s.output());
            let _ = s.block(block);
        });
    }

    /// Sends the path of a temporary file to the block and checks that the
    /// file's contents come back, followed by `None` once the path port is
    /// closed.
    #[test]
    fn run_block() {
        use protoflow_core::{
            runtimes::StdRuntime as Runtime, transports::MpscTransport as Transport,
        };
        use std::io::Write;

        // Fixture: a temporary file holding a known payload.
        let expected = "Hello, World!\n";
        let mut fixture = tempfile::NamedTempFile::new().unwrap();
        fixture.write_all(expected.as_bytes()).unwrap();

        let mut system = System::new(&Runtime::new(Transport::new()).unwrap());
        let block = system.block(ReadFile::with_system(&system));

        // Test-side ends of the two connections: one to feed paths in, one to
        // read bytes out.
        let mut path_tx = system.output();
        let bytes_rx = system.input();

        system.connect(&path_tx, &block.path);
        system.connect(&block.output, &bytes_rx);

        // Run the system on its own thread so this thread can drive the ports.
        let runner = std::thread::spawn(move || {
            system
                .execute()
                .and_then(|process| process.join())
                .unwrap()
        });

        let file_path: String = fixture.path().to_string_lossy().into();
        path_tx.send(&file_path).unwrap();

        let received = bytes_rx
            .recv()
            .expect("should receive output")
            .expect("output shouldn't be None");
        assert_eq!(received, expected);

        path_tx.close().unwrap();

        assert_eq!(
            bytes_rx.recv(),
            Ok(None),
            "want EOS signal after path port is closed"
        );

        runner.join().unwrap()
    }
}