protoflow_blocks/blocks/io/
encode.rs

1// This is free and unencumbered software released into the public domain.
2
3extern crate std;
4
5use crate::{
6    prelude::{vec, Bytes, String, ToString},
7    types::Encoding,
8    StdioConfig, StdioError, StdioSystem, System,
9};
10use protoflow_core::{Block, BlockResult, BlockRuntime, InputPort, Message, OutputPort};
11use protoflow_derive::Block;
12use simple_mermaid::mermaid;
13
/// A block that encodes `T` messages to a byte stream.
///
/// Every message received on `input` is converted to [`Bytes`] according to
/// the `encoding` parameter and sent on `output`. The supported encodings
/// are Protobuf with a length prefix, Protobuf without a length prefix, and
/// the message's `ToString` text followed by a newline.
///
/// The message type `T` defaults to `String`.
///
/// # Block Diagram
#[doc = mermaid!("../../../doc/io/encode.mmd")]
///
/// # Sequence Diagram
#[doc = mermaid!("../../../doc/io/encode.seq.mmd" framed)]
///
/// # Examples
///
/// ## Using the block in a system
///
/// ```rust
/// # use protoflow_blocks::*;
/// # fn main() {
/// System::build(|s| {
///     let stdin = s.read_stdin();
///     let message_decoder = s.decode_lines();
///     let counter = s.count::<String>();
///     let count_encoder = s.encode_lines();
///     let stdout = s.write_stdout();
///     s.connect(&stdin.output, &message_decoder.input);
///     s.connect(&message_decoder.output, &counter.input);
///     s.connect(&counter.count, &count_encoder.input);
///     s.connect(&count_encoder.output, &stdout.input);
/// });
/// # }
/// ```
///
/// ## Running the block via the CLI
///
/// ```console
/// $ protoflow execute Encode encoding=text
/// ```
///
/// ```console
/// $ protoflow execute Encode encoding=protobuf
/// ```
///
#[derive(Block, Clone)]
pub struct Encode<T: Message + ToString = String> {
    /// The input message stream.
    ///
    /// Messages are read from this port until it yields no more messages.
    #[input]
    pub input: InputPort<T>,

    /// The output byte stream.
    ///
    /// One `Bytes` value is sent per input message.
    #[output]
    pub output: OutputPort<Bytes>,

    /// A configuration parameter for how to encode messages.
    ///
    /// The constructors fall back to `Encoding::default()` when no encoding
    /// is supplied.
    #[parameter]
    pub encoding: Encoding,
}
67
68impl<T: Message + ToString> Encode<T> {
69    pub fn new(input: InputPort<T>, output: OutputPort<Bytes>) -> Self {
70        Self::with_params(input, output, None)
71    }
72
73    pub fn with_params(
74        input: InputPort<T>,
75        output: OutputPort<Bytes>,
76        encoding: Option<Encoding>,
77    ) -> Self {
78        Self {
79            input,
80            output,
81            encoding: encoding.unwrap_or_default(),
82        }
83    }
84}
85
86impl<T: Message + ToString + 'static> Encode<T> {
87    pub fn with_system(system: &System, encoding: Option<Encoding>) -> Self {
88        use crate::SystemBuilding;
89        Self::with_params(system.input(), system.output(), encoding)
90    }
91}
92
93impl<T: Message + ToString> Block for Encode<T> {
94    fn execute(&mut self, runtime: &dyn BlockRuntime) -> BlockResult {
95        runtime.wait_for(&self.input)?;
96
97        while let Some(message) = self.input.recv()? {
98            use Encoding::*;
99            let bytes = match self.encoding {
100                ProtobufWithLengthPrefix => Bytes::from(message.encode_length_delimited_to_vec()),
101                ProtobufWithoutLengthPrefix => Bytes::from(message.encode_to_vec()),
102                TextWithNewlineSuffix => {
103                    let mut string = message.to_string();
104                    string.push('\n');
105                    Bytes::from(string)
106                }
107            };
108            self.output.send(&bytes)?;
109        }
110
111        Ok(())
112    }
113}
114
#[cfg(feature = "std")]
impl StdioSystem for Encode {
    /// Builds the standalone system used to run this block over stdio.
    ///
    /// `config` is first checked with `allow_only(["encoding"])`; an error
    /// from that check is returned to the caller.
    ///
    /// NOTE(review): the system wiring itself is still a `todo!()` stub, so
    /// this presumably panics as soon as the builder closure is invoked.
    fn build_system(config: StdioConfig) -> Result<System, StdioError> {
        // TODO: the real wiring will likely need
        // `crate::{CoreBlocks, SysBlocks, SystemBuilding}` in scope.

        config.allow_only(vec!["encoding"])?;

        let system = System::build(|_s| todo!());
        Ok(system)
    }
}
125
#[cfg(test)]
mod tests {
    use super::Encode;
    use crate::{System, SystemBuilding};

    /// Smoke test: an `Encode<i32>` block can be constructed and registered
    /// inside a system builder.
    #[test]
    fn instantiate_block() {
        let _ = System::build(|system| {
            let encoder = Encode::<i32>::new(system.input(), system.output());
            let _ = system.block(encoder);
        });
    }
}