1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
use parking_lot::Mutex;
use pico_common::PicoChannel;
use std::{
collections::HashMap,
sync::{Arc, Weak},
};
/// Receiver for streaming data notifications.
///
/// Handlers are registered with `StreamingEvents::subscribe` and are invoked by
/// `StreamingEvents::emit` once per emitted event for as long as the handler is
/// still alive. Implementors must be `Send + Sync`.
pub trait NewDataHandler: Send + Sync {
/// Called with a shared borrow of the emitted event. The event is only
/// borrowed for the duration of the call; clone it if it must be kept.
fn handle_event(&self, value: &StreamingEvent);
}
/// Listener registry that backs a `StreamingEvents` handle.
///
/// Handlers are stored as [`Weak`] references, so being listed here does not
/// keep a handler alive; an entry whose handler has been dropped simply fails
/// to upgrade.
#[derive(Clone, Default)]
pub struct EventsInner {
    /// Weak references to the registered handlers.
    pub listeners: Vec<Weak<dyn NewDataHandler>>,
}

impl EventsInner {
    /// Creates a registry with no listeners.
    ///
    /// Equivalent to `EventsInner::default()`.
    pub fn new() -> Self {
        Self::default()
    }
}
/// Cloneable handle to a shared list of [`NewDataHandler`] listeners.
///
/// The listener list lives behind an `Arc<Mutex<_>>`, so every clone of a
/// `StreamingEvents` refers to the same list: a handler subscribed through one
/// clone receives events emitted through any other.
#[derive(Clone)]
pub struct StreamingEvents {
    inner: Arc<Mutex<EventsInner>>,
}

impl StreamingEvents {
    /// Creates a handle with an empty listener list.
    pub fn new() -> Self {
        StreamingEvents {
            inner: Arc::new(Mutex::new(EventsInner::new())),
        }
    }

    /// Registers `observer` to receive subsequently emitted events.
    ///
    /// Only a weak reference is stored. The `Arc` passed in is dropped when this
    /// call returns, so the caller must keep its own clone alive for as long as
    /// it wants the handler to be notified.
    #[tracing::instrument(level = "trace", skip(self, observer))]
    pub fn subscribe(&self, observer: Arc<dyn NewDataHandler>) {
        self.inner.lock().listeners.push(Arc::downgrade(&observer));
    }

    /// Delivers `value` to every live listener, in registration order.
    ///
    /// Listeners whose handler has been dropped are removed from the list during
    /// the same pass, so the list does not accumulate dead entries over time.
    ///
    /// The internal lock is held while handlers run. A handler must therefore
    /// not call [`subscribe`](Self::subscribe) or [`emit`](Self::emit) on this
    /// handle (or a clone of it): the `parking_lot` mutex is not reentrant and
    /// the call would deadlock.
    #[tracing::instrument(level = "trace", skip(self, value))]
    pub fn emit(&self, value: StreamingEvent) {
        // `retain` visits each element exactly once, in order, so it doubles as
        // the dispatch loop while dropping entries that can no longer upgrade.
        self.inner
            .lock()
            .listeners
            .retain(|listener| match listener.upgrade() {
                Some(handler) => {
                    handler.handle_event(&value);
                    true
                }
                None => false,
            });
    }
}
impl Default for StreamingEvents {
fn default() -> Self {
StreamingEvents::new()
}
}
/// Payload handed to `NewDataHandler::handle_event` for each emitted event.
#[derive(Clone)]
pub struct StreamingEvent {
/// NOTE(review): presumably the number of samples carried per channel in
/// this event; the producer is not visible here, so confirm.
pub length: usize,
/// Sample rate, in samples per second (per the field name; the code that
/// fills it in is not visible here).
pub samples_per_second: u32,
/// Raw data block for each channel present in this event, keyed by channel.
pub channels: HashMap<PicoChannel, RawChannelDataBlock>,
}
/// A run of raw 16-bit samples together with the factor used to scale them.
#[derive(Clone)]
pub struct RawChannelDataBlock {
    /// Factor each raw sample is multiplied by when it is scaled.
    pub multiplier: f64,
    /// Raw, unscaled sample values.
    pub samples: Vec<i16>,
}

impl RawChannelDataBlock {
    /// Returns a new vector holding every sample multiplied by `multiplier`,
    /// in the same order as `samples`.
    pub fn scale_samples(&self) -> Vec<f64> {
        let factor = self.multiplier;
        self.samples
            .iter()
            .map(|&raw| f64::from(raw) * factor)
            .collect()
    }

    /// Returns the sample at `index` multiplied by `multiplier`.
    ///
    /// # Panics
    ///
    /// Panics if `index` is out of bounds for `samples`.
    #[inline(always)]
    pub fn scale_sample(&self, index: usize) -> f64 {
        let raw = self.samples[index];
        f64::from(raw) * self.multiplier
    }
}