protoflow_blocks/blocks/core/
count.rs1use crate::{StdioConfig, StdioError, StdioSystem, System};
4use protoflow_core::{
5 types::Any, Block, BlockResult, BlockRuntime, InputPort, Message, OutputPort, Port, PortError,
6};
7use protoflow_derive::Block;
8use simple_mermaid::mermaid;
9
10#[doc = mermaid!("../../../doc/core/count.mmd")]
15#[doc = mermaid!("../../../doc/core/count.seq.mmd" framed)]
18#[derive(Block, Clone)]
47pub struct Count<T: Message = Any> {
48 #[input]
50 pub input: InputPort<T>,
51
52 #[output]
54 pub output: OutputPort<T>,
55
56 #[output]
58 pub count: OutputPort<u64>,
59
60 #[state]
62 counter: u64,
63}
64
65impl<T: Message> Count<T> {
66 pub fn new(input: InputPort<T>, output: OutputPort<T>, count: OutputPort<u64>) -> Self {
67 Self {
68 input,
69 output,
70 count,
71 counter: 0,
72 }
73 }
74}
75
76impl<T: Message + 'static> Count<T> {
77 pub fn with_system(system: &System) -> Self {
78 use crate::SystemBuilding;
79 Self::new(system.input(), system.output(), system.output())
80 }
81}
82
83impl<T: Message> Block for Count<T> {
84 fn execute(&mut self, runtime: &dyn BlockRuntime) -> BlockResult {
85 while let Some(message) = self.input.recv()? {
86 self.counter += 1;
87
88 if self.output.is_connected() {
89 self.output.send(&message)?;
90 } else {
91 drop(message);
92 }
93 }
94 self.output.close()?;
95
96 runtime.wait_for(&self.count)?;
97
98 match self.count.send(&self.counter) {
99 Ok(()) => {}
100 Err(PortError::Closed | PortError::Disconnected) => {
101 }
103 Err(e) => return Err(e)?,
104 };
105
106 Ok(())
107 }
108}
109
#[cfg(feature = "std")]
impl<T> StdioSystem for Count<T>
where
    T: Message + crate::prelude::FromStr + crate::prelude::ToString + 'static,
{
    /// Builds a system that reads messages from standard input, counts them,
    /// and writes the encoded count to standard output.
    ///
    /// The counter's pass-through `output` port is left unconnected, so only
    /// the count reaches standard output.
    ///
    /// # Errors
    ///
    /// Returns whatever error `config.reject_any()` reports.
    fn build_system(config: StdioConfig) -> Result<System, StdioError> {
        use crate::{CoreBlocks, IoBlocks, SystemBuilding};

        // NOTE(review): presumably rejects any supplied parameters, since
        // this block takes none - confirm against `StdioConfig::reject_any`.
        config.reject_any()?;

        let system = System::build(|builder| {
            // Instantiate the pipeline stages, source to sink.
            let source = config.read_stdin(builder);
            let decoder = builder.decode_with::<T>(config.encoding);
            let tally = builder.count::<T>();
            let encoder = builder.encode_with::<u64>(config.encoding);
            let sink = config.write_stdout(builder);

            // stdin -> decode -> count =(count port)=> encode -> stdout
            builder.connect(&source.output, &decoder.input);
            builder.connect(&decoder.output, &tally.input);
            builder.connect(&tally.count, &encoder.input);
            builder.connect(&encoder.output, &sink.input);
        });

        Ok(system)
    }
}
132
#[cfg(test)]
mod tests {
    use super::Count;
    use crate::{System, SystemBuilding};

    /// Smoke test: a `Count` block can be constructed from system-provided
    /// ports and registered with the system under construction.
    #[test]
    fn instantiate_block() {
        let _ = System::build(|system| {
            let block = Count::<i32>::new(system.input(), system.output(), system.output());
            let _ = system.block(block);
        });
    }
}