protoflow_blocks/blocks/io/
encode_json.rs

1// This is free and unencumbered software released into the public domain.
2
3extern crate std;
4
5use crate::{
6    prelude::{Bytes, Vec},
7    StdioConfig, StdioError, StdioSystem, System,
8};
9use protoflow_core::{types::Value, Block, BlockResult, BlockRuntime, InputPort, OutputPort};
10use protoflow_derive::Block;
11use simple_mermaid::mermaid;
12use struson::writer::*;
13
14/// A block that encodes messages into JSON format.
15///
16/// # Block Diagram
17#[doc = mermaid!("../../../doc/io/encode_hex.mmd")]
18///
19/// # Sequence Diagram
20#[doc = mermaid!("../../../doc/io/encode_hex.seq.mmd" framed)]
21///
22/// # Examples
23///
24/// ## Using the block in a system
25///
26/// ```rust
27/// # use protoflow_blocks::*;
28/// # fn main() {
29/// System::build(|s| {
30///     // TODO
31/// });
32/// # }
33/// ```
34///
35/// ## Running the block via the CLI
36///
37/// ```console
38/// $ protoflow execute EncodeJSON
39/// ```
40///
41#[derive(Block, Clone)]
42pub struct EncodeJson {
43    /// The input message stream.
44    #[input]
45    pub input: InputPort<Value>,
46
47    /// The output byte stream.
48    #[output]
49    pub output: OutputPort<Bytes>,
50}
51
52impl EncodeJson {
53    pub fn new(input: InputPort<Value>, output: OutputPort<Bytes>) -> Self {
54        Self { input, output }
55    }
56
57    pub fn with_system(system: &System) -> Self {
58        use crate::SystemBuilding;
59        Self::new(system.input(), system.output())
60    }
61}
62
63impl Block for EncodeJson {
64    fn execute(&mut self, runtime: &dyn BlockRuntime) -> BlockResult {
65        runtime.wait_for(&self.input)?;
66
67        while let Some(input) = self.input.recv()? {
68            let mut buffer = Vec::<u8>::new();
69            {
70                let mut json = JsonStreamWriter::new(&mut buffer);
71                encode(&mut json, input)?;
72                json.finish_document()?;
73            }
74            let output = Bytes::from(buffer);
75            self.output.send(&output)?;
76        }
77
78        Ok(())
79    }
80}
81
82fn encode(json: &mut JsonStreamWriter<&mut Vec<u8>>, value: Value) -> Result<(), std::io::Error> {
83    use protoflow_core::types::value::Kind::*;
84    match value.kind.unwrap() {
85        NullValue(_) => json.null_value()?,
86        BoolValue(bool) => json.bool_value(bool)?,
87        NumberValue(number) => {
88            if number.is_nan() {
89                json.string_value("NaN")?
90            } else if number.is_infinite() && number.is_sign_negative() {
91                json.string_value("-Infinity")?
92            } else if number.is_infinite() && number.is_sign_positive() {
93                json.string_value("Infinity")?
94            } else {
95                json.fp_number_value(number).map_err(|_| {
96                    std::io::Error::new(
97                        std::io::ErrorKind::InvalidData,
98                        "failed to encode JSON number",
99                    )
100                })?;
101            }
102        }
103        StringValue(string) => json.string_value(&string)?,
104        ListValue(array) => {
105            json.begin_array()?;
106            for value in array.values {
107                encode(json, value)?;
108            }
109            json.end_array()?
110        }
111        StructValue(object) => {
112            json.begin_object()?;
113            for (key, value) in object.fields {
114                json.name(&key)?;
115                encode(json, value)?;
116            }
117            json.end_object()?
118        }
119    }
120    Ok(())
121}
122
123#[cfg(feature = "std")]
124impl StdioSystem for EncodeJson {
125    fn build_system(config: StdioConfig) -> Result<System, StdioError> {
126        //use crate::SystemBuilding;
127
128        config.reject_any()?;
129
130        Ok(System::build(|_s| {
131            //let stdin = config.read_stdin(s);
132            //let json_encoder = s.encode_json();
133            //let stdout = config.write_stdout(s);
134            //s.connect(&stdin.output, &json_encoder.input);
135            //s.connect(&json_encoder.output, &stdout.input);
136            todo!() // TODO
137        }))
138    }
139}
140
141#[cfg(test)]
142mod tests {
143    use super::EncodeJson;
144    use crate::{System, SystemBuilding};
145
146    #[test]
147    fn instantiate_block() {
148        // Check that the block is constructible:
149        let _ = System::build(|s| {
150            let _ = s.block(EncodeJson::new(s.input(), s.output()));
151        });
152    }
153}