1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
//! Burst tagger.
//!
//! Add tags to a stream to indicate the start and end of a burst. Does not
//! otherwise modify the stream.
use std::borrow::Cow;
use crate::stream::{ReadStream, Tag, TagValue, WriteStream};
use crate::{Float, Sample};
/// Burst tagger
///
/// This block takes two inputs: a data stream, of any sample type, that
/// is passed through as-is, and a trigger stream, of type Float. When the
/// trigger value goes above the threshold, a tag with the value `true` is
/// added to the data stream. When it is no longer above the threshold,
/// the same tag is added with the value `false`.
///
/// Tags are only added on transitions, not on every sample.
///
/// The float input should likely be filtered with an IIR filter.
///
/// ## Example
///
/// This example uses burst tagger to create the tags, and turn a stream
/// into burst PDUs.
///
/// Also see `examples/wpcr.rs`.
///
/// ```
/// use rustradio::graph::{Graph, GraphRunner};
/// use rustradio::blocks::{FileSource, Tee, ComplexToMag2, SinglePoleIirFilter,BurstTagger,StreamToPdu};
/// use rustradio::Complex;
/// let (src, src_out) = FileSource::new("/dev/null")?;
/// let (tee, data, b) = Tee::new(src_out);
/// let (c2m, c2m_out) = ComplexToMag2::new(b);
/// let (iir, iir_out) = SinglePoleIirFilter::new(c2m_out, 0.01).unwrap();
/// let (burst, burst_out) = BurstTagger::new(data, iir_out, 0.0001, "burst");
/// let pdus = StreamToPdu::new(burst_out, "burst", 10_000, 50);
/// // pdus.out() now delivers bursts as Vec<Complex>
/// # Ok::<(), anyhow::Error>(())
/// ```
///
/// ## Constructor arguments
///
/// * `src`: Source data stream, will pass through and get tags.
/// * `trigger`: Trigger stream.
/// * `threshold`: Threshold on trigger stream.
/// * `tag`: Tag name to add.
#[derive(rustradio_macros::Block)]
#[rustradio(crate, new, sync_tag)]
pub struct BurstTagger<T: Sample> {
// Data input; samples are forwarded unchanged.
#[rustradio(in)]
src: ReadStream<T>,
// Trigger input; each value is compared against `threshold`.
#[rustradio(in)]
trigger: ReadStream<Float>,
// Output: the data samples plus any burst tags added by this block.
#[rustradio(out)]
dst: WriteStream<T>,
// A trigger value strictly greater than this counts as "in burst".
threshold: Float,
// Name of the tag to emit. NOTE(review): `into` presumably lets the
// generated constructor accept anything convertible into a String (the
// test passes a `&str`) -- confirm against the macro.
#[rustradio(into)]
tag: String,
// Result of the threshold comparison for the previous sample.
// NOTE(review): `default` presumably initializes this to `false`, in which
// case a stream that starts above the threshold gets a `true` tag on its
// first sample -- confirm against the macro.
#[rustradio(default)]
last: bool,
}
impl<T: Sample> BurstTagger<T> {
    /// Forward one data sample, adding a burst tag when the trigger state flips.
    ///
    /// * `s`: data sample; returned unchanged.
    /// * `tags`: tags already attached to the data sample.
    /// * `tv`: trigger value paired with this sample.
    /// * `_tv_tags`: tags on the trigger sample; ignored.
    ///
    /// Returns the sample together with its tags. If the trigger is on the
    /// same side of the threshold as for the previous sample, the input tags
    /// are returned borrowed. Otherwise an owned copy is returned with one
    /// extra tag named `self.tag`, whose value is `true` when `tv` is above
    /// the threshold and `false` when it is not.
    fn process_sync_tags<'a>(
        &mut self,
        s: T,
        tags: &'a [Tag],
        tv: Float,
        _tv_tags: &[Tag],
    ) -> (T, Cow<'a, [Tag]>) {
        let above = tv > self.threshold;
        // Remember the new state and fetch the previous one in a single step.
        let was_above = std::mem::replace(&mut self.last, above);

        // No transition: hand the caller's tags back without allocating.
        if above == was_above {
            return (s, Cow::Borrowed(tags));
        }

        // Transition: existing tags first, then the edge marker appended last.
        // NOTE(review): position 0 is presumably relative to this sample;
        // the unit test expects the tags at the samples where the trigger
        // crossed -- confirm against the `sync_tag` machinery.
        let with_edge: Vec<Tag> = tags
            .iter()
            .cloned()
            .chain(std::iter::once(Tag::new(
                0,
                self.tag.clone(),
                TagValue::Bool(above),
            )))
            .collect();
        (s, Cow::Owned(with_edge))
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Result;
    use crate::block::Block;
    use crate::blocks::{VectorSink, VectorSource};

    /// Assert that two tag lists contain the same tags, regardless of order.
    fn tag_compare(left: &[Tag], right: &[Tag]) {
        // Compare sorted copies so that ordering differences do not matter.
        let sorted = |tags: &[Tag]| -> Vec<Tag> {
            let mut copy = tags.to_vec();
            copy.sort_by(|a, b| a.partial_cmp(b).unwrap());
            copy
        };
        assert_eq!(sorted(left), sorted(right));
    }

    /// A trigger that rises above the threshold at sample 80 and drops back
    /// at sample 90 must produce a `true` tag at 80 and a `false` tag at 90,
    /// while the data itself passes through untouched.
    #[test]
    fn tag_it() -> Result<()> {
        // Trigger level per sample index: low, then high for 80..90, then
        // low again (0.2 is still under the 0.25 threshold).
        let level = |i: u32| {
            if i < 80 {
                0.1
            } else if i < 90 {
                0.3
            } else {
                0.2
            }
        };

        let (mut src, src_out) = VectorSource::new((0..100u32).collect());
        let (mut trigger, trigger_out) = VectorSource::new((0..100u32).map(level).collect());
        let (mut b, b_out) = BurstTagger::new(src_out, trigger_out, 0.25, "burst");
        let mut sink = VectorSink::new(b_out, 1000);

        // Run each block once, upstream to downstream.
        src.work()?;
        trigger.work()?;
        b.work()?;
        sink.work()?;

        let want: Vec<u32> = (0..100).collect();
        assert_eq!(sink.hook().data().samples(), want);

        let want_tags = [
            Tag::new(0, "VectorSource::start", TagValue::Bool(true)),
            Tag::new(0, "VectorSource::first", TagValue::Bool(true)),
            Tag::new(0, "VectorSource::repeat", TagValue::U64(0)),
            Tag::new(80, "burst", TagValue::Bool(true)),
            Tag::new(90, "burst", TagValue::Bool(false)),
        ];
        tag_compare(sink.hook().data().tags(), &want_tags);
        Ok(())
    }
}