1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124
// This is free and unencumbered software released into the public domain.
extern crate std;
use crate::{Encoding, StdioConfig, StdioError, StdioSystem, System};
use protoflow_core::{
prelude::{Bytes, String, ToString},
Block, BlockResult, BlockRuntime, InputPort, Message, OutputPort,
};
use protoflow_derive::Block;
use simple_mermaid::mermaid;
/// A block that encodes `T` messages to a byte stream.
///
/// # Block Diagram
#[doc = mermaid!("../../doc/io/encode.mmd")]
///
/// # Sequence Diagram
#[doc = mermaid!("../../doc/io/encode.seq.mmd" framed)]
///
/// # Examples
///
/// ## Using the block in a system
///
/// ```rust
/// # use protoflow_blocks::*;
/// # fn main() {
/// System::build(|s| {
/// let stdin = s.read_stdin();
/// let message_decoder = s.decode_lines();
/// let counter = s.count::<String>();
/// let count_encoder = s.encode_lines();
/// let stdout = s.write_stdout();
/// s.connect(&stdin.output, &message_decoder.input);
/// s.connect(&message_decoder.output, &counter.input);
/// s.connect(&counter.count, &count_encoder.input);
/// s.connect(&count_encoder.output, &stdout.input);
/// });
/// # }
/// ```
///
/// ## Running the block via the CLI
///
/// ```console
/// $ protoflow execute Encode encoding=text
/// ```
///
/// ```console
/// $ protoflow execute Encode encoding=protobuf
/// ```
///
#[derive(Block, Clone)]
pub struct Encode<T: Message + ToString = String> {
/// The input message stream.
#[input]
pub input: InputPort<T>,
/// The output byte stream.
#[output]
pub output: OutputPort<Bytes>,
/// A configuration parameter for how to encode messages.
#[parameter]
pub encoding: Encoding,
}
impl<T: Message + ToString> Encode<T> {
pub fn new(input: InputPort<T>, output: OutputPort<Bytes>) -> Self {
Self::with_params(input, output, Encoding::default())
}
pub fn with_params(input: InputPort<T>, output: OutputPort<Bytes>, encoding: Encoding) -> Self {
Self {
input,
output,
encoding,
}
}
}
impl<T: Message + ToString> Block for Encode<T> {
fn execute(&mut self, runtime: &dyn BlockRuntime) -> BlockResult {
runtime.wait_for(&self.input)?;
while let Some(message) = self.input.recv()? {
use Encoding::*;
let bytes = match self.encoding {
ProtobufWithLengthPrefix => Bytes::from(message.encode_length_delimited_to_vec()),
ProtobufWithoutLengthPrefix => Bytes::from(message.encode_to_vec()),
TextWithNewlineSuffix => {
let mut string = message.to_string();
string.push('\n');
Bytes::from(string)
}
};
self.output.send(&bytes)?;
}
Ok(())
}
}
#[cfg(feature = "std")]
impl StdioSystem for Encode {
fn build_system(_config: StdioConfig) -> Result<System, StdioError> {
//use crate::{CoreBlocks, SysBlocks, SystemBuilding};
Ok(System::build(|_s| todo!()))
}
}
#[cfg(test)]
mod tests {
use super::Encode;
use crate::{System, SystemBuilding};
#[test]
fn instantiate_block() {
// Check that the block is constructible:
let _ = System::build(|s| {
let _ = s.block(Encode::<i32>::new(s.input(), s.output()));
});
}
}