futuresdr/blocks/
split.rs1use crate::prelude::*;
2
/// Block that maps every input item to a pair of values and writes the two
/// halves of the pair to two separate outputs.
///
/// # Type parameters
///
/// - `F`: function called with a reference to each input item; returns `(B, C)`.
/// - `A`: item type of the input.
/// - `B`: item type of the first output (`output0`).
/// - `C`: item type of the second output (`output1`).
/// - `I`: input buffer reader, `DefaultCpuReader<A>` unless overridden.
/// - `O0`: buffer writer of the first output, `DefaultCpuWriter<B>` unless overridden.
/// - `O1`: buffer writer of the second output, `DefaultCpuWriter<C>` unless overridden.
#[derive(Block)]
pub struct Split<
    F,
    A,
    B,
    C,
    I = DefaultCpuReader<A>,
    O0 = DefaultCpuWriter<B>,
    O1 = DefaultCpuWriter<C>,
> where
    F: FnMut(&A) -> (B, C) + Send + 'static,
    A: Send + 'static,
    B: Send + 'static,
    C: Send + 'static,
    I: CpuBufferReader<Item = A>,
    O0: CpuBufferWriter<Item = B>,
    O1: CpuBufferWriter<Item = C>,
{
    // Reader the items of type `A` are taken from.
    #[input]
    input: I,
    // Writer that receives the first element of each pair returned by `f`.
    #[output]
    output0: O0,
    // Writer that receives the second element of each pair returned by `f`.
    #[output]
    output1: O1,
    // Function applied to every input item; `FnMut`, so it may keep state
    // between calls.
    f: F,
}
30
31impl<F, A, B, C, I, O0, O1> Split<F, A, B, C, I, O0, O1>
32where
33 F: FnMut(&A) -> (B, C) + Send + 'static,
34 A: Send + 'static,
35 B: Send + 'static,
36 C: Send + 'static,
37 I: CpuBufferReader<Item = A>,
38 O0: CpuBufferWriter<Item = B>,
39 O1: CpuBufferWriter<Item = C>,
40{
41 pub fn new(f: F) -> Self {
43 Self {
44 input: I::default(),
45 output0: O0::default(),
46 output1: O1::default(),
47 f,
48 }
49 }
50}
51
52#[doc(hidden)]
53impl<F, A, B, C, I, O1, O2> Kernel for Split<F, A, B, C, I, O1, O2>
54where
55 F: FnMut(&A) -> (B, C) + Send + 'static,
56 A: Send + 'static,
57 B: Send + 'static,
58 C: Send + 'static,
59 I: CpuBufferReader<Item = A>,
60 O1: CpuBufferWriter<Item = B>,
61 O2: CpuBufferWriter<Item = C>,
62{
63 async fn work(
64 &mut self,
65 io: &mut WorkIo,
66 _mio: &mut MessageOutputs,
67 _meta: &mut BlockMeta,
68 ) -> Result<()> {
69 let i0 = self.input.slice();
70 let o0 = self.output0.slice();
71 let o1 = self.output1.slice();
72 let i0_len = i0.len();
73
74 let m = std::cmp::min(i0.len(), o0.len());
75 let m = std::cmp::min(m, o1.len());
76
77 if m > 0 {
78 for (x, (y0, y1)) in i0.iter().zip(o0.iter_mut().zip(o1.iter_mut())) {
79 let (a, b) = (self.f)(x);
80 *y0 = a;
81 *y1 = b;
82 }
83
84 self.input.consume(m);
85 self.output0.produce(m);
86 self.output1.produce(m);
87 }
88
89 if self.input.finished() && m == i0_len {
90 io.finished = true;
91 }
92
93 Ok(())
94 }
95}