1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
use crate::runtime::dev::prelude::*;
/// This [`Block`] applies a callback function to incoming messages, emitting the result as a new message.
///
/// `None` return values are filtered out.
///
/// # Message Inputs
///
/// `msg_handler`: Messages passed to the callback.
///
/// # Message Outputs
///
/// `out`: Callback results returned as `Some`.
///
/// # Usage
/// ```
/// use futuresdr::blocks::MessageApply;
/// use futuresdr::runtime::Pmt;
///
/// let apply = MessageApply::new(|p| Ok(Some(p)));
/// ```
#[derive(Block)]
#[message_inputs(msg_handler)]
#[message_outputs(out)]
#[null_kernel]
pub struct MessageApply<F>
where
F: FnMut(Pmt) -> Result<Option<Pmt>> + Send + 'static,
{
callback: F,
}
impl<F> MessageApply<F>
where
F: FnMut(Pmt) -> Result<Option<Pmt>> + Send + 'static,
{
/// Apply a function to each incoming message.
///
/// `None` values are filtered out.
///
/// # Arguments
///
/// * `callback`: Function to apply to each incoming message, filtering `None` values.
///
pub fn new(callback: F) -> Self {
Self { callback }
}
async fn msg_handler(
&mut self,
_io: &mut WorkIo,
mo: &mut MessageOutputs,
_meta: &mut BlockMeta,
p: Pmt,
) -> Result<Pmt> {
let r = (self.callback)(p)?;
if let Some(r) = r {
mo.post("out", r).await?;
}
Ok(Pmt::Ok)
}
}