1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
// This is free and unencumbered software released into the public domain.

extern crate std;

use crate::{Encoding, StdioConfig, StdioError, StdioSystem, System};
use protoflow_core::{
    prelude::{Bytes, String, ToString},
    Block, BlockResult, BlockRuntime, InputPort, Message, OutputPort,
};
use protoflow_derive::Block;
use simple_mermaid::mermaid;

/// A block that encodes `T` messages to a byte stream.
///
/// Every message received on `input` is serialized according to the
/// `encoding` parameter and sent on `output` as one `Bytes` value.
/// The message type `T` defaults to `String`.
///
/// # Block Diagram
#[doc = mermaid!("../../doc/io/encode.mmd")]
///
/// # Sequence Diagram
#[doc = mermaid!("../../doc/io/encode.seq.mmd" framed)]
///
/// # Examples
///
/// ## Using the block in a system
///
/// ```rust
/// # use protoflow_blocks::*;
/// # fn main() {
/// System::build(|s| {
///     let stdin = s.read_stdin();
///     let message_decoder = s.decode_lines();
///     let counter = s.count::<String>();
///     let count_encoder = s.encode_lines();
///     let stdout = s.write_stdout();
///     s.connect(&stdin.output, &message_decoder.input);
///     s.connect(&message_decoder.output, &counter.input);
///     s.connect(&counter.count, &count_encoder.input);
///     s.connect(&count_encoder.output, &stdout.input);
/// });
/// # }
/// ```
///
/// ## Running the block via the CLI
///
/// ```console
/// $ protoflow execute Encode encoding=text
/// ```
///
/// ```console
/// $ protoflow execute Encode encoding=protobuf
/// ```
///
#[derive(Block, Clone)]
pub struct Encode<T: Message + ToString = String> {
    /// The input message stream.
    ///
    /// Messages are consumed from this port until it yields no more.
    #[input]
    pub input: InputPort<T>,

    /// The output byte stream.
    ///
    /// Receives exactly one `Bytes` value per encoded input message.
    #[output]
    pub output: OutputPort<Bytes>,

    /// A configuration parameter for how to encode messages.
    ///
    /// Selects between length-prefixed Protobuf, Protobuf without a length
    /// prefix, and the message's `ToString` text followed by a newline.
    #[parameter]
    pub encoding: Encoding,
}

impl<T: Message + ToString> Encode<T> {
    /// Creates an encoder wired to the given ports, using the default
    /// [`Encoding`].
    pub fn new(input: InputPort<T>, output: OutputPort<Bytes>) -> Self {
        Self {
            input,
            output,
            encoding: Encoding::default(),
        }
    }

    /// Creates an encoder wired to the given ports, using the explicitly
    /// supplied `encoding`.
    pub fn with_params(input: InputPort<T>, output: OutputPort<Bytes>, encoding: Encoding) -> Self {
        Self {
            encoding,
            input,
            output,
        }
    }
}

impl<T: Message + ToString> Block for Encode<T> {
    fn execute(&mut self, runtime: &dyn BlockRuntime) -> BlockResult {
        runtime.wait_for(&self.input)?;

        while let Some(message) = self.input.recv()? {
            use Encoding::*;
            let bytes = match self.encoding {
                ProtobufWithLengthPrefix => Bytes::from(message.encode_length_delimited_to_vec()),
                ProtobufWithoutLengthPrefix => Bytes::from(message.encode_to_vec()),
                TextWithNewlineSuffix => {
                    let mut string = message.to_string();
                    string.push('\n');
                    Bytes::from(string)
                }
            };
            self.output.send(&bytes)?;
        }

        Ok(())
    }
}

#[cfg(feature = "std")]
impl StdioSystem for Encode {
    /// Builds the standalone stdio system for this block.
    ///
    /// Not implemented yet: the builder closure passed to `System::build`
    /// is a `todo!()` placeholder, and `_config` is currently ignored.
    fn build_system(_config: StdioConfig) -> Result<System, StdioError> {
        // TODO: wire up the real system; this will presumably need
        // `crate::{CoreBlocks, SysBlocks, SystemBuilding}` in scope.
        let system = System::build(|_s| todo!());
        Ok(system)
    }
}

#[cfg(test)]
mod tests {
    use super::Encode;
    use crate::{System, SystemBuilding};

    /// Smoke test: an `Encode<i32>` block can be constructed from fresh
    /// ports and registered inside a system builder.
    #[test]
    fn instantiate_block() {
        let _ = System::build(|s| {
            let input = s.input();
            let output = s.output();
            let _ = s.block(Encode::<i32>::new(input, output));
        });
    }
}