futuresdr/blocks/
message_source.rs1use async_io::Timer;
2use std::time::Duration;
3use web_time::Instant;
4
5use crate::runtime::BlockMeta;
6use crate::runtime::Kernel;
7use crate::runtime::MessageOutputs;
8use crate::runtime::Pmt;
9use crate::runtime::Result;
10use crate::runtime::WorkIo;
11
12#[derive(Block)]
14#[message_outputs(out)]
15pub struct MessageSource {
16 message: Pmt,
17 interval: Duration,
18 t_last: Instant,
19 n_messages: Option<usize>,
20}
21
22impl MessageSource {
23 pub fn new(message: Pmt, interval: Duration, n_messages: Option<usize>) -> Self {
25 Self {
26 message,
27 interval,
28 t_last: Instant::now(),
29 n_messages,
30 }
31 }
32
33 async fn sleep(dur: Duration) {
34 Timer::after(dur).await;
35 }
36}
37
38#[doc(hidden)]
39impl Kernel for MessageSource {
40 async fn work(
41 &mut self,
42 io: &mut WorkIo,
43 mio: &mut MessageOutputs,
44 _b: &mut BlockMeta,
45 ) -> Result<()> {
46 let now = Instant::now();
47
48 if now >= self.t_last + self.interval {
49 mio.post("out", self.message.clone()).await?;
50 self.t_last = now;
51 if let Some(ref mut n) = self.n_messages {
52 *n -= 1;
53 if *n == 0 {
54 io.finished = true;
55 }
56 }
57 }
58
59 io.block_on(MessageSource::sleep(
60 self.t_last + self.interval - Instant::now(),
61 ));
62
63 Ok(())
64 }
65
66 async fn init(&mut self, _mio: &mut MessageOutputs, _b: &mut BlockMeta) -> Result<()> {
67 self.t_last = Instant::now();
68 Ok(())
69 }
70}
71
72pub struct MessageSourceBuilder {
101 message: Pmt,
102 duration: Duration,
103 n_messages: Option<usize>,
104}
105
106impl MessageSourceBuilder {
107 pub fn new(message: Pmt, duration: Duration) -> MessageSourceBuilder {
109 MessageSourceBuilder {
110 message,
111 duration,
112 n_messages: None,
113 }
114 }
115 #[must_use]
117 pub fn n_messages(mut self, n: usize) -> MessageSourceBuilder {
118 self.n_messages = Some(n);
119 self
120 }
121 pub fn build(self) -> MessageSource {
123 MessageSource::new(self.message, self.duration, self.n_messages)
124 }
125}