1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
use crate::anyhow::Result;
use crate::runtime::Block;
use crate::runtime::BlockMeta;
use crate::runtime::BlockMetaBuilder;
use crate::runtime::Kernel;
use crate::runtime::MessageIo;
use crate::runtime::MessageIoBuilder;
use crate::runtime::StreamIo;
use crate::runtime::StreamIoBuilder;
use crate::runtime::TypedBlock;
use crate::runtime::WorkIo;
/// Stream block that maps every input sample through a user-supplied function.
///
/// The function receives a reference to an input sample of type `A` and
/// returns the output sample of type `B`. Because it is an `FnMut`, it may
/// carry mutable state from one sample to the next.
///
/// # Stream Inputs
///
/// `in`: Input samples
///
/// # Stream Outputs
///
/// `out`: Output samples, i.e., the result of the function for each input sample
///
/// # Usage
/// ```
/// use futuresdr::blocks::Apply;
/// use futuresdr::runtime::Flowgraph;
/// use num_complex::Complex;
///
/// let mut fg = Flowgraph::new();
///
/// // Double each sample
/// let doubler = fg.add_block(Apply::new(|i: &f32| i * 2.0));
///
/// // Note that the closure can also hold state
/// let mut last_value = 0.0;
/// let moving_average = fg.add_block(Apply::new(move |i: &f32| {
///     let new_value = (last_value + i) / 2.0;
///     last_value = *i;
///     new_value
/// }));
///
/// // Additionally, the closure can change the type of the sample
/// let to_complex = fg.add_block(Apply::new(|i: &f32| {
///     Complex {
///         re: 0.0,
///         im: *i,
///     }
/// }));
/// ```
pub struct Apply<F, A, B>
where
    F: FnMut(&A) -> B + Send + 'static,
    A: Send + 'static,
    B: Send + 'static,
{
    // The per-sample function.
    f: F,
    // Zero-sized markers: `A` and `B` only occur in the bound on `F`, so they
    // have to be mentioned in a field for the type parameters to count as used.
    _p1: std::marker::PhantomData<A>,
    _p2: std::marker::PhantomData<B>,
}
impl<F, A, B> Apply<F, A, B>
where
    F: FnMut(&A) -> B + Send + 'static,
    A: Send + 'static,
    B: Send + 'static,
{
    /// Build an [`Apply`] block and hand it back as a [`Block`].
    ///
    /// This is [`Apply::new_typed`] followed by `Block::from_typed`.
    ///
    /// ## Parameter
    /// - `f`: Function that is called for each sample
    pub fn new(f: F) -> Block {
        let typed = Self::new_typed(f);
        Block::from_typed(typed)
    }

    /// Build an [`Apply`] block as a [`TypedBlock`].
    ///
    /// The block is named `"Apply"` and is set up with one stream input `in`
    /// carrying items of type `A`, one stream output `out` carrying items of
    /// type `B`, and a message IO built without any additions.
    ///
    /// ## Parameter
    /// - `f`: Function that is called for each sample
    pub fn new_typed(f: F) -> TypedBlock<Self> {
        // Built in the same order in which the original passed them as arguments.
        let meta = BlockMetaBuilder::new("Apply").build();
        let stream_io = StreamIoBuilder::new()
            .add_input::<A>("in")
            .add_output::<B>("out")
            .build();
        let message_io = MessageIoBuilder::<Self>::new().build();
        let kernel = Self {
            f,
            _p1: std::marker::PhantomData,
            _p2: std::marker::PhantomData,
        };

        TypedBlock::new(meta, stream_io, message_io, kernel)
    }
}
#[doc(hidden)]
#[async_trait]
impl<F, A, B> Kernel for Apply<F, A, B>
where
    F: FnMut(&A) -> B + Send + 'static,
    A: Send + 'static,
    B: Send + 'static,
{
    // One work call: run the function over as many samples as fit in both the
    // input and the output slice, report that count on both ports, and flag
    // the block as finished once a finished input has been fully processed.
    async fn work(
        &mut self,
        io: &mut WorkIo,
        sio: &mut StreamIo,
        _mio: &mut MessageIo<Self>,
        _meta: &mut BlockMeta,
    ) -> Result<()> {
        let samples_in = sio.input(0).slice::<A>();
        let samples_out = sio.output(0).slice::<B>();

        // Limited by whichever of the two slices is shorter.
        let count = samples_in.len().min(samples_out.len());

        if count != 0 {
            // `zip` ends with the shorter side, so exactly `count` samples are mapped.
            samples_in
                .iter()
                .zip(samples_out.iter_mut())
                .for_each(|(sample, slot)| *slot = (self.f)(sample));

            sio.input(0).consume(count);
            sio.output(0).produce(count);
        }

        // Only finish when the input reports finished and nothing of the
        // current input slice was left unprocessed.
        if sio.input(0).finished() && count == samples_in.len() {
            io.finished = true;
        }

        Ok(())
    }
}