protoflow_blocks/blocks/core/
delay.rs

1// This is free and unencumbered software released into the public domain.
2
3use crate::{
4    prelude::{vec, Duration},
5    types::DelayType,
6    StdioConfig, StdioError, StdioSystem, System,
7};
8use protoflow_core::{
9    types::Any, Block, BlockResult, BlockRuntime, InputPort, Message, OutputPort, Port,
10};
11use protoflow_derive::Block;
12use simple_mermaid::mermaid;
13
14/// A block that passes messages through while delaying them by a fixed or
15/// random duration.
16///
17/// # Block Diagram
18#[doc = mermaid!("../../../doc/core/delay.mmd")]
19///
20/// # Sequence Diagram
21#[doc = mermaid!("../../../doc/core/delay.seq.mmd" framed)]
22///
23/// # Examples
24///
25/// ## Using the block in a system
26///
27/// ```rust
28/// # use protoflow_blocks::*;
29/// # use std::time::Duration;
30/// # fn main() {
31/// System::build(|s| {
32///     let stdin = s.read_stdin();
33///     let line_decoder = s.decode_lines();
34///     let delay = Duration::from_secs(1);
35///     let delayer = s.delay_by_fixed::<String>(delay);
36///     let line_encoder = s.encode_lines();
37///     let stdout = s.write_stdout();
38///     s.connect(&stdin.output, &line_decoder.input);
39///     s.connect(&line_decoder.output, &delayer.input);
40///     s.connect(&delayer.output, &line_encoder.input);
41///     s.connect(&line_encoder.output, &stdout.input);
42/// });
43/// # }
44/// ```
45///
46/// ## Running the block via the CLI
47///
48/// ```console
49/// $ protoflow execute Delay fixed=2
50/// ```
51///
52/// ```console
53/// $ protoflow execute Delay random=1..5
54/// ```
55///
56#[derive(Block, Clone)]
57pub struct Delay<T: Message = Any> {
58    /// The input message stream.
59    #[input]
60    pub input: InputPort<T>,
61
62    /// The output target for the stream being passed through.
63    #[output]
64    pub output: OutputPort<T>,
65
66    /// A configuration parameter for which type of delay to add.
67    #[parameter]
68    pub delay: DelayType,
69}
70
71impl<T: Message> Delay<T> {
72    pub fn new(input: InputPort<T>, output: OutputPort<T>) -> Self {
73        Self::with_params(input, output, None)
74    }
75
76    pub fn with_params(
77        input: InputPort<T>,
78        output: OutputPort<T>,
79        delay: Option<DelayType>,
80    ) -> Self {
81        Self {
82            input,
83            output,
84            delay: delay.unwrap_or_default(),
85        }
86    }
87}
88
89impl<T: Message + 'static> Delay<T> {
90    pub fn with_system(system: &System, delay: Option<DelayType>) -> Self {
91        use crate::SystemBuilding;
92        Self::with_params(system.input(), system.output(), delay)
93    }
94}
95
96impl<T: Message> Block for Delay<T> {
97    fn execute(&mut self, runtime: &dyn BlockRuntime) -> BlockResult {
98        while let Some(message) = self.input.recv()? {
99            if !self.output.is_connected() {
100                drop(message);
101                continue;
102            }
103
104            let duration = match self.delay {
105                DelayType::Fixed(duration) => duration,
106                DelayType::Random(ref range) => runtime.random_duration(range.clone()),
107            };
108            runtime.sleep_for(duration)?;
109
110            self.output.send(&message)?;
111        }
112        Ok(())
113    }
114}
115
#[cfg(feature = "std")]
impl<T: Message + crate::prelude::FromStr + crate::prelude::ToString + 'static> StdioSystem
    for Delay<T>
{
    /// Builds a system that reads messages from standard input, delays them,
    /// and writes them to standard output.
    ///
    /// The pipeline is: stdin → decoder → delay → encoder → stdout, using the
    /// encoding selected in `config`.
    ///
    /// # Parameters
    ///
    /// - `fixed` (optional): the delay in seconds, parsed as an `f64`.
    ///   Defaults to one second when absent.
    ///
    /// # Errors
    ///
    /// Returns an error if `config` contains any parameter other than
    /// `fixed`, if `fixed` cannot be parsed as an `f64`, or if its value
    /// cannot be represented as a `Duration` (negative, NaN, infinite, or
    /// too large).
    fn build_system(config: StdioConfig) -> Result<System, StdioError> {
        use crate::{CoreBlocks, IoBlocks, SystemBuilding};

        config.allow_only(vec!["fixed"])?;
        // TODO: parse "random" parameter as well.
        let delay = match config.get_opt::<f64>("fixed")? {
            // `Duration::from_secs_f64` panics on negative, non-finite, or
            // overflowing input; since this value comes straight from the
            // user, convert fallibly and report a parameter error instead.
            Some(seconds) => Duration::try_from_secs_f64(seconds)
                .map(DelayType::Fixed)
                .map_err(|_| StdioError::InvalidParameter("fixed"))?,
            None => DelayType::Fixed(Duration::from_secs(1)),
        };

        Ok(System::build(|s| {
            let stdin = config.read_stdin(s);
            let message_decoder = s.decode_with::<T>(config.encoding);
            let delayer = s.delay_by(delay);
            let message_encoder = s.encode_with::<T>(config.encoding);
            let stdout = config.write_stdout(s);
            s.connect(&stdin.output, &message_decoder.input);
            s.connect(&message_decoder.output, &delayer.input);
            s.connect(&delayer.output, &message_encoder.input);
            s.connect(&message_encoder.output, &stdout.input);
        }))
    }
}
141
#[cfg(test)]
mod tests {
    use super::Delay;
    use crate::{System, SystemBuilding};

    /// Smoke test: a `Delay` block with default settings can be created from
    /// a system and registered with it.
    #[test]
    fn instantiate_block() {
        let _ = System::build(|system| {
            let block = Delay::<i32>::with_system(system, None);
            let _ = system.block(block);
        });
    }
}