protoflow_blocks/blocks/core/
delay.rs1use crate::{
4 prelude::{vec, Duration},
5 types::DelayType,
6 StdioConfig, StdioError, StdioSystem, System,
7};
8use protoflow_core::{
9 types::Any, Block, BlockResult, BlockRuntime, InputPort, Message, OutputPort, Port,
10};
11use protoflow_derive::Block;
12use simple_mermaid::mermaid;
13
/// A block that forwards each message received on its input port to its
/// output port, pausing for the configured delay before every send.
///
/// Messages that arrive while the output port is not connected are dropped
/// without pausing.
#[doc = mermaid!("../../../doc/core/delay.mmd")]
#[doc = mermaid!("../../../doc/core/delay.seq.mmd" framed)]
#[derive(Block, Clone)]
pub struct Delay<T: Message = Any> {
    /// Port from which incoming messages are received.
    #[input]
    pub input: InputPort<T>,

    /// Port to which messages are sent once the delay has elapsed.
    #[output]
    pub output: OutputPort<T>,

    /// How long to pause before forwarding a message: either a fixed
    /// duration, or a range from which the runtime picks a duration.
    #[parameter]
    pub delay: DelayType,
}
70
71impl<T: Message> Delay<T> {
72 pub fn new(input: InputPort<T>, output: OutputPort<T>) -> Self {
73 Self::with_params(input, output, None)
74 }
75
76 pub fn with_params(
77 input: InputPort<T>,
78 output: OutputPort<T>,
79 delay: Option<DelayType>,
80 ) -> Self {
81 Self {
82 input,
83 output,
84 delay: delay.unwrap_or_default(),
85 }
86 }
87}
88
89impl<T: Message + 'static> Delay<T> {
90 pub fn with_system(system: &System, delay: Option<DelayType>) -> Self {
91 use crate::SystemBuilding;
92 Self::with_params(system.input(), system.output(), delay)
93 }
94}
95
96impl<T: Message> Block for Delay<T> {
97 fn execute(&mut self, runtime: &dyn BlockRuntime) -> BlockResult {
98 while let Some(message) = self.input.recv()? {
99 if !self.output.is_connected() {
100 drop(message);
101 continue;
102 }
103
104 let duration = match self.delay {
105 DelayType::Fixed(duration) => duration,
106 DelayType::Random(ref range) => runtime.random_duration(range.clone()),
107 };
108 runtime.sleep_for(duration)?;
109
110 self.output.send(&message)?;
111 }
112 Ok(())
113 }
114}
115
#[cfg(feature = "std")]
impl<T: Message + crate::prelude::FromStr + crate::prelude::ToString + 'static> StdioSystem
    for Delay<T>
{
    /// Builds a system that reads from stdin, decodes each message as `T`,
    /// delays it, re-encodes it, and writes it to stdout.
    ///
    /// The only parameter passed to `allow_only` is `fixed`: the delay in
    /// seconds, parsed as an `f64`. When it is absent the delay is one second.
    ///
    /// # Errors
    ///
    /// Propagates any error from `allow_only` or `get_opt`. Additionally
    /// returns `StdioError::InvalidParameter("fixed")` when the value is
    /// negative, NaN, infinite, or too large to represent as a `Duration`;
    /// `Duration::from_secs_f64` would panic on those inputs.
    fn build_system(config: StdioConfig) -> Result<System, StdioError> {
        use crate::{CoreBlocks, IoBlocks, SystemBuilding};

        config.allow_only(vec!["fixed"])?;
        let fixed_secs = config.get_opt::<f64>("fixed")?.unwrap_or(1.);
        // The value is user-supplied, so convert fallibly instead of panicking.
        // NOTE(review): assumes `StdioError::InvalidParameter(&'static str)`
        // exists, as its definition is not visible here — confirm.
        let fixed = Duration::try_from_secs_f64(fixed_secs)
            .map_err(|_| StdioError::InvalidParameter("fixed"))?;
        let delay = DelayType::Fixed(fixed);

        Ok(System::build(|s| {
            let stdin = config.read_stdin(s);
            let message_decoder = s.decode_with::<T>(config.encoding);
            let delayer = s.delay_by(delay);
            let message_encoder = s.encode_with::<T>(config.encoding);
            let stdout = config.write_stdout(s);
            s.connect(&stdin.output, &message_decoder.input);
            s.connect(&message_decoder.output, &delayer.input);
            s.connect(&delayer.output, &message_encoder.input);
            s.connect(&message_encoder.output, &stdout.input);
        }))
    }
}
141
#[cfg(test)]
mod tests {
    use super::Delay;
    use crate::{System, SystemBuilding};

    /// Smoke test: a `Delay<i32>` with the default delay can be constructed
    /// from a system and registered as a block.
    #[test]
    fn instantiate_block() {
        let _ = System::build(|system| {
            let delay_block = Delay::<i32>::with_system(system, None);
            let _ = system.block(delay_block);
        });
    }
}