protoflow_blocks/blocks/io/
decode.rs

1// This is free and unencumbered software released into the public domain.
2
3extern crate std;
4
5use crate::{
6    prelude::{vec, Bytes, FromStr, String, ToString, Vec},
7    types::Encoding,
8    StdioConfig, StdioError, StdioSystem, System,
9};
10use protoflow_core::{
11    Block, BlockError, BlockResult, BlockRuntime, InputPort, Message, OutputPort,
12};
13use protoflow_derive::Block;
14use simple_mermaid::mermaid;
15use std::io::BufRead;
16
17/// A block that decodes `T` messages from a byte stream.
18///
19/// # Block Diagram
20#[doc = mermaid!("../../../doc/io/decode.mmd")]
21///
22/// # Sequence Diagram
23#[doc = mermaid!("../../../doc/io/decode.seq.mmd" framed)]
24///
25/// # Examples
26///
27/// ## Using the block in a system
28///
29/// ```rust
30/// # use protoflow_blocks::*;
31/// # fn main() {
32/// System::build(|s| {
33///     let stdin = s.read_stdin();
34///     let message_decoder = s.decode_lines();
35///     let counter = s.count::<String>();
36///     let count_encoder = s.encode_lines();
37///     let stdout = s.write_stdout();
38///     s.connect(&stdin.output, &message_decoder.input);
39///     s.connect(&message_decoder.output, &counter.input);
40///     s.connect(&counter.count, &count_encoder.input);
41///     s.connect(&count_encoder.output, &stdout.input);
42/// });
43/// # }
44/// ```
45///
46/// ## Running the block via the CLI
47///
48/// ```console
49/// $ protoflow execute Decode encoding=text
50/// ```
51///
52/// ```console
53/// $ protoflow execute Decode encoding=protobuf
54/// ```
55///
56#[derive(Block, Clone)]
57pub struct Decode<T: Message + FromStr = String> {
58    /// The input byte stream.
59    #[input]
60    pub input: InputPort<Bytes>,
61
62    /// The output message stream.
63    #[output]
64    pub output: OutputPort<T>,
65
66    /// A configuration parameter for how to decode messages.
67    #[parameter]
68    pub encoding: Encoding,
69}
70
71impl<T: Message + FromStr> Decode<T> {
72    pub fn new(input: InputPort<Bytes>, output: OutputPort<T>) -> Self {
73        Self::with_params(input, output, None)
74    }
75
76    pub fn with_params(
77        input: InputPort<Bytes>,
78        output: OutputPort<T>,
79        encoding: Option<Encoding>,
80    ) -> Self {
81        Self {
82            input,
83            output,
84            encoding: encoding.unwrap_or_default(),
85        }
86    }
87}
88
89impl<T: Message + FromStr + 'static> Decode<T> {
90    pub fn with_system(system: &System, encoding: Option<Encoding>) -> Self {
91        use crate::SystemBuilding;
92        Self::with_params(system.input(), system.output(), encoding)
93    }
94}
95
96impl<T: Message + FromStr> Block for Decode<T> {
97    fn execute(&mut self, _runtime: &dyn BlockRuntime) -> BlockResult {
98        let mut buffer = Vec::<u8>::new();
99
100        while let Some(chunk) = self.input.recv()? {
101            buffer.extend_from_slice(&chunk);
102
103            let mut cursor = std::io::Cursor::new(&buffer);
104
105            let _message = match self.encoding {
106                Encoding::ProtobufWithLengthPrefix => todo!(), // TODO
107                Encoding::ProtobufWithoutLengthPrefix => todo!(), // TODO
108                Encoding::TextWithNewlineSuffix => {
109                    if !chunk.contains(&b'\n') {
110                        continue; // skip useless chunks
111                    }
112                    let mut line = String::new();
113                    while cursor.read_line(&mut line)? > 0 {
114                        if !line.ends_with('\n') {
115                            cursor.set_position(cursor.position() - line.len() as u64);
116                            break;
117                        }
118                        let stripped_line =
119                            line.strip_suffix('\n').expect("line ends with newline");
120                        match T::from_str(stripped_line) {
121                            Ok(message) => self.output.send(&message)?,
122                            Err(_error) => {
123                                BlockError::Other("decode error".to_string()); // FIXME
124                            }
125                        }
126                        line.clear();
127                    }
128                }
129            };
130
131            buffer.drain(..cursor.position() as usize);
132        }
133
134        self.input.close()?;
135        Ok(())
136    }
137}
138
139#[cfg(feature = "std")]
140impl StdioSystem for Decode {
141    fn build_system(config: StdioConfig) -> Result<System, StdioError> {
142        //use crate::{CoreBlocks, SysBlocks, SystemBuilding};
143
144        config.allow_only(vec!["encoding"])?;
145
146        Ok(System::build(|_s| todo!()))
147    }
148}
149
150#[cfg(test)]
151mod tests {
152    use super::Decode;
153    use crate::{System, SystemBuilding};
154
155    #[test]
156    fn instantiate_block() {
157        // Check that the block is constructible:
158        let _ = System::build(|s| {
159            let _ = s.block(Decode::<i32>::new(s.input(), s.output()));
160        });
161    }
162}