1#![forbid(unsafe_code)]
2
3use crossbeam::channel::{bounded, Sender};
52use events::StreamingEvents;
53pub use events::{NewDataHandler, RawChannelDataBlock, StreamingEvent};
54use parking_lot::RwLock;
55use pico_common::{
56 ChannelConfig, PicoChannel, PicoCoupling, PicoRange, PicoResult, PicoStatus, SampleConfig,
57};
58use pico_device::PicoDevice;
59use std::{
60 collections::HashMap, fmt, pin::Pin, sync::Arc, thread, thread::JoinHandle, time::Duration,
61};
62use tracing::*;
63
64mod events;
65
66#[cfg_attr(feature = "serde", derive(serde::Serialize))]
67#[derive(Debug, Clone, Copy)]
68enum Target {
69 Closed,
70 Open,
71 Streaming { requested_sample_rate: u32 },
72}
73
74#[cfg_attr(feature = "serde", derive(serde::Serialize))]
75#[derive(Clone)]
76struct LockedTarget(Arc<RwLock<Target>>);
77
78impl LockedTarget {
79 pub fn new(target: Target) -> Self {
80 LockedTarget(Arc::new(RwLock::new(target)))
81 }
82
83 pub fn set(&self, new: Target) {
84 *self.0.write() = new;
85 }
86
87 pub fn get(&self) -> Target {
88 *self.0.read()
89 }
90}
91
92impl fmt::Debug for LockedTarget {
93 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
94 f.write_fmt(format_args!("{:?}", self.0.try_read()))
95 }
96}
97
98type BufferMap = HashMap<PicoChannel, Arc<RwLock<Pin<Vec<i16>>>>>;
99
100#[cfg_attr(feature = "serde", derive(serde::Serialize))]
101#[derive(Clone)]
102enum State {
103 Closed,
104 Open {
105 handle: i16,
106 },
107 Streaming {
108 handle: i16,
109 actual_sample_rate: u32,
110 #[cfg_attr(feature = "serde", serde(skip))]
111 buffers: BufferMap,
112 },
113}
114
115impl PartialEq for State {
116 fn eq(&self, other: &Self) -> bool {
117 matches!(
118 (self, other),
119 (State::Closed, State::Closed)
120 | (State::Open { .. }, State::Open { .. })
121 | (State::Streaming { .. }, State::Streaming { .. })
122 )
123 }
124}
125
126impl fmt::Debug for State {
127 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
128 match self {
129 State::Closed => f.debug_struct("Closed").finish(),
130 State::Open { handle } => f.debug_struct("Open").field("handle", handle).finish(),
131 State::Streaming {
132 handle,
133 actual_sample_rate,
134 ..
135 } => f
136 .debug_struct("Streaming")
137 .field("handle", handle)
138 .field("actual_sample_rate", actual_sample_rate)
139 .finish(),
140 }
141 }
142}
143
144#[cfg_attr(feature = "serde", derive(serde::Serialize))]
149#[derive(Clone)]
150pub struct PicoStreamingDevice {
151 device: PicoDevice,
152 target_state: LockedTarget,
153 current_state: Arc<RwLock<State>>,
154 enabled_channels: Arc<RwLock<HashMap<PicoChannel, ChannelConfig>>>,
155 #[cfg_attr(feature = "serde", serde(skip))]
156 background_handle: Option<Arc<BackgroundThreadHandle>>,
157 #[cfg_attr(feature = "serde", serde(skip))]
158 pub new_data: StreamingEvents,
159}
160
161impl fmt::Debug for PicoStreamingDevice {
162 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
163 f.debug_struct("PicoStreamingDevice")
164 .field("device", &self.device)
165 .field("target_state", &self.target_state)
166 .field("current_state", &self.current_state.try_read())
167 .finish()
168 }
169}
170
171impl PartialEq for PicoStreamingDevice {
172 fn eq(&self, other: &Self) -> bool {
173 self.get_serial() == other.get_serial()
174 }
175}
176
177impl Eq for PicoStreamingDevice {}
178
179impl std::hash::Hash for PicoStreamingDevice {
180 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
181 self.get_serial().hash(state);
182 }
183}
184
185impl From<PicoDevice> for PicoStreamingDevice {
186 fn from(d: PicoDevice) -> Self {
187 PicoStreamingDevice::new(d)
188 }
189}
190
191impl PicoStreamingDevice {
192 fn new(device: PicoDevice) -> Self {
193 let (current_state, target_state) = match device.handle.lock().take() {
194 Some(handle) => (State::Open { handle }, Target::Open),
195 None => (State::Closed, Target::Closed),
196 };
197
198 let mut device = PicoStreamingDevice {
199 device,
200 target_state: LockedTarget::new(target_state),
201 current_state: Arc::new(RwLock::new(current_state)),
202 new_data: Default::default(),
203 enabled_channels: Default::default(),
204 background_handle: Default::default(),
205 };
206
207 device.start_background_thread();
208
209 device
210 }
211
212 pub fn get_serial(&self) -> String {
213 self.device.serial.to_string()
214 }
215
216 pub fn get_variant(&self) -> String {
217 self.device.variant.to_string()
218 }
219
220 pub fn enable_channel(&self, channel: PicoChannel, range: PicoRange, coupling: PicoCoupling) {
221 self.enabled_channels.write().insert(
222 channel,
223 ChannelConfig {
224 range,
225 coupling,
226 offset: 0.0,
227 },
228 );
229 }
230
231 pub fn disable_channel(&self, channel: PicoChannel) {
232 self.enabled_channels.write().remove(&channel);
233 }
234
235 pub fn get_channels(&self) -> Vec<PicoChannel> {
236 self.device.get_channels()
237 }
238
239 pub fn get_valid_ranges(&self, channel: PicoChannel) -> Option<Vec<PicoRange>> {
240 self.device.channel_ranges.get(&channel).cloned()
241 }
242
243 pub fn get_channel_config(&self, channel: PicoChannel) -> Option<ChannelConfig> {
244 self.enabled_channels.read().get(&channel).cloned()
245 }
246
247 #[tracing::instrument(level = "info")]
249 pub fn start(&self, requested_sample_rate: u32) -> PicoResult<u32> {
250 {
252 self.target_state.set(Target::Streaming {
253 requested_sample_rate,
254 });
255 }
256
257 let mut count = 0;
259 loop {
260 self.run_state()?;
261
262 let current = self.current_state.read();
263 if let State::Streaming {
264 actual_sample_rate, ..
265 } = *current
266 {
267 return Ok(actual_sample_rate);
268 }
269
270 count += 1;
271
272 if count > 20 {
273 return Err(PicoStatus::TIMEOUT.into());
274 }
275 }
276 }
277
278 #[tracing::instrument(level = "info")]
280 pub fn stop(&self) {
281 self.target_state.set(Target::Open);
282 }
283
284 #[tracing::instrument(level = "info")]
286 pub fn close(&self) {
287 self.target_state.set(Target::Closed);
288 }
289
290 fn start_background_thread(&mut self) {
291 let (tx_terminate, rx_terminate) = bounded::<()>(0);
292
293 let handle = thread::Builder::new()
294 .name("Streaming background task".to_string())
295 .spawn({
296 let device = self.clone();
297 let mut wait_for_closed = false;
298
299 move || loop {
300 let next_wait = device
301 .run_state()
302 .unwrap_or_else(|_| Duration::from_millis(500));
303
304 if !wait_for_closed && rx_terminate.recv_timeout(next_wait).is_ok() {
305 device.close();
306 wait_for_closed = true;
307 }
308
309 if wait_for_closed {
310 if let State::Closed = *device.current_state.read() {
311 return;
312 }
313 }
314 }
315 })
316 .expect("Could not start thread");
317
318 self.background_handle = Some(BackgroundThreadHandle::new(tx_terminate, handle));
319 }
320
321 fn run_state(&self) -> PicoResult<Duration> {
322 let mut current_state = self.current_state.write();
323 let initial_state = current_state.clone();
324
325 let target = self.target_state.get();
326
327 let (next_state, next_duration) = match current_state.clone() {
328 State::Closed => match target {
329 Target::Closed => (State::Closed, Duration::from_millis(500)),
330 Target::Open | Target::Streaming { .. } => {
331 let handle = self.device.driver.open_unit(Some(&self.device.serial))?;
332 (State::Open { handle }, Duration::from_millis(1))
333 }
334 },
335 State::Open { handle } => match target {
336 Target::Closed => {
337 self.device.driver.close(handle)?;
338 (State::Closed, Duration::from_millis(500))
339 }
340 Target::Open => self.ping(handle),
341 Target::Streaming {
342 requested_sample_rate,
343 } => self.configure_and_start(handle, requested_sample_rate)?,
344 },
345 State::Streaming {
346 handle,
347 actual_sample_rate,
348 buffers,
349 } => match target {
350 Target::Closed | Target::Open => {
351 self.device.driver.stop(handle)?;
352 (State::Open { handle }, Duration::from_millis(1))
353 }
354 Target::Streaming { .. } => self.stream(handle, buffers, actual_sample_rate),
355 },
356 };
357
358 if initial_state != next_state {
359 info!("State changed '{:?}' > '{:?}'", initial_state, next_state);
360 }
361
362 *current_state = next_state;
363
364 Ok(next_duration)
365 }
366
367 fn ping(&self, handle: i16) -> (State, Duration) {
368 if self.device.driver.ping_unit(handle).is_err() {
369 let _ = self.device.driver.stop(handle);
370 let _ = self.device.driver.close(handle);
371
372 (State::Closed, Duration::from_millis(500))
373 } else {
374 (State::Open { handle }, Duration::from_millis(500))
375 }
376 }
377
378 #[tracing::instrument(skip(self), level = "debug")]
379 fn configure_and_start(
380 &self,
381 handle: i16,
382 samples_per_second: u32,
383 ) -> PicoResult<(State, Duration)> {
384 let mut buffers = HashMap::new();
385
386 let enabled_channels = self.enabled_channels.read();
387
388 for (channel, ranges) in &self.device.channel_ranges {
389 if ranges.is_empty() {
391 continue;
392 }
393
394 if let Some(config) = enabled_channels.get(channel) {
396 let buffer_size = samples_per_second as usize;
397
398 self.device
399 .driver
400 .enable_channel(handle, *channel, &config)?;
401
402 let ch_buf = buffers
403 .entry(*channel)
404 .or_insert_with(|| Arc::new(RwLock::new(Pin::new(vec![0i16; buffer_size]))));
405
406 self.device.driver.set_data_buffer(
407 handle,
408 *channel,
409 ch_buf.clone(),
410 buffer_size,
411 )?;
412 } else {
413 self.device.driver.disable_channel(handle, *channel)?;
414 }
415 }
416
417 let target_config = SampleConfig::from_samples_per_second(samples_per_second);
418 let actual_sample_rate = self
419 .device
420 .driver
421 .start_streaming(handle, &target_config)
422 .map(|sc| sc.samples_per_second())?;
423
424 Ok((
425 State::Streaming {
426 handle,
427 actual_sample_rate,
428 buffers,
429 },
430 Duration::from_millis(100),
431 ))
432 }
433
434 #[tracing::instrument(skip(self, buffers), level = "trace")]
435 fn stream(
436 &self,
437 handle: i16,
438 buffers: BufferMap,
439 actual_sample_rate: u32,
440 ) -> (State, Duration) {
441 let callback = |start_index, sample_count| {
442 let channels = self.enabled_channels.read();
443
444 let channels = channels
445 .iter()
446 .map(|(ch, config)| {
447 let ch_buf = buffers
448 .get(&ch)
449 .expect("Channel is enabled but has no buffer")
450 .read();
451
452 (
453 *ch,
454 RawChannelDataBlock {
455 multiplier: config.range.get_max_scaled_value()
456 / self.device.max_adc_value as f64,
457 samples: ch_buf[start_index..(start_index + sample_count)].to_vec(),
458 },
459 )
460 })
461 .collect::<HashMap<_, _>>();
462
463 self.new_data.emit(StreamingEvent {
464 samples_per_second: actual_sample_rate,
465 length: sample_count,
466 channels,
467 });
468 };
469
470 let channels = buffers.keys().copied().collect::<Vec<_>>();
471
472 if let Err(error) =
473 self.device
474 .driver
475 .get_latest_streaming_values(handle, &channels, Box::new(callback))
476 {
477 if error.status == PicoStatus::WAITING_FOR_DATA_BUFFERS {
478 for (channel, buffer) in &buffers {
479 let len = { buffer.read().len() };
480 self.device
481 .driver
482 .set_data_buffer(handle, *channel, buffer.clone(), len)
483 .unwrap();
484 }
485
486 (
487 State::Streaming {
488 handle,
489 buffers,
490 actual_sample_rate,
491 },
492 Duration::from_millis(5),
493 )
494 } else {
495 warn!("Streaming stopped: '{:?}'", error);
496
497 let _ = self.device.driver.stop(handle);
498 let _ = self.device.driver.close(handle);
499
500 (State::Closed, Duration::from_millis(200))
501 }
502 } else {
503 (
504 State::Streaming {
505 handle,
506 actual_sample_rate,
507 buffers,
508 },
509 Duration::from_millis(50),
510 )
511 }
512 }
513}
514
515pub trait ToStreamDevice {
517 fn into_streaming_device(self) -> PicoStreamingDevice;
518}
519
520impl ToStreamDevice for PicoDevice {
521 fn into_streaming_device(self) -> PicoStreamingDevice {
522 PicoStreamingDevice::new(self)
523 }
524}
525
526pub struct BackgroundThreadHandle {
527 tx_terminate: Sender<()>,
528 handle: Option<JoinHandle<()>>,
529}
530
531impl BackgroundThreadHandle {
532 pub fn new(tx_terminate: Sender<()>, handle: JoinHandle<()>) -> Arc<Self> {
533 Arc::new(BackgroundThreadHandle {
534 tx_terminate,
535 handle: Some(handle),
536 })
537 }
538}
539
540impl Drop for BackgroundThreadHandle {
541 #[tracing::instrument(skip(self), level = "debug")]
542 fn drop(&mut self) {
543 self.tx_terminate.send(()).unwrap();
544
545 self.handle.take().unwrap().join().unwrap();
546 }
547}