protoflow_blocks/blocks/sys/
read_stdin.rs

1// This is free and unencumbered software released into the public domain.
2
3extern crate std;
4
5use crate::{
6    prelude::{vec, Bytes},
7    types::ByteSize,
8    StdioConfig, StdioError, StdioSystem, System,
9};
10use protoflow_core::{Block, BlockResult, BlockRuntime, OutputPort};
11use protoflow_derive::Block;
12use simple_mermaid::mermaid;
13use std::io::Read;
14
15/// The default buffer size for reading from standard input.
16const DEFAULT_BUFFER_SIZE: ByteSize = ByteSize::new(1024);
17
18/// A block that reads bytes from standard input (aka stdin).
19///
20/// # Block Diagram
21#[doc = mermaid!("../../../doc/sys/read_stdin.mmd")]
22///
23/// # Sequence Diagram
24#[doc = mermaid!("../../../doc/sys/read_stdin.seq.mmd" framed)]
25///
26/// # Examples
27///
28/// ## Using the block in a system
29///
30/// ```rust
31/// # use protoflow_blocks::*;
32/// # fn main() {
33/// System::build(|s| {
34///     let stdin = s.read_stdin();
35///     let stdout = s.write_stdout();
36///     s.connect(&stdin.output, &stdout.input);
37/// });
38/// # }
39/// ```
40///
41/// ## Running the block via the CLI
42///
43/// ```console
44/// $ protoflow execute ReadStdin < input.txt
45/// ```
46///
47/// ```console
48/// $ protoflow execute ReadStdin buffer-size=1024 < input.txt
49/// ```
50///
51#[derive(Block, Clone)]
52pub struct ReadStdin {
53    /// The output message stream.
54    #[output]
55    pub output: OutputPort<Bytes>,
56
57    /// The maximum number of bytes to read at a time.
58    #[parameter]
59    pub buffer_size: ByteSize,
60}
61
62impl ReadStdin {
63    pub fn new(output: OutputPort<Bytes>) -> Self {
64        Self::with_params(output, None)
65    }
66
67    pub fn with_params(output: OutputPort<Bytes>, buffer_size: Option<ByteSize>) -> Self {
68        Self {
69            output,
70            buffer_size: buffer_size.unwrap_or(DEFAULT_BUFFER_SIZE),
71        }
72    }
73
74    pub fn with_system(system: &System, buffer_size: Option<ByteSize>) -> Self {
75        use crate::SystemBuilding;
76        Self::with_params(system.output(), buffer_size)
77    }
78}
79
80impl Block for ReadStdin {
81    fn execute(&mut self, runtime: &dyn BlockRuntime) -> BlockResult {
82        let stdin = std::io::stdin().lock();
83        let mut reader = std::io::BufReader::new(stdin);
84        let mut buffer = vec![0; self.buffer_size.into()];
85
86        runtime.wait_for(&self.output)?;
87
88        loop {
89            buffer.resize(self.buffer_size.into(), b'\0'); // reinitialize the buffer
90            buffer.fill(b'\0');
91
92            match reader.read(&mut buffer) {
93                Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
94                Err(err) => return Err(err.into()),
95                Ok(0) => break, // EOF
96                Ok(buffer_len) => {
97                    buffer.resize(buffer_len, b'\0'); // truncate the buffer
98                    let bytes = Bytes::from(buffer.clone());
99                    self.output.send(&bytes)?;
100                }
101            }
102        }
103
104        Ok(())
105    }
106}
107
#[cfg(feature = "std")]
impl StdioSystem for ReadStdin {
    /// Builds a system that connects a stdin reader to a stdout writer,
    /// both obtained from `config`.
    ///
    /// # Errors
    ///
    /// Propagates the error from `config.allow_only` when the configuration
    /// check against the `buffer_size` parameter fails.
    fn build_system(config: StdioConfig) -> Result<System, StdioError> {
        use crate::SystemBuilding;

        config.allow_only(vec!["buffer_size"])?;

        let system = System::build(|s| {
            let source = config.read_stdin(s);
            let sink = config.write_stdout(s);
            s.connect(&source.output, &sink.input);
        });

        Ok(system)
    }
}
122
#[cfg(test)]
mod tests {
    use super::ReadStdin;
    use crate::{System, SystemBuilding};

    /// Smoke test: a `ReadStdin` block can be constructed and registered
    /// inside a system.
    #[test]
    fn instantiate_block() {
        let _ = System::build(|s| {
            let output = s.output();
            let _ = s.block(ReadStdin::new(output));
        });
    }
}
135}