1use std::collections::HashMap;
3
4use log::{debug, trace};
5
6use crate::block::{Block, BlockRet};
7use crate::stream::{NCReadStream, NCWriteStream, ReadStream, Tag, TagPos, TagValue};
8use crate::{Result, Sample};
9
10#[derive(rustradio_macros::Block)]
43#[rustradio(crate)]
44pub struct StreamToPdu<T: Sample> {
45 #[rustradio(in)]
46 src: ReadStream<T>,
47 #[rustradio(out)]
48 dst: NCWriteStream<Vec<T>>,
49 tag: String,
50 buf: Vec<T>,
51 endcounter: Option<usize>,
52 max_size: usize,
53 tail: usize,
54}
55
56impl<T: Sample> StreamToPdu<T> {
57 pub fn new<S: Into<String>>(
59 src: ReadStream<T>,
60 tag: S,
61 max_size: usize,
62 tail: usize,
63 ) -> (Self, NCReadStream<Vec<T>>) {
64 let (dst, dr) = crate::stream::new_nocopy_stream();
65 (
66 Self {
67 src,
68 tag: tag.into(),
69 dst,
70 buf: Vec::with_capacity(max_size),
71 endcounter: None,
72 max_size,
73 tail,
74 },
75 dr,
76 )
77 }
78
79 fn done(&mut self) {
81 let mut delme = Vec::with_capacity(self.max_size);
82 std::mem::swap(&mut delme, &mut self.buf);
83 debug!(
84 "StreamToPdu> got burst of size {} samples, {} bytes",
85 delme.len(),
86 delme.len() * T::size()
87 );
88 self.dst.push(delme, &[]);
90 self.endcounter = None;
91 }
92}
93
94fn get_tag_val_bool(tags: &HashMap<(TagPos, &str), &Tag>, pos: TagPos, key: &str) -> Option<bool> {
97 if let Some(tag) = tags.get(&(pos, key)) {
98 match tag.val() {
99 TagValue::Bool(b) => Some(*b),
100 _ => None,
101 }
102 } else {
103 None
104 }
105}
106
107impl<T: Sample> Block for StreamToPdu<T> {
108 fn work(&mut self) -> Result<BlockRet<'_>> {
109 let (input, tags) = self.src.read_buf()?;
110 if input.is_empty() {
111 return Ok(BlockRet::WaitForStream(&self.src, 1));
112 }
113
114 let tags = tags
117 .iter()
118 .map(|t| ((t.pos(), t.key()), t))
119 .collect::<HashMap<(TagPos, &str), &Tag>>();
120 trace!("StreamToPdu: tags: {tags:?}");
121
122 for (i, sample) in input.iter().enumerate() {
123 if let Some(c) = self.endcounter {
125 self.buf.push(*sample);
126 self.endcounter = Some(c - 1);
127 if c == 1 {
128 self.done();
129 }
130 } else if let Some(tv) = get_tag_val_bool(&tags, i as TagPos, &self.tag) {
131 if tv {
132 self.buf.push(*sample);
134 } else {
135 if self.tail > 0 {
137 self.buf.push(*sample);
138 }
139 if self.tail <= 1 {
140 self.done();
141 } else {
142 self.endcounter = Some(self.tail - 1);
143 }
144 }
145 } else if !self.buf.is_empty() {
146 self.buf.push(*sample);
148 }
149 if self.buf.len() > self.max_size {
150 self.buf.clear();
152 self.endcounter = None;
153 }
154 }
155 let n = input.len();
156 input.consume(n);
157 Ok(BlockRet::Again)
158 }
159}
160
161#[cfg(test)]
162mod tests {
163 use super::*;
164 use crate::Complex;
165 use crate::blocks::VectorSource;
166
167 #[test]
168 fn no_pdu() -> Result<()> {
169 let (mut src, src_out) = VectorSource::builder(vec![Complex::default(); 100]).build()?;
170 let (mut b, out) = StreamToPdu::new(src_out, "burst", 10, 0);
171 assert!(matches![src.work()?, BlockRet::EOF]);
172 assert!(matches![b.work()?, BlockRet::Again]);
173 assert!(matches![b.work()?, BlockRet::WaitForStream(_, 1)]);
174 assert!(out.pop().is_none());
175 Ok(())
176 }
177
178 #[test]
179 fn single() -> Result<()> {
180 for (start, end, tail, want) in [
181 (0, 7, 0, vec![1, 2, 3, 4, 5, 6, 7]),
182 (0, 0, 0, vec![]),
183 (0, 0, 1, vec![1]),
184 (1, 1, 0, vec![]),
185 (1, 1, 1, vec![2]),
186 (1, 1, 9, vec![2, 3, 4, 5, 6, 7, 8, 9, 10]),
187 (7, 7, 0, vec![]),
188 (7, 7, 1, vec![8]),
189 (7, 7, 2, vec![8, 9]),
190 (7, 7, 3, vec![8, 9, 10]),
191 (7, 8, 0, vec![8]),
192 (7, 8, 1, vec![8, 9]),
193 (7, 8, 2, vec![8, 9, 10]),
194 (7, 9, 0, vec![8, 9]),
195 (7, 9, 1, vec![8, 9, 10]),
196 ] {
197 eprintln!("Testing with end={end}, tail={tail}, want={want:?}");
198 let (mut src, src_out) = VectorSource::builder(vec![1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10])
199 .tags(&[
200 Tag::new(start, "burst", TagValue::Bool(true)),
201 Tag::new(4, "test", TagValue::Bool(true)),
202 Tag::new(end, "burst", TagValue::Bool(false)),
203 ])
204 .build()?;
205 let (mut b, out) = StreamToPdu::new(src_out, "burst", 10, tail);
206 assert!(matches![src.work()?, BlockRet::EOF]);
207 assert!(matches![b.work()?, BlockRet::Again]);
208 assert!(matches![b.work()?, BlockRet::WaitForStream(_, 1)]);
209 let (burst, tags) = out.pop().unwrap();
210 assert_eq!(burst, want);
211 assert_eq!(tags, &[]);
212 assert!(out.pop().is_none());
213 }
214 Ok(())
215 }
216
217 #[test]
218 fn ended_too_soon() -> Result<()> {
219 for (end, tail) in [(7, 4), (8, 3), (9, 2)] {
220 eprintln!("Testing with end={end}, tail={tail}");
221 let (mut src, src_out) = VectorSource::builder(vec![1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10])
222 .tags(&[
223 Tag::new(7, "burst", TagValue::Bool(true)),
224 Tag::new(4, "test", TagValue::Bool(true)),
225 Tag::new(end, "burst", TagValue::Bool(false)),
226 ])
227 .build()?;
228 let (mut b, out) = StreamToPdu::new(src_out, "burst", 10, tail);
229 assert!(matches![src.work()?, BlockRet::EOF]);
230 assert!(matches![b.work()?, BlockRet::Again]);
231 assert!(out.pop().is_none());
232 }
233 Ok(())
234 }
235
236 #[test]
237 fn mid_pdu() -> Result<()> {
238 let (mut src, src_out) = VectorSource::builder(vec![1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10])
239 .tags(&[
240 Tag::new(3, "burst", TagValue::Bool(true)),
241 Tag::new(4, "test", TagValue::Bool(true)),
242 Tag::new(7, "burst", TagValue::Bool(false)),
243 ])
244 .build()?;
245 let (mut b, out) = StreamToPdu::new(src_out, "burst", 10, 0);
246 assert!(matches![src.work()?, BlockRet::EOF]);
247 assert!(matches![b.work()?, BlockRet::Again]);
248 assert!(matches![b.work()?, BlockRet::WaitForStream(_, 1)]);
249 let (burst, tags) = out.pop().unwrap();
250 assert_eq!(burst, &[4, 5, 6, 7]);
251 assert_eq!(tags, &[]);
252 assert!(out.pop().is_none());
253 Ok(())
254 }
255}