protoflow_blocks/blocks/io/
decode.rs1extern crate std;
4
5use crate::{
6 prelude::{vec, Bytes, FromStr, String, ToString, Vec},
7 types::Encoding,
8 StdioConfig, StdioError, StdioSystem, System,
9};
10use protoflow_core::{
11 Block, BlockError, BlockResult, BlockRuntime, InputPort, Message, OutputPort,
12};
13use protoflow_derive::Block;
14use simple_mermaid::mermaid;
15use std::io::BufRead;
16
/// A block that decodes `T` messages from a byte stream.
///
/// Chunks of `Bytes` are received on `input`, interpreted according to
/// `encoding`, and every successfully decoded message is sent on `output`.
/// `T` defaults to `String` when no type parameter is given.
///
/// # Diagrams
#[doc = mermaid!("../../../doc/io/decode.mmd")]
///
#[doc = mermaid!("../../../doc/io/decode.seq.mmd" framed)]
#[derive(Block, Clone)]
pub struct Decode<T: Message + FromStr = String> {
    /// The input byte stream to decode.
    #[input]
    pub input: InputPort<Bytes>,

    /// The output stream of decoded `T` messages.
    #[output]
    pub output: OutputPort<T>,

    /// A configuration parameter selecting how messages are framed and
    /// decoded from the byte stream.
    #[parameter]
    pub encoding: Encoding,
}
70
71impl<T: Message + FromStr> Decode<T> {
72 pub fn new(input: InputPort<Bytes>, output: OutputPort<T>) -> Self {
73 Self::with_params(input, output, None)
74 }
75
76 pub fn with_params(
77 input: InputPort<Bytes>,
78 output: OutputPort<T>,
79 encoding: Option<Encoding>,
80 ) -> Self {
81 Self {
82 input,
83 output,
84 encoding: encoding.unwrap_or_default(),
85 }
86 }
87}
88
89impl<T: Message + FromStr + 'static> Decode<T> {
90 pub fn with_system(system: &System, encoding: Option<Encoding>) -> Self {
91 use crate::SystemBuilding;
92 Self::with_params(system.input(), system.output(), encoding)
93 }
94}
95
96impl<T: Message + FromStr> Block for Decode<T> {
97 fn execute(&mut self, _runtime: &dyn BlockRuntime) -> BlockResult {
98 let mut buffer = Vec::<u8>::new();
99
100 while let Some(chunk) = self.input.recv()? {
101 buffer.extend_from_slice(&chunk);
102
103 let mut cursor = std::io::Cursor::new(&buffer);
104
105 let _message = match self.encoding {
106 Encoding::ProtobufWithLengthPrefix => todo!(), Encoding::ProtobufWithoutLengthPrefix => todo!(), Encoding::TextWithNewlineSuffix => {
109 if !chunk.contains(&b'\n') {
110 continue; }
112 let mut line = String::new();
113 while cursor.read_line(&mut line)? > 0 {
114 if !line.ends_with('\n') {
115 cursor.set_position(cursor.position() - line.len() as u64);
116 break;
117 }
118 let stripped_line =
119 line.strip_suffix('\n').expect("line ends with newline");
120 match T::from_str(stripped_line) {
121 Ok(message) => self.output.send(&message)?,
122 Err(_error) => {
123 BlockError::Other("decode error".to_string()); }
125 }
126 line.clear();
127 }
128 }
129 };
130
131 buffer.drain(..cursor.position() as usize);
132 }
133
134 self.input.close()?;
135 Ok(())
136 }
137}
138
#[cfg(feature = "std")]
impl StdioSystem for Decode {
    /// Builds the standard-I/O system for this block from `config`.
    ///
    /// Only the `encoding` key is passed to `config.allow_only`; an error
    /// from that check is propagated. The system body itself is still a
    /// `todo!()` stub.
    fn build_system(config: StdioConfig) -> Result<System, StdioError> {
        // NOTE(review): presumably rejects any configuration key other than
        // `encoding` — confirm against `StdioConfig::allow_only`.
        config.allow_only(vec!["encoding"])?;

        // Not implemented yet: the builder closure panics when it is called.
        let system = System::build(|_s| todo!());
        Ok(system)
    }
}
149
#[cfg(test)]
mod tests {
    use super::Decode;
    use crate::{System, SystemBuilding};

    /// Smoke test: a `Decode<i32>` block can be constructed from a system's
    /// ports and registered with that system.
    #[test]
    fn instantiate_block() {
        let _ = System::build(|s| {
            let input = s.input();
            let output = s.output();
            let _ = s.block(Decode::<i32>::new(input, output));
        });
    }
}