rustradio/
stream_to_pdu.rs

1//! Stream to PDU.
2use std::collections::HashMap;
3
4use log::{debug, trace};
5
6use crate::block::{Block, BlockRet};
7use crate::stream::{NCReadStream, NCWriteStream, ReadStream, Tag, TagPos, TagValue};
8use crate::{Result, Sample};
9
/// Stream to PDU block.
///
/// Turn a tagged stream to PDUs.
///
/// PDUs are marked in the stream as `true` when they start, and `false` when
/// they end. Optionally an extra `tail` samples are also included.
///
/// The sample with the `false` tag is not included, unless `tail` is greater
/// than zero.
///
/// Samples between bursts are discarded. A burst whose buffered length grows
/// beyond `max_size` samples is discarded as well.
///
/// ## Example
///
/// This example uses burst tagger to create the tags, and turn a stream
/// into burst PDUs.
///
/// Also see `examples/wpcr.rs`.
///
/// ```
/// use rustradio::graph::{Graph, GraphRunner};
/// use rustradio::blocks::{FileSource, Tee, ComplexToMag2, SinglePoleIirFilter,BurstTagger,StreamToPdu};
/// use rustradio::Complex;
/// let (src, src_out) = FileSource::new("/dev/null")?;
/// let (tee, data, b) = Tee::new(src_out);
/// let (c2m, c2m_out) = ComplexToMag2::new(b);
/// let (iir, iir_out) = SinglePoleIirFilter::new(c2m_out, 0.01).unwrap();
/// let (burst, prev) = BurstTagger::new(data, iir_out, 0.0001, "burst");
/// let pdus = StreamToPdu::new(prev, "burst", 10_000, 50);
/// // `pdus` is a `(block, stream)` tuple; the stream delivers bursts as Vec<Complex>
/// # Ok::<(), anyhow::Error>(())
/// ```
#[derive(rustradio_macros::Block)]
#[rustradio(crate)]
pub struct StreamToPdu<T: Sample> {
    // Input stream of tagged samples.
    #[rustradio(in)]
    src: ReadStream<T>,
    // Output stream; receives one `Vec<T>` per completed burst.
    #[rustradio(out)]
    dst: NCWriteStream<Vec<T>>,
    // Key of the tag whose boolean value marks burst start (`true`) and
    // burst end (`false`). Tags with other keys are not acted upon.
    tag: String,
    // Samples collected so far for the burst currently in progress.
    buf: Vec<T>,
    // `Some(n)` while tail samples are being collected after an end tag,
    // where `n` is how many more samples to append before the burst is
    // emitted. `None` otherwise.
    endcounter: Option<usize>,
    // Initial capacity of `buf`, and the length above which the buffered
    // samples are thrown away.
    max_size: usize,
    // Number of samples to keep starting at (and including) the sample that
    // carries the end tag.
    tail: usize,
}
55
56impl<T: Sample> StreamToPdu<T> {
57    /// Make new Stream to PDU block.
58    pub fn new<S: Into<String>>(
59        src: ReadStream<T>,
60        tag: S,
61        max_size: usize,
62        tail: usize,
63    ) -> (Self, NCReadStream<Vec<T>>) {
64        let (dst, dr) = crate::stream::new_nocopy_stream();
65        (
66            Self {
67                src,
68                tag: tag.into(),
69                dst,
70                buf: Vec::with_capacity(max_size),
71                endcounter: None,
72                max_size,
73                tail,
74            },
75            dr,
76        )
77    }
78
79    /// Burst has arrived. File it.
80    fn done(&mut self) {
81        let mut delme = Vec::with_capacity(self.max_size);
82        std::mem::swap(&mut delme, &mut self.buf);
83        debug!(
84            "StreamToPdu> got burst of size {} samples, {} bytes",
85            delme.len(),
86            delme.len() * T::size()
87        );
88        // TODO: record stream pos.
89        self.dst.push(delme, &[]);
90        self.endcounter = None;
91    }
92}
93
94// If a given tag exists at the given position, return Some(that bool). Else
95// return None.
96fn get_tag_val_bool(tags: &HashMap<(TagPos, &str), &Tag>, pos: TagPos, key: &str) -> Option<bool> {
97    if let Some(tag) = tags.get(&(pos, key)) {
98        match tag.val() {
99            TagValue::Bool(b) => Some(*b),
100            _ => None,
101        }
102    } else {
103        None
104    }
105}
106
107impl<T: Sample> Block for StreamToPdu<T> {
108    fn work(&mut self) -> Result<BlockRet<'_>> {
109        let (input, tags) = self.src.read_buf()?;
110        if input.is_empty() {
111            return Ok(BlockRet::WaitForStream(&self.src, 1));
112        }
113
114        // TODO: we actually only care about one single tag,
115        // and I think we should drop the rest no matter what.
116        let tags = tags
117            .iter()
118            .map(|t| ((t.pos(), t.key()), t))
119            .collect::<HashMap<(TagPos, &str), &Tag>>();
120        trace!("StreamToPdu: tags: {tags:?}");
121
122        for (i, sample) in input.iter().enumerate() {
123            //eprintln!("sample: {i} {sample:?}, {:?}", self.endcounter);
124            if let Some(c) = self.endcounter {
125                self.buf.push(*sample);
126                self.endcounter = Some(c - 1);
127                if c == 1 {
128                    self.done();
129                }
130            } else if let Some(tv) = get_tag_val_bool(&tags, i as TagPos, &self.tag) {
131                if tv {
132                    // Start of burst, save first sample.
133                    self.buf.push(*sample);
134                } else {
135                    // End of burst.
136                    if self.tail > 0 {
137                        self.buf.push(*sample);
138                    }
139                    if self.tail <= 1 {
140                        self.done();
141                    } else {
142                        self.endcounter = Some(self.tail - 1);
143                    }
144                }
145            } else if !self.buf.is_empty() {
146                // Burst continuation.
147                self.buf.push(*sample);
148            }
149            if self.buf.len() > self.max_size {
150                // Too long. Discard buffer and stop saving.
151                self.buf.clear();
152                self.endcounter = None;
153            }
154        }
155        let n = input.len();
156        input.consume(n);
157        Ok(BlockRet::Again)
158    }
159}
160
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Complex;
    use crate::blocks::VectorSource;

    /// Input without any tags must not produce a PDU.
    #[test]
    fn no_pdu() -> Result<()> {
        let samples = vec![Complex::default(); 100];
        let (mut src, src_out) = VectorSource::builder(samples).build()?;
        let (mut block, pdus) = StreamToPdu::new(src_out, "burst", 10, 0);
        assert!(matches!(src.work()?, BlockRet::EOF));
        assert!(matches!(block.work()?, BlockRet::Again));
        assert!(matches!(block.work()?, BlockRet::WaitForStream(_, 1)));
        assert!(pdus.pop().is_none());
        Ok(())
    }

    /// One start tag and one end tag yield exactly one PDU.
    #[test]
    fn single() -> Result<()> {
        // Columns: start tag position, end tag position, tail, expected PDU.
        // Where start and end share a position, the expected values are
        // those of the end (`false`) tag taking effect.
        let cases = [
            (0, 7, 0, vec![1, 2, 3, 4, 5, 6, 7]),
            (0, 0, 0, vec![]),
            (0, 0, 1, vec![1]),
            (1, 1, 0, vec![]),
            (1, 1, 1, vec![2]),
            (1, 1, 9, vec![2, 3, 4, 5, 6, 7, 8, 9, 10]),
            (7, 7, 0, vec![]),
            (7, 7, 1, vec![8]),
            (7, 7, 2, vec![8, 9]),
            (7, 7, 3, vec![8, 9, 10]),
            (7, 8, 0, vec![8]),
            (7, 8, 1, vec![8, 9]),
            (7, 8, 2, vec![8, 9, 10]),
            (7, 9, 0, vec![8, 9]),
            (7, 9, 1, vec![8, 9, 10]),
        ];
        for (start, end, tail, want) in cases {
            eprintln!("Testing with end={end}, tail={tail}, want={want:?}");
            let (mut src, src_out) = VectorSource::builder(vec![1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10])
                .tags(&[
                    Tag::new(start, "burst", TagValue::Bool(true)),
                    // Tag with an unrelated key; must not affect the result.
                    Tag::new(4, "test", TagValue::Bool(true)),
                    Tag::new(end, "burst", TagValue::Bool(false)),
                ])
                .build()?;
            let (mut block, pdus) = StreamToPdu::new(src_out, "burst", 10, tail);
            assert!(matches!(src.work()?, BlockRet::EOF));
            assert!(matches!(block.work()?, BlockRet::Again));
            assert!(matches!(block.work()?, BlockRet::WaitForStream(_, 1)));
            let (burst, tags) = pdus.pop().unwrap();
            assert_eq!(burst, want);
            assert_eq!(tags, &[]);
            assert!(pdus.pop().is_none());
        }
        Ok(())
    }

    /// If the input runs out before the tail is complete, nothing is emitted.
    #[test]
    fn ended_too_soon() -> Result<()> {
        // Columns: end tag position, tail. Each tail reaches past the last
        // of the ten input samples.
        let cases = [(7, 4), (8, 3), (9, 2)];
        for (end, tail) in cases {
            eprintln!("Testing with end={end}, tail={tail}");
            let (mut src, src_out) = VectorSource::builder(vec![1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10])
                .tags(&[
                    Tag::new(7, "burst", TagValue::Bool(true)),
                    Tag::new(4, "test", TagValue::Bool(true)),
                    Tag::new(end, "burst", TagValue::Bool(false)),
                ])
                .build()?;
            let (mut block, pdus) = StreamToPdu::new(src_out, "burst", 10, tail);
            assert!(matches!(src.work()?, BlockRet::EOF));
            assert!(matches!(block.work()?, BlockRet::Again));
            assert!(pdus.pop().is_none());
        }
        Ok(())
    }

    /// A burst in the middle of the input, without tail, excludes the sample
    /// carrying the end tag.
    #[test]
    fn mid_pdu() -> Result<()> {
        let (mut src, src_out) = VectorSource::builder(vec![1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10])
            .tags(&[
                Tag::new(3, "burst", TagValue::Bool(true)),
                Tag::new(4, "test", TagValue::Bool(true)),
                Tag::new(7, "burst", TagValue::Bool(false)),
            ])
            .build()?;
        let (mut block, pdus) = StreamToPdu::new(src_out, "burst", 10, 0);
        assert!(matches!(src.work()?, BlockRet::EOF));
        assert!(matches!(block.work()?, BlockRet::Again));
        assert!(matches!(block.work()?, BlockRet::WaitForStream(_, 1)));
        let (burst, tags) = pdus.pop().unwrap();
        assert_eq!(burst, &[4, 5, 6, 7]);
        assert_eq!(tags, &[]);
        assert!(pdus.pop().is_none());
        Ok(())
    }
}