futuresdr/blocks/
message_source.rs

1use async_io::Timer;
2use std::time::Duration;
3use web_time::Instant;
4
5use crate::runtime::BlockMeta;
6use crate::runtime::Kernel;
7use crate::runtime::MessageOutputs;
8use crate::runtime::Pmt;
9use crate::runtime::Result;
10use crate::runtime::WorkIo;
11
12/// Output the same message periodically.
13#[derive(Block)]
14#[message_outputs(out)]
15pub struct MessageSource {
16    message: Pmt,
17    interval: Duration,
18    t_last: Instant,
19    n_messages: Option<usize>,
20}
21
22impl MessageSource {
23    /// Create MessageSource block
24    pub fn new(message: Pmt, interval: Duration, n_messages: Option<usize>) -> Self {
25        Self {
26            message,
27            interval,
28            t_last: Instant::now(),
29            n_messages,
30        }
31    }
32
33    async fn sleep(dur: Duration) {
34        Timer::after(dur).await;
35    }
36}
37
38#[doc(hidden)]
39impl Kernel for MessageSource {
40    async fn work(
41        &mut self,
42        io: &mut WorkIo,
43        mio: &mut MessageOutputs,
44        _b: &mut BlockMeta,
45    ) -> Result<()> {
46        let now = Instant::now();
47
48        if now >= self.t_last + self.interval {
49            mio.post("out", self.message.clone()).await?;
50            self.t_last = now;
51            if let Some(ref mut n) = self.n_messages {
52                *n -= 1;
53                if *n == 0 {
54                    io.finished = true;
55                }
56            }
57        }
58
59        io.block_on(MessageSource::sleep(
60            self.t_last + self.interval - Instant::now(),
61        ));
62
63        Ok(())
64    }
65
66    async fn init(&mut self, _mio: &mut MessageOutputs, _b: &mut BlockMeta) -> Result<()> {
67        self.t_last = Instant::now();
68        Ok(())
69    }
70}
71
72/// Repeats a fixed message on an interval
73///
74/// # Inputs
75///
76/// No inputs.
77///
78/// # Outputs
79///
80/// **Message**: `out`: Message output
81///
82/// # Usage
83/// ```
84/// use std::time;
85/// use futuresdr::blocks::MessageSourceBuilder;
86/// use futuresdr::runtime::{Flowgraph, Pmt};
87///
88/// let mut fg = Flowgraph::new();
89///
90/// // Repeat the message "foo" every 100ms twenty times
91/// let msg_source = fg.add_block(
92///     MessageSourceBuilder::new(
93///         Pmt::String("foo".to_string()),
94///         time::Duration::from_millis(100),
95///     )
96///     .n_messages(20)
97///     .build()
98/// );
99/// ```
100pub struct MessageSourceBuilder {
101    message: Pmt,
102    duration: Duration,
103    n_messages: Option<usize>,
104}
105
106impl MessageSourceBuilder {
107    /// Create MessageSource builder
108    pub fn new(message: Pmt, duration: Duration) -> MessageSourceBuilder {
109        MessageSourceBuilder {
110            message,
111            duration,
112            n_messages: None,
113        }
114    }
115    /// Number of message to send
116    #[must_use]
117    pub fn n_messages(mut self, n: usize) -> MessageSourceBuilder {
118        self.n_messages = Some(n);
119        self
120    }
121    /// Build Message Source block
122    pub fn build(self) -> MessageSource {
123        MessageSource::new(self.message, self.duration, self.n_messages)
124    }
125}