pico_streaming/
lib.rs

1#![forbid(unsafe_code)]
2
3//! Streams gap-less data from Pico Technology oscilloscope drivers.
4//!
5//! This is a sub crate that you probably don't want to use directly. Try the top level
6//! [`pico-sdk`](https://crates.io/crates/pico-sdk) crate which exposes everything from here.
7//!
//! Once streaming is started, a `PicoStreamingDevice` returns `StreamingEvent`s. The possible events
//! are `Connected`, `Disconnected` and `Data`. The `Data` event contains raw `Vec<i16>` samples for
//! each enabled channel that can easily be scaled to the channel units (i.e. Volts, Amps, etc).
11//!
12//!
13//! # Example
14//! ```no_run
15//! # fn run() -> Result<(),Box<dyn std::error::Error>> {
16//! # use std::sync::Arc;
17//! # use pico_common::{Driver, PicoChannel, PicoCoupling, PicoRange};
18//! # use pico_driver::LoadDriverExt;
19//! # use pico_device::PicoDevice;
20//! # use pico_streaming::{NewDataHandler, StreamingEvent, ToStreamDevice};
21//! # // Load the required driver
22//! # let driver = Driver::PS2000.try_load()?;
23//! # // Try and load the first available ps2000 device
24//! # let device = PicoDevice::try_open(&driver, None)?;
25//! // Get a streaming device from a PicoDevice
26//! let stream_device = device.into_streaming_device();
27//!
28//! // Enable and configure 2 channels
29//! stream_device.enable_channel(PicoChannel::A, PicoRange::X1_PROBE_2V, PicoCoupling::DC);
30//! stream_device.enable_channel(PicoChannel::B, PicoRange::X1_PROBE_1V, PicoCoupling::AC);
31//!
32//! struct StdoutHandler;
33//!
34//! impl NewDataHandler for StdoutHandler {
35//!     fn handle_event(&self, event: &StreamingEvent) {
36//!         println!("Sample count: {}", event.length);
37//!     }
38//! }
39//!
40//! let handler = Arc::new(StdoutHandler);
41//!
42//! // Subscribe to streaming events
43//! stream_device.new_data.subscribe(handler);
44//!
45//! // Start streaming with a sample rate of 1k
46//! stream_device.start(1_000)?;
47//! # Ok(())
48//! # }
49//! ```
50
51use crossbeam::channel::{bounded, Sender};
52use events::StreamingEvents;
53pub use events::{NewDataHandler, RawChannelDataBlock, StreamingEvent};
54use parking_lot::RwLock;
55use pico_common::{
56    ChannelConfig, PicoChannel, PicoCoupling, PicoRange, PicoResult, PicoStatus, SampleConfig,
57};
58use pico_device::PicoDevice;
59use std::{
60    collections::HashMap, fmt, pin::Pin, sync::Arc, thread, thread::JoinHandle, time::Duration,
61};
62use tracing::*;
63
64mod events;
65
/// The state the caller has asked the device to reach.
///
/// Written by `start`, `stop` and `close`; read by `run_state`, which moves the
/// current `State` one step towards it on every call.
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
#[derive(Debug, Clone, Copy)]
enum Target {
    /// The device handle should be closed.
    Closed,
    /// The device should be open but not streaming.
    Open,
    /// The device should be streaming; the rate is in samples per second.
    Streaming { requested_sample_rate: u32 },
}
73
/// A `Target` behind an `Arc<RwLock<..>>`, so every clone reads and writes the
/// same underlying value.
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
#[derive(Clone)]
struct LockedTarget(Arc<RwLock<Target>>);
77
78impl LockedTarget {
79    pub fn new(target: Target) -> Self {
80        LockedTarget(Arc::new(RwLock::new(target)))
81    }
82
83    pub fn set(&self, new: Target) {
84        *self.0.write() = new;
85    }
86
87    pub fn get(&self) -> Target {
88        *self.0.read()
89    }
90}
91
92impl fmt::Debug for LockedTarget {
93    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
94        f.write_fmt(format_args!("{:?}", self.0.try_read()))
95    }
96}
97
/// One sample buffer per channel. A clone of each `Arc` is handed to the driver
/// through `set_data_buffer`, and the streaming callback reads samples back out.
type BufferMap = HashMap<PicoChannel, Arc<RwLock<Pin<Vec<i16>>>>>;
99
/// The state the device is actually in, as tracked by `run_state`.
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
#[derive(Clone)]
enum State {
    /// No driver handle is held.
    Closed,
    /// A driver handle is held but streaming has not been started.
    Open {
        handle: i16,
    },
    /// Streaming has been started on `handle`.
    Streaming {
        handle: i16,
        /// Rate reported back by the driver when streaming was started
        /// (samples per second).
        actual_sample_rate: u32,
        /// Buffers registered with the driver; not serialized.
        #[cfg_attr(feature = "serde", serde(skip))]
        buffers: BufferMap,
    },
}
114
115impl PartialEq for State {
116    fn eq(&self, other: &Self) -> bool {
117        matches!(
118            (self, other),
119            (State::Closed, State::Closed)
120                | (State::Open { .. }, State::Open { .. })
121                | (State::Streaming { .. }, State::Streaming { .. })
122        )
123    }
124}
125
126impl fmt::Debug for State {
127    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
128        match self {
129            State::Closed => f.debug_struct("Closed").finish(),
130            State::Open { handle } => f.debug_struct("Open").field("handle", handle).finish(),
131            State::Streaming {
132                handle,
133                actual_sample_rate,
134                ..
135            } => f
136                .debug_struct("Streaming")
137                .field("handle", handle)
138                .field("actual_sample_rate", actual_sample_rate)
139                .finish(),
140        }
141    }
142}
143
/// Encapsulates a `PicoDevice` and adds streaming functionality
///
/// Automatically reconfigures and restarts streaming if the device connection
/// is lost.
///
/// The target state, current state and enabled-channel map are held behind
/// `Arc`s, so clones of this struct observe and drive the same state machine.
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
#[derive(Clone)]
pub struct PicoStreamingDevice {
    /// The wrapped device (driver, serial, variant, channel ranges).
    device: PicoDevice,
    /// State requested by the caller via `start`/`stop`/`close`.
    target_state: LockedTarget,
    /// State the device is actually in; advanced by `run_state`.
    current_state: Arc<RwLock<State>>,
    /// Channels to enable, with their configuration, when streaming is started.
    enabled_channels: Arc<RwLock<HashMap<PicoChannel, ChannelConfig>>>,
    /// Handle to the background thread; `None` until `start_background_thread`
    /// has run. Not serialized.
    #[cfg_attr(feature = "serde", serde(skip))]
    background_handle: Option<Arc<BackgroundThreadHandle>>,
    /// Event source that streamed data is emitted on; subscribe to receive
    /// `StreamingEvent`s. Not serialized.
    #[cfg_attr(feature = "serde", serde(skip))]
    pub new_data: StreamingEvents,
}
160
161impl fmt::Debug for PicoStreamingDevice {
162    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
163        f.debug_struct("PicoStreamingDevice")
164            .field("device", &self.device)
165            .field("target_state", &self.target_state)
166            .field("current_state", &self.current_state.try_read())
167            .finish()
168    }
169}
170
impl PartialEq for PicoStreamingDevice {
    /// Devices are considered equal when their serials match; state and
    /// channel configuration are not compared.
    fn eq(&self, other: &Self) -> bool {
        self.get_serial() == other.get_serial()
    }
}
176
// Marker impl: the serial-based `PartialEq` is a full equivalence relation.
impl Eq for PicoStreamingDevice {}
178
impl std::hash::Hash for PicoStreamingDevice {
    /// Hashes only the serial, which keeps `Hash` consistent with the
    /// serial-based `PartialEq`.
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.get_serial().hash(state);
    }
}
184
185impl From<PicoDevice> for PicoStreamingDevice {
186    fn from(d: PicoDevice) -> Self {
187        PicoStreamingDevice::new(d)
188    }
189}
190
191impl PicoStreamingDevice {
192    fn new(device: PicoDevice) -> Self {
193        let (current_state, target_state) = match device.handle.lock().take() {
194            Some(handle) => (State::Open { handle }, Target::Open),
195            None => (State::Closed, Target::Closed),
196        };
197
198        let mut device = PicoStreamingDevice {
199            device,
200            target_state: LockedTarget::new(target_state),
201            current_state: Arc::new(RwLock::new(current_state)),
202            new_data: Default::default(),
203            enabled_channels: Default::default(),
204            background_handle: Default::default(),
205        };
206
207        device.start_background_thread();
208
209        device
210    }
211
    /// Returns the serial of the wrapped device as an owned `String`.
    pub fn get_serial(&self) -> String {
        self.device.serial.to_string()
    }
215
    /// Returns the variant of the wrapped device as an owned `String`.
    pub fn get_variant(&self) -> String {
        self.device.variant.to_string()
    }
219
220    pub fn enable_channel(&self, channel: PicoChannel, range: PicoRange, coupling: PicoCoupling) {
221        self.enabled_channels.write().insert(
222            channel,
223            ChannelConfig {
224                range,
225                coupling,
226                offset: 0.0,
227            },
228        );
229    }
230
    /// Removes `channel` from the set of enabled channels (no-op if it was not
    /// enabled). The driver itself is only reconfigured when streaming is next
    /// configured and started.
    pub fn disable_channel(&self, channel: PicoChannel) {
        self.enabled_channels.write().remove(&channel);
    }
234
    /// Returns the channels reported by the wrapped `PicoDevice`.
    pub fn get_channels(&self) -> Vec<PicoChannel> {
        self.device.get_channels()
    }
238
    /// Returns a copy of the ranges listed for `channel` in the device's
    /// `channel_ranges`, or `None` if the channel has no entry there.
    pub fn get_valid_ranges(&self, channel: PicoChannel) -> Option<Vec<PicoRange>> {
        self.device.channel_ranges.get(&channel).cloned()
    }
242
    /// Returns a copy of the configuration stored for `channel`, or `None` if
    /// the channel is not currently enabled.
    pub fn get_channel_config(&self, channel: PicoChannel) -> Option<ChannelConfig> {
        self.enabled_channels.read().get(&channel).cloned()
    }
246
247    /// Start streaming
248    #[tracing::instrument(level = "info")]
249    pub fn start(&self, requested_sample_rate: u32) -> PicoResult<u32> {
250        // Set the target state
251        {
252            self.target_state.set(Target::Streaming {
253                requested_sample_rate,
254            });
255        }
256
257        // Drive the state until we get the correct state or an error we can return
258        let mut count = 0;
259        loop {
260            self.run_state()?;
261
262            let current = self.current_state.read();
263            if let State::Streaming {
264                actual_sample_rate, ..
265            } = *current
266            {
267                return Ok(actual_sample_rate);
268            }
269
270            count += 1;
271
272            if count > 20 {
273                return Err(PicoStatus::TIMEOUT.into());
274            }
275        }
276    }
277
    /// Stop streaming
    ///
    /// Only records `Target::Open`; the actual transition is carried out by a
    /// later `run_state` call (the background thread calls it periodically).
    #[tracing::instrument(level = "info")]
    pub fn stop(&self) {
        self.target_state.set(Target::Open);
    }
283
    /// Close device
    ///
    /// Only records `Target::Closed`; the actual transition is carried out by a
    /// later `run_state` call (the background thread calls it periodically).
    #[tracing::instrument(level = "info")]
    pub fn close(&self) {
        self.target_state.set(Target::Closed);
    }
289
290    fn start_background_thread(&mut self) {
291        let (tx_terminate, rx_terminate) = bounded::<()>(0);
292
293        let handle = thread::Builder::new()
294            .name("Streaming background task".to_string())
295            .spawn({
296                let device = self.clone();
297                let mut wait_for_closed = false;
298
299                move || loop {
300                    let next_wait = device
301                        .run_state()
302                        .unwrap_or_else(|_| Duration::from_millis(500));
303
304                    if !wait_for_closed && rx_terminate.recv_timeout(next_wait).is_ok() {
305                        device.close();
306                        wait_for_closed = true;
307                    }
308
309                    if wait_for_closed {
310                        if let State::Closed = *device.current_state.read() {
311                            return;
312                        }
313                    }
314                }
315            })
316            .expect("Could not start thread");
317
318        self.background_handle = Some(BackgroundThreadHandle::new(tx_terminate, handle));
319    }
320
    /// Moves the current state one step towards the target state.
    ///
    /// Returns how long the caller should wait before the next step (the
    /// background thread uses it as its wait timeout).
    ///
    /// # Errors
    ///
    /// Returns the driver error from a failed open/close/stop/configure call.
    /// In that case the function returns before the new state is stored, so the
    /// current state is left unchanged.
    fn run_state(&self) -> PicoResult<Duration> {
        // The write lock is held for the whole step, including the driver calls.
        let mut current_state = self.current_state.write();
        let initial_state = current_state.clone();

        let target = self.target_state.get();

        let (next_state, next_duration) = match current_state.clone() {
            State::Closed => match target {
                Target::Closed => (State::Closed, Duration::from_millis(500)),
                Target::Open | Target::Streaming { .. } => {
                    let handle = self.device.driver.open_unit(Some(&self.device.serial))?;
                    (State::Open { handle }, Duration::from_millis(1))
                }
            },
            State::Open { handle } => match target {
                Target::Closed => {
                    self.device.driver.close(handle)?;
                    (State::Closed, Duration::from_millis(500))
                }
                Target::Open => self.ping(handle),
                Target::Streaming {
                    requested_sample_rate,
                } => self.configure_and_start(handle, requested_sample_rate)?,
            },
            State::Streaming {
                handle,
                actual_sample_rate,
                buffers,
            } => match target {
                // Streaming is always stopped first; a `Closed` target is then
                // reached from `Open` on the following step.
                Target::Closed | Target::Open => {
                    self.device.driver.stop(handle)?;
                    (State::Open { handle }, Duration::from_millis(1))
                }
                Target::Streaming { .. } => self.stream(handle, buffers, actual_sample_rate),
            },
        };

        // `State` equality compares variants only, so this logs transitions
        // between variants and not changes of payload.
        if initial_state != next_state {
            info!("State changed '{:?}' > '{:?}'", initial_state, next_state);
        }

        *current_state = next_state;

        Ok(next_duration)
    }
366
367    fn ping(&self, handle: i16) -> (State, Duration) {
368        if self.device.driver.ping_unit(handle).is_err() {
369            let _ = self.device.driver.stop(handle);
370            let _ = self.device.driver.close(handle);
371
372            (State::Closed, Duration::from_millis(500))
373        } else {
374            (State::Open { handle }, Duration::from_millis(500))
375        }
376    }
377
378    #[tracing::instrument(skip(self), level = "debug")]
379    fn configure_and_start(
380        &self,
381        handle: i16,
382        samples_per_second: u32,
383    ) -> PicoResult<(State, Duration)> {
384        let mut buffers = HashMap::new();
385
386        let enabled_channels = self.enabled_channels.read();
387
388        for (channel, ranges) in &self.device.channel_ranges {
389            // If there are no valid ranges, skip configuring this channel
390            if ranges.is_empty() {
391                continue;
392            }
393
394            // is this channel enabled?
395            if let Some(config) = enabled_channels.get(channel) {
396                let buffer_size = samples_per_second as usize;
397
398                self.device
399                    .driver
400                    .enable_channel(handle, *channel, &config)?;
401
402                let ch_buf = buffers
403                    .entry(*channel)
404                    .or_insert_with(|| Arc::new(RwLock::new(Pin::new(vec![0i16; buffer_size]))));
405
406                self.device.driver.set_data_buffer(
407                    handle,
408                    *channel,
409                    ch_buf.clone(),
410                    buffer_size,
411                )?;
412            } else {
413                self.device.driver.disable_channel(handle, *channel)?;
414            }
415        }
416
417        let target_config = SampleConfig::from_samples_per_second(samples_per_second);
418        let actual_sample_rate = self
419            .device
420            .driver
421            .start_streaming(handle, &target_config)
422            .map(|sc| sc.samples_per_second())?;
423
424        Ok((
425            State::Streaming {
426                handle,
427                actual_sample_rate,
428                buffers,
429            },
430            Duration::from_millis(100),
431        ))
432    }
433
434    #[tracing::instrument(skip(self, buffers), level = "trace")]
435    fn stream(
436        &self,
437        handle: i16,
438        buffers: BufferMap,
439        actual_sample_rate: u32,
440    ) -> (State, Duration) {
441        let callback = |start_index, sample_count| {
442            let channels = self.enabled_channels.read();
443
444            let channels = channels
445                .iter()
446                .map(|(ch, config)| {
447                    let ch_buf = buffers
448                        .get(&ch)
449                        .expect("Channel is enabled but has no buffer")
450                        .read();
451
452                    (
453                        *ch,
454                        RawChannelDataBlock {
455                            multiplier: config.range.get_max_scaled_value()
456                                / self.device.max_adc_value as f64,
457                            samples: ch_buf[start_index..(start_index + sample_count)].to_vec(),
458                        },
459                    )
460                })
461                .collect::<HashMap<_, _>>();
462
463            self.new_data.emit(StreamingEvent {
464                samples_per_second: actual_sample_rate,
465                length: sample_count,
466                channels,
467            });
468        };
469
470        let channels = buffers.keys().copied().collect::<Vec<_>>();
471
472        if let Err(error) =
473            self.device
474                .driver
475                .get_latest_streaming_values(handle, &channels, Box::new(callback))
476        {
477            if error.status == PicoStatus::WAITING_FOR_DATA_BUFFERS {
478                for (channel, buffer) in &buffers {
479                    let len = { buffer.read().len() };
480                    self.device
481                        .driver
482                        .set_data_buffer(handle, *channel, buffer.clone(), len)
483                        .unwrap();
484                }
485
486                (
487                    State::Streaming {
488                        handle,
489                        buffers,
490                        actual_sample_rate,
491                    },
492                    Duration::from_millis(5),
493                )
494            } else {
495                warn!("Streaming stopped: '{:?}'", error);
496
497                let _ = self.device.driver.stop(handle);
498                let _ = self.device.driver.close(handle);
499
500                (State::Closed, Duration::from_millis(200))
501            }
502        } else {
503            (
504                State::Streaming {
505                    handle,
506                    actual_sample_rate,
507                    buffers,
508                },
509                Duration::from_millis(50),
510            )
511        }
512    }
513}
514
/// Converts `PicoDevice` into `PicoStreamingDevice`
pub trait ToStreamDevice {
    /// Consumes `self` and returns a `PicoStreamingDevice` wrapping it.
    fn into_streaming_device(self) -> PicoStreamingDevice;
}
519
impl ToStreamDevice for PicoDevice {
    // Delegates to `PicoStreamingDevice::new`, which also spawns the
    // background thread for the new streaming device.
    fn into_streaming_device(self) -> PicoStreamingDevice {
        PicoStreamingDevice::new(self)
    }
}
525
/// Owns the background streaming thread; dropping it asks the thread to
/// terminate and waits for it to finish.
pub struct BackgroundThreadHandle {
    /// Sending `()` on this channel asks the background thread to shut down.
    tx_terminate: Sender<()>,
    /// Join handle of the background thread; an `Option` so that `drop` can
    /// take ownership of it in order to join.
    handle: Option<JoinHandle<()>>,
}
530
531impl BackgroundThreadHandle {
532    pub fn new(tx_terminate: Sender<()>, handle: JoinHandle<()>) -> Arc<Self> {
533        Arc::new(BackgroundThreadHandle {
534            tx_terminate,
535            handle: Some(handle),
536        })
537    }
538}
539
540impl Drop for BackgroundThreadHandle {
541    #[tracing::instrument(skip(self), level = "debug")]
542    fn drop(&mut self) {
543        self.tx_terminate.send(()).unwrap();
544
545        self.handle.take().unwrap().join().unwrap();
546    }
547}