futuresdr/blocks/
split.rs

1use crate::prelude::*;
2
/// Apply a function to split a stream.
///
/// Every input item of type `A` is passed by reference to the closure `f`, which returns a
/// pair `(B, C)`. The first element of the pair is written to `output0`, the second to
/// `output1`, so both outputs advance by one item per consumed input item.
///
/// Type parameters:
/// - `F`: the splitting closure, `FnMut(&A) -> (B, C)`.
/// - `A`: input item type.
/// - `B`: item type of the first output.
/// - `C`: item type of the second output.
/// - `I`: input buffer reader, defaults to `DefaultCpuReader<A>`.
/// - `O0`: writer of the first output, defaults to `DefaultCpuWriter<B>`.
/// - `O1`: writer of the second output, defaults to `DefaultCpuWriter<C>`.
#[derive(Block)]
pub struct Split<
    F,
    A,
    B,
    C,
    I = DefaultCpuReader<A>,
    O0 = DefaultCpuWriter<B>,
    O1 = DefaultCpuWriter<C>,
> where
    F: FnMut(&A) -> (B, C) + Send + 'static,
    A: Send + 'static,
    B: Send + 'static,
    C: Send + 'static,
    I: CpuBufferReader<Item = A>,
    O0: CpuBufferWriter<Item = B>,
    O1: CpuBufferWriter<Item = C>,
{
    // Stream input carrying items of type `A`.
    #[input]
    input: I,
    // Receives the first element of each pair returned by `f`.
    #[output]
    output0: O0,
    // Receives the second element of each pair returned by `f`.
    #[output]
    output1: O1,
    // Closure invoked once per input item to produce the two output items.
    f: F,
}
30
31impl<F, A, B, C, I, O0, O1> Split<F, A, B, C, I, O0, O1>
32where
33    F: FnMut(&A) -> (B, C) + Send + 'static,
34    A: Send + 'static,
35    B: Send + 'static,
36    C: Send + 'static,
37    I: CpuBufferReader<Item = A>,
38    O0: CpuBufferWriter<Item = B>,
39    O1: CpuBufferWriter<Item = C>,
40{
41    /// Create Split block
42    pub fn new(f: F) -> Self {
43        Self {
44            input: I::default(),
45            output0: O0::default(),
46            output1: O1::default(),
47            f,
48        }
49    }
50}
51
52#[doc(hidden)]
53impl<F, A, B, C, I, O1, O2> Kernel for Split<F, A, B, C, I, O1, O2>
54where
55    F: FnMut(&A) -> (B, C) + Send + 'static,
56    A: Send + 'static,
57    B: Send + 'static,
58    C: Send + 'static,
59    I: CpuBufferReader<Item = A>,
60    O1: CpuBufferWriter<Item = B>,
61    O2: CpuBufferWriter<Item = C>,
62{
63    async fn work(
64        &mut self,
65        io: &mut WorkIo,
66        _mio: &mut MessageOutputs,
67        _meta: &mut BlockMeta,
68    ) -> Result<()> {
69        let i0 = self.input.slice();
70        let o0 = self.output0.slice();
71        let o1 = self.output1.slice();
72        let i0_len = i0.len();
73
74        let m = std::cmp::min(i0.len(), o0.len());
75        let m = std::cmp::min(m, o1.len());
76
77        if m > 0 {
78            for (x, (y0, y1)) in i0.iter().zip(o0.iter_mut().zip(o1.iter_mut())) {
79                let (a, b) = (self.f)(x);
80                *y0 = a;
81                *y1 = b;
82            }
83
84            self.input.consume(m);
85            self.output0.produce(m);
86            self.output1.produce(m);
87        }
88
89        if self.input.finished() && m == i0_len {
90            io.finished = true;
91        }
92
93        Ok(())
94    }
95}