extern crate std;
use crate::{
prelude::{vec, Bytes, FromStr, String, ToString, Vec},
types::Encoding,
StdioConfig, StdioError, StdioSystem, System,
};
use protoflow_core::{
Block, BlockError, BlockResult, BlockRuntime, InputPort, Message, OutputPort,
};
use protoflow_derive::Block;
use simple_mermaid::mermaid;
use std::io::BufRead;
/// A block that decodes messages of type `T` from a stream of byte chunks.
///
/// Chunks received on `input` are buffered and decoded according to
/// `encoding`; each decoded message is sent on `output`. `T` defaults to
/// `String`.
///
/// # Block Diagram
#[doc = mermaid!("../../../doc/io/decode.mmd")]
///
/// # Sequence Diagram
#[doc = mermaid!("../../../doc/io/decode.seq.mmd" framed)]
#[derive(Block, Clone)]
pub struct Decode<T: Message + FromStr = String> {
/// The input port delivering raw byte chunks.
#[input]
pub input: InputPort<Bytes>,
/// The output port receiving the decoded messages.
#[output]
pub output: OutputPort<T>,
/// The wire encoding used to split and decode the byte stream.
#[parameter]
pub encoding: Encoding,
}
impl<T: Message + FromStr> Decode<T> {
pub fn new(input: InputPort<Bytes>, output: OutputPort<T>) -> Self {
Self::with_params(input, output, None)
}
pub fn with_params(
input: InputPort<Bytes>,
output: OutputPort<T>,
encoding: Option<Encoding>,
) -> Self {
Self {
input,
output,
encoding: encoding.unwrap_or_default(),
}
}
}
impl<T: Message + FromStr + 'static> Decode<T> {
    /// Creates a decoder whose input and output ports are obtained from
    /// `system`.
    ///
    /// When `encoding` is `None`, the default [`Encoding`] is used.
    pub fn with_system(system: &System, encoding: Option<Encoding>) -> Self {
        use crate::SystemBuilding;

        let input = system.input();
        let output = system.output();
        Self::with_params(input, output, encoding)
    }
}
impl<T: Message + FromStr> Block for Decode<T> {
fn execute(&mut self, _runtime: &dyn BlockRuntime) -> BlockResult {
let mut buffer = Vec::<u8>::new();
while let Some(chunk) = self.input.recv()? {
buffer.extend_from_slice(&chunk);
let mut cursor = std::io::Cursor::new(&buffer);
let _message = match self.encoding {
Encoding::ProtobufWithLengthPrefix => todo!(), Encoding::ProtobufWithoutLengthPrefix => todo!(), Encoding::TextWithNewlineSuffix => {
if !chunk.contains(&b'\n') {
continue; }
let mut line = String::new();
while cursor.read_line(&mut line)? > 0 {
if !line.ends_with('\n') {
cursor.set_position(cursor.position() - line.len() as u64);
break;
}
let stripped_line =
line.strip_suffix('\n').expect("line ends with newline");
match T::from_str(stripped_line) {
Ok(message) => self.output.send(&message)?,
Err(_error) => {
BlockError::Other("decode error".to_string()); }
}
line.clear();
}
}
};
buffer.drain(..cursor.position() as usize);
}
self.input.close()?;
Ok(())
}
}
#[cfg(feature = "std")]
impl StdioSystem for Decode {
    /// Builds the standard I/O system for this block.
    ///
    /// The configuration may contain only the `encoding` key; the `?` below
    /// propagates whatever error `allow_only` returns.
    ///
    /// NOTE(review): the system builder closure is still a `todo!()` stub, so
    /// it will panic whenever `System::build` invokes it.
    fn build_system(config: StdioConfig) -> Result<System, StdioError> {
        let allowed_keys = vec!["encoding"];
        config.allow_only(allowed_keys)?;

        let system = System::build(|_s| todo!());
        Ok(system)
    }
}
#[cfg(test)]
mod tests {
    use super::Decode;
    use crate::{System, SystemBuilding};

    /// Smoke test: a `Decode<i32>` block can be constructed and registered
    /// while building a system.
    #[test]
    fn instantiate_block() {
        let _ = System::build(|s| {
            let decoder = Decode::<i32>::new(s.input(), s.output());
            let _ = s.block(decoder);
        });
    }
}