protoflow_blocks/blocks/core/
count.rsuse crate::{StdioConfig, StdioError, StdioSystem, System};
use protoflow_core::{
types::Any, Block, BlockResult, BlockRuntime, InputPort, Message, OutputPort, Port, PortError,
};
use protoflow_derive::Block;
use simple_mermaid::mermaid;
#[doc = mermaid!("../../../doc/core/count.mmd")]
#[doc = mermaid!("../../../doc/core/count.seq.mmd" framed)]
#[derive(Block, Clone)]
pub struct Count<T: Message = Any> {
#[input]
pub input: InputPort<T>,
#[output]
pub output: OutputPort<T>,
#[output]
pub count: OutputPort<u64>,
#[state]
counter: u64,
}
impl<T: Message> Count<T> {
pub fn new(input: InputPort<T>, output: OutputPort<T>, count: OutputPort<u64>) -> Self {
Self {
input,
output,
count,
counter: 0,
}
}
}
impl<T: Message + 'static> Count<T> {
pub fn with_system(system: &System) -> Self {
use crate::SystemBuilding;
Self::new(system.input(), system.output(), system.output())
}
}
impl<T: Message> Block for Count<T> {
fn execute(&mut self, runtime: &dyn BlockRuntime) -> BlockResult {
while let Some(message) = self.input.recv()? {
self.counter += 1;
if self.output.is_connected() {
self.output.send(&message)?;
} else {
drop(message);
}
}
self.output.close()?;
runtime.wait_for(&self.count)?;
match self.count.send(&self.counter) {
Ok(()) => {}
Err(PortError::Closed | PortError::Disconnected) => {
}
Err(e) => return Err(e)?,
};
Ok(())
}
}
#[cfg(feature = "std")]
impl<T: Message + crate::prelude::FromStr + crate::prelude::ToString + 'static> StdioSystem
for Count<T>
{
fn build_system(config: StdioConfig) -> Result<System, StdioError> {
use crate::{CoreBlocks, IoBlocks, SystemBuilding};
config.reject_any()?;
Ok(System::build(|s| {
let stdin = config.read_stdin(s);
let message_decoder = s.decode_with::<T>(config.encoding);
let counter = s.count::<T>();
let count_encoder = s.encode_with::<u64>(config.encoding);
let stdout = config.write_stdout(s);
s.connect(&stdin.output, &message_decoder.input);
s.connect(&message_decoder.output, &counter.input);
s.connect(&counter.count, &count_encoder.input);
s.connect(&count_encoder.output, &stdout.input);
}))
}
}
#[cfg(test)]
mod tests {
use super::Count;
use crate::{System, SystemBuilding};
#[test]
fn instantiate_block() {
let _ = System::build(|s| {
let _ = s.block(Count::<i32>::new(s.input(), s.output(), s.output()));
});
}
}