1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
// This is free and unencumbered software released into the public domain.

use crate::{
    scheduler::Duration, Block, InputPort, Message, OutputPort, Port, PortDescriptor, Scheduler,
};
use std::ops::Range;

#[cfg(feature = "std")]
use rand::Rng;

/// A block that passes messages through while delaying them by a fixed or
/// random duration.
pub struct Delay<T: Message> {
    /// The input message stream.
    input: InputPort<T>,
    /// The output target for the stream being passed through.
    output: OutputPort<T>,
    /// A configuration parameter for which type of delay to add.
    delay: DelayType,
}

/// The type of delay (fixed or random) to apply to message relay.
pub enum DelayType {
    Fixed(Duration),
    Random(Range<Duration>),
}

impl<T: Message> Block for Delay<T> {
    fn inputs(&self) -> Vec<PortDescriptor> {
        vec![PortDescriptor::from(&self.input)]
    }

    fn outputs(&self) -> Vec<PortDescriptor> {
        vec![PortDescriptor::from(&self.output)]
    }

    fn execute(&mut self, scheduler: &dyn Scheduler) -> Result<(), ()> {
        while let Some(message) = self.input.receive()? {
            if !self.output.is_connected() {
                drop(message);
                continue;
            }

            let duration = match self.delay {
                DelayType::Fixed(duration) => duration,
                DelayType::Random(ref range) => {
                    #[cfg(feature = "std")]
                    {
                        let mut rng = rand::thread_rng();
                        let low = range.start.as_nanos() as u64;
                        let high = range.end.as_nanos() as u64;
                        Duration::from_nanos(rng.gen_range(low, high))
                    }
                    #[cfg(not(feature = "std"))]
                    let mut _rng = todo!();
                }
            };
            scheduler.sleep_for(duration)?;

            self.output.send(&message)?;
        }
        Ok(())
    }
}