rustradio/
rtlsdr_decode.rs

//! Decode RTL-SDR's byte-based format into Complex I/Q.
2use crate::Result;
3
4use crate::block::{Block, BlockRet};
5use crate::stream::{ReadStream, WriteStream};
6use crate::{Complex, Float};
7
8/// Decode RTL-SDR's byte based format into Complex I/Q.
9#[derive(rustradio_macros::Block)]
10#[rustradio(crate, new)]
11pub struct RtlSdrDecode {
12    #[rustradio(in)]
13    src: ReadStream<u8>,
14    #[rustradio(out)]
15    dst: WriteStream<Complex>,
16}
17
18impl Block for RtlSdrDecode {
19    fn work(&mut self) -> Result<BlockRet> {
20        // TODO: handle tags.
21        let (input, _tags) = self.src.read_buf()?;
22        let isamples = input.len() & !1;
23        if isamples == 0 {
24            return Ok(BlockRet::WaitForStream(&self.src, 2));
25        }
26        let mut out = self.dst.write_buf()?;
27        if out.is_empty() {
28            return Ok(BlockRet::WaitForStream(&self.dst, 1));
29        }
30        let isamples = std::cmp::min(isamples, out.len() * 2);
31        let osamples = isamples / 2;
32        assert_ne!(osamples, 0);
33
34        out.fill_from_iter(
35            input
36                .slice()
37                .chunks_exact(2)
38                .map(|e| ((e[0] as Float), (e[1] as Float)))
39                .map(|(a, b)| Complex::new((a - 127.0) * 0.008, (b - 127.0) * 0.008)),
40        );
41        input.consume(isamples);
42        out.produce(osamples, &[]);
43        Ok(BlockRet::Again)
44    }
45}
46
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Complex;
    use crate::blocks::VectorSource;

    /// Largest per-component deviation accepted when comparing samples.
    const TOLERANCE: Float = 1e-5;

    /// Assert that `got` and `want` have the same length and that every
    /// sample agrees within `TOLERANCE` in both real and imaginary parts.
    ///
    /// Exact float equality would tie the test to one particular rounding
    /// of `(byte - 127.0) * 0.008`.
    fn assert_close(got: &[Complex], want: &[Complex]) {
        assert_eq!(got.len(), want.len(), "sample count mismatch");
        for (i, (g, w)) in got.iter().zip(want).enumerate() {
            assert!(
                (g.re - w.re).abs() < TOLERANCE && (g.im - w.im).abs() < TOLERANCE,
                "sample {i}: got {g:?}, want {w:?}"
            );
        }
    }

    /// With no input at all, the decoder waits and produces nothing.
    #[test]
    fn empty() -> crate::Result<()> {
        let (mut src, src_out) = VectorSource::new(vec![]);
        let r = src.work()?;
        assert!(matches![r, BlockRet::EOF], "Want EOF, got {r:?}");
        let (mut dec, dec_out) = RtlSdrDecode::new(src_out);
        assert!(matches![dec.work()?, BlockRet::WaitForStream(_, _)]);
        let (res, _) = dec_out.read_buf()?;
        assert_eq!(res.len(), 0);
        Ok(())
    }

    /// Three byte pairs decode into three complex samples.
    #[test]
    fn some_input() -> crate::Result<()> {
        let (mut src, src_out) = VectorSource::new(vec![0, 10, 20, 10, 0, 13]);
        assert!(matches![src.work()?, BlockRet::EOF]);
        let (mut dec, dec_out) = RtlSdrDecode::new(src_out);
        assert!(matches![dec.work()?, BlockRet::Again]);
        let (res, _) = dec_out.read_buf()?;
        // Expected values are (byte - 127) * 0.008, compared approximately.
        assert_close(
            res.slice(),
            &[
                Complex {
                    re: -1.016,
                    im: -0.936,
                },
                Complex {
                    re: -0.856,
                    im: -0.936,
                },
                Complex {
                    re: -1.016,
                    im: -0.912,
                },
            ],
        );
        Ok(())
    }

    /// An odd number of bytes decodes only the complete pairs.
    #[test]
    fn uneven() -> crate::Result<()> {
        let (mut src, src_out) = VectorSource::new(vec![0, 10, 20, 10, 0]);
        assert!(matches![src.work()?, BlockRet::EOF]);
        let (mut dec, dec_out) = RtlSdrDecode::new(src_out);
        assert!(matches![dec.work()?, BlockRet::Again]);
        let (res, _) = dec_out.read_buf()?;
        assert_eq!(res.len(), 2);
        Ok(())
    }

    /// A full input stream must be decoded in several rounds because the
    /// output cannot hold everything at once.
    #[test]
    fn overflow() -> crate::Result<()> {
        // Input is pairs of bytes. Output is complex, meaning a 4x increase. That won't fit.
        let (mut src, src_out) = VectorSource::new(vec![0; crate::stream::DEFAULT_STREAM_SIZE]);
        assert!(matches![src.work()?, BlockRet::EOF]);
        let (mut dec, dec_out) = RtlSdrDecode::new(src_out);
        for n in 0..4 {
            eprintln!("loop iter: {n}");
            assert!(matches![dec.work()?, BlockRet::Again]);
            let (res, _) = dec_out.read_buf()?;
            assert_eq!(res.len(), crate::stream::DEFAULT_STREAM_SIZE / 8);
            res.consume(crate::stream::DEFAULT_STREAM_SIZE / 8);
        }
        // Finally there's no more input bytes to process.
        assert!(matches![dec.work()?, BlockRet::WaitForStream(_, _)]);
        Ok(())
    }
}