protoflow_blocks/blocks/core/
buffer.rs1use crate::{prelude::VecDeque, StdioConfig, StdioError, StdioSystem, System};
4use protoflow_core::{types::Any, Block, BlockResult, BlockRuntime, InputPort, Message};
5use protoflow_derive::Block;
6use simple_mermaid::mermaid;
7
/// A block that stores the messages it receives on its input port.
///
/// # Diagram
#[doc = mermaid!("../../../doc/core/buffer.mmd")]
///
/// # Sequence Diagram
#[doc = mermaid!("../../../doc/core/buffer.seq.mmd" framed)]
#[derive(Block, Clone)]
pub struct Buffer<T: Message = Any> {
    /// The input port that messages are received from.
    #[input]
    pub input: InputPort<T>,

    /// The messages received so far, in arrival order (oldest at the front).
    #[state]
    messages: VecDeque<T>,
}
47
48impl<T: Message> Buffer<T> {
49 pub fn new(input: InputPort<T>) -> Self {
50 Self {
51 input,
52 messages: VecDeque::new(),
53 }
54 }
55
56 pub fn messages(&self) -> &VecDeque<T> {
57 &self.messages
58 }
59}
60
61impl<T: Message + 'static> Buffer<T> {
62 pub fn with_system(system: &System) -> Self {
63 use crate::SystemBuilding;
64 Self::new(system.input())
65 }
66}
67
68impl<T: Message> Block for Buffer<T> {
69 fn execute(&mut self, _runtime: &dyn BlockRuntime) -> BlockResult {
70 while let Some(message) = self.input.recv()? {
71 self.messages.push_back(message);
72 }
73 Ok(())
74 }
75}
76
#[cfg(feature = "std")]
impl<T: Message> StdioSystem for Buffer<T> {
    /// Builds a system in which the block returned by `config.read_stdin`
    /// feeds its output into a buffer block.
    ///
    /// # Errors
    ///
    /// Returns the error from `config.reject_any()` if that check fails
    /// (presumably when unexpected configuration parameters are supplied;
    /// confirm against `StdioConfig`).
    fn build_system(config: StdioConfig) -> Result<System, StdioError> {
        use crate::{CoreBlocks, SystemBuilding};

        // Validate the configuration before any blocks are created.
        config.reject_any()?;

        let system = System::build(|s| {
            let source = config.read_stdin(s);
            let sink = s.buffer();
            s.connect(&source.output, &sink.input);
        });
        Ok(system)
    }
}
91
#[cfg(test)]
mod tests {
    use super::Buffer;
    use crate::{System, SystemBuilding};

    /// Smoke test: a `Buffer<i32>` can be constructed and added to a system.
    #[test]
    fn instantiate_block() {
        let _ = System::build(|s| {
            let input = s.input();
            let block = Buffer::<i32>::new(input);
            let _ = s.block(block);
        });
    }
}