protoflow_blocks/blocks/core/
buffer.rs

1// This is free and unencumbered software released into the public domain.
2
3use crate::{prelude::VecDeque, StdioConfig, StdioError, StdioSystem, System};
4use protoflow_core::{types::Any, Block, BlockResult, BlockRuntime, InputPort, Message};
5use protoflow_derive::Block;
6use simple_mermaid::mermaid;
7
8/// A block that simply stores all messages it receives.
9///
10/// # Block Diagram
11#[doc = mermaid!("../../../doc/core/buffer.mmd")]
12///
13/// # Sequence Diagram
14#[doc = mermaid!("../../../doc/core/buffer.seq.mmd" framed)]
15///
16/// # Examples
17///
18/// ## Using the block in a system
19///
20/// ```rust
21/// # use protoflow_blocks::*;
22/// # fn main() {
23/// System::build(|s| {
24///     let stdin = s.read_stdin();
25///     let buffer = s.buffer();
26///     s.connect(&stdin.output, &buffer.input);
27/// });
28/// # }
29/// ```
30///
31/// ## Running the block via the CLI
32///
33/// ```console
34/// $ protoflow execute Buffer
35/// ```
36///
37#[derive(Block, Clone)]
38pub struct Buffer<T: Message = Any> {
39    /// The input message stream.
40    #[input]
41    pub input: InputPort<T>,
42
43    /// The internal state storing the messages received.
44    #[state]
45    messages: VecDeque<T>,
46}
47
48impl<T: Message> Buffer<T> {
49    pub fn new(input: InputPort<T>) -> Self {
50        Self {
51            input,
52            messages: VecDeque::new(),
53        }
54    }
55
56    pub fn messages(&self) -> &VecDeque<T> {
57        &self.messages
58    }
59}
60
61impl<T: Message + 'static> Buffer<T> {
62    pub fn with_system(system: &System) -> Self {
63        use crate::SystemBuilding;
64        Self::new(system.input())
65    }
66}
67
68impl<T: Message> Block for Buffer<T> {
69    fn execute(&mut self, _runtime: &dyn BlockRuntime) -> BlockResult {
70        while let Some(message) = self.input.recv()? {
71            self.messages.push_back(message);
72        }
73        Ok(())
74    }
75}
76
77#[cfg(feature = "std")]
78impl<T: Message> StdioSystem for Buffer<T> {
79    fn build_system(config: StdioConfig) -> Result<System, StdioError> {
80        use crate::{CoreBlocks, SystemBuilding};
81
82        config.reject_any()?;
83
84        Ok(System::build(|s| {
85            let stdin = config.read_stdin(s);
86            let buffer = s.buffer();
87            s.connect(&stdin.output, &buffer.input);
88        }))
89    }
90}
91
92#[cfg(test)]
93mod tests {
94    use super::Buffer;
95    use crate::{System, SystemBuilding};
96
97    #[test]
98    fn instantiate_block() {
99        // Check that the block is constructible:
100        let _ = System::build(|s| {
101            let _ = s.block(Buffer::<i32>::new(s.input()));
102        });
103    }
104}