fsdr_blocks/stream/
deinterleave.rs

1use futuresdr::anyhow::Result;
2use futuresdr::runtime::Block;
3use futuresdr::runtime::BlockMeta;
4use futuresdr::runtime::BlockMetaBuilder;
5use futuresdr::runtime::Kernel;
6use futuresdr::runtime::MessageIo;
7use futuresdr::runtime::MessageIoBuilder;
8use futuresdr::runtime::StreamIo;
9use futuresdr::runtime::StreamIoBuilder;
10use futuresdr::runtime::WorkIo;
11
/// Block that deinterleaves one input stream into two separate output streams.
///
/// Items read from the input are handed out alternately: one item to the first
/// output, the next item to the second output, and so on. A typical use is
/// splitting an interleaved IQ stream into one stream for `i` and one for `q`.
///
/// # Usage
/// ```
/// use fsdr_blocks::stream::Deinterleave;
/// let blk = Deinterleave::<f32>::new();
/// ```
pub struct Deinterleave<A>
where
    A: Copy + Send + 'static,
{
    /// `true` when the next input item belongs to the first output,
    /// `false` when it belongs to the second one.
    first: bool,
    /// Ties the item type `A` to the block without storing a value of it.
    _p1: std::marker::PhantomData<A>,
}
27
28impl<A> Deinterleave<A>
29where
30    A: Send + 'static + Copy,
31{
32    #[allow(clippy::new_ret_no_self)]
33    pub fn new() -> Block {
34        Block::new(
35            BlockMetaBuilder::new("Deinterleave").build(),
36            StreamIoBuilder::new()
37                .add_input::<A>("in")
38                .add_output::<A>("out0")
39                .add_output::<A>("out1")
40                .build(),
41            MessageIoBuilder::<Self>::new().build(),
42            Deinterleave {
43                _p1: std::marker::PhantomData,
44                first: true,
45            },
46        )
47    }
48}
49
50#[doc(hidden)]
51#[async_trait]
52impl<A> Kernel for Deinterleave<A>
53where
54    A: Send + 'static + Copy,
55{
56    async fn work(
57        &mut self,
58        io: &mut WorkIo,
59        sio: &mut StreamIo,
60        _mio: &mut MessageIo<Self>,
61        _meta: &mut BlockMeta,
62    ) -> Result<()> {
63        let i0 = sio.input(0).slice::<A>();
64        let mut m0 = 0;
65        let mut m1 = 0;
66        let o0 = sio.output(0).slice::<A>();
67        let o1 = sio.output(1).slice::<A>();
68
69        let mut it0 = o0.iter_mut();
70        let mut it1 = o1.iter_mut();
71
72        for x in i0.iter() {
73            if self.first {
74                let d = it0.next();
75                if d.is_none() {
76                    break;
77                }
78                let d = d.expect("");
79                *d = *x;
80                m0 += 1;
81            } else {
82                let d = it1.next();
83                if d.is_none() {
84                    break;
85                }
86                let d = d.expect("");
87                *d = *x;
88                m1 += 1;
89            }
90            self.first = !self.first;
91        }
92
93        let m = m0 + m1;
94        sio.input(0).consume(m);
95        sio.output(0).produce(m0);
96        sio.output(1).produce(m1);
97
98        if sio.input(0).finished() && m == i0.len() {
99            io.finished = true;
100        }
101
102        Ok(())
103    }
104}