protoflow_blocks/blocks/sys/
read_stdin.rs1extern crate std;
4
5use crate::{
6 prelude::{vec, Bytes},
7 types::ByteSize,
8 StdioConfig, StdioError, StdioSystem, System,
9};
10use protoflow_core::{Block, BlockResult, BlockRuntime, OutputPort};
11use protoflow_derive::Block;
12use simple_mermaid::mermaid;
13use std::io::Read;
14
15const DEFAULT_BUFFER_SIZE: ByteSize = ByteSize::new(1024);
17
18#[doc = mermaid!("../../../doc/sys/read_stdin.mmd")]
22#[doc = mermaid!("../../../doc/sys/read_stdin.seq.mmd" framed)]
25#[derive(Block, Clone)]
52pub struct ReadStdin {
53 #[output]
55 pub output: OutputPort<Bytes>,
56
57 #[parameter]
59 pub buffer_size: ByteSize,
60}
61
62impl ReadStdin {
63 pub fn new(output: OutputPort<Bytes>) -> Self {
64 Self::with_params(output, None)
65 }
66
67 pub fn with_params(output: OutputPort<Bytes>, buffer_size: Option<ByteSize>) -> Self {
68 Self {
69 output,
70 buffer_size: buffer_size.unwrap_or(DEFAULT_BUFFER_SIZE),
71 }
72 }
73
74 pub fn with_system(system: &System, buffer_size: Option<ByteSize>) -> Self {
75 use crate::SystemBuilding;
76 Self::with_params(system.output(), buffer_size)
77 }
78}
79
80impl Block for ReadStdin {
81 fn execute(&mut self, runtime: &dyn BlockRuntime) -> BlockResult {
82 let stdin = std::io::stdin().lock();
83 let mut reader = std::io::BufReader::new(stdin);
84 let mut buffer = vec![0; self.buffer_size.into()];
85
86 runtime.wait_for(&self.output)?;
87
88 loop {
89 buffer.resize(self.buffer_size.into(), b'\0'); buffer.fill(b'\0');
91
92 match reader.read(&mut buffer) {
93 Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
94 Err(err) => return Err(err.into()),
95 Ok(0) => break, Ok(buffer_len) => {
97 buffer.resize(buffer_len, b'\0'); let bytes = Bytes::from(buffer.clone());
99 self.output.send(&bytes)?;
100 }
101 }
102 }
103
104 Ok(())
105 }
106}
107
108#[cfg(feature = "std")]
109impl StdioSystem for ReadStdin {
110 fn build_system(config: StdioConfig) -> Result<System, StdioError> {
111 use crate::SystemBuilding;
112
113 config.allow_only(vec!["buffer_size"])?;
114
115 Ok(System::build(|s| {
116 let stdin = config.read_stdin(s);
117 let stdout = config.write_stdout(s);
118 s.connect(&stdin.output, &stdout.input);
119 }))
120 }
121}
122
123#[cfg(test)]
124mod tests {
125 use super::ReadStdin;
126 use crate::{System, SystemBuilding};
127
128 #[test]
129 fn instantiate_block() {
130 let _ = System::build(|s| {
132 let _ = s.block(ReadStdin::new(s.output()));
133 });
134 }
135}