ppk2/
lib.rs

1#![doc = include_str!("../README.md")]
2#![deny(missing_docs)]
3
4use measurement::{MeasurementAccumulator, MeasurementIterExt, MeasurementMatch};
5use serialport::{ClearBuffer::Input, FlowControl, SerialPort};
6use std::str::Utf8Error;
7use std::sync::mpsc::{self, Receiver, SendError, TryRecvError};
8use std::{
9    borrow::Cow,
10    collections::VecDeque,
11    io,
12    sync::{Arc, Condvar, Mutex},
13    thread,
14    time::Duration,
15};
16use thiserror::Error;
17use types::{DevicePower, LogicPortPins, MeasurementMode, Metadata, SourceVoltage};
18
19use crate::cmd::Command;
20
21pub mod cmd;
22pub mod measurement;
23pub mod types;
24
/// Highest supported number of samples per second; the measurement worker divides this by the
/// requested rate to decide how many raw samples are combined into one reported measurement.
const SPS_MAX: usize = 100_000;
26
27#[derive(Error, Debug)]
28/// PPK2 communication or data parsing error.
29#[allow(missing_docs)]
30pub enum Error {
31    #[error("Serial port error: {0}")]
32    SerialPort(#[from] serialport::Error),
33    #[error("PPK2 not found. Is the device connected and are permissions set correctly?")]
34    Ppk2NotFound,
35    #[error("IO error: {0}")]
36    Io(#[from] io::Error),
37    #[error("Utf8 error {0}")]
38    Utf8(#[from] Utf8Error),
39    #[error("Parse error in \"{0}\"")]
40    Parse(String),
41    #[error("Error sending measurement: {0}")]
42    SendMeasurement(#[from] SendError<MeasurementMatch>),
43    #[error("Error sending stop signal: {0}")]
44    SendStopSignal(#[from] SendError<()>),
45    #[error("Worker thread signal error: {0}")]
46    WorkerSignalError(#[from] TryRecvError),
47    #[error("Error deserializeing a measurement: {0:?}")]
48    DeserializeMeasurement(Vec<u8>),
49}
50
51#[allow(missing_docs)]
52pub type Result<T> = std::result::Result<T, Error>;
53
54/// PPK2 device representation.
55pub struct Ppk2 {
56    port: Box<dyn SerialPort>,
57    metadata: Metadata,
58}
59
60impl Ppk2 {
61    /// Create a new instance and configure the given [MeasurementMode].
62    pub fn new<'a>(path: impl Into<Cow<'a, str>>, mode: MeasurementMode) -> Result<Self> {
63        let mut port = serialport::new(path, 9600)
64            .timeout(Duration::from_millis(500))
65            .flow_control(FlowControl::Hardware)
66            .open()?;
67
68        if let Err(e) = port.clear(serialport::ClearBuffer::All) {
69            tracing::warn!("failed to clear buffers: {:?}", e);
70        }
71
72        // Required to work on Windows.
73        if let Err(e) = port.write_data_terminal_ready(true) {
74            tracing::warn!("failed to set DTR: {:?}", e);
75        }
76
77        let mut ppk2 = Self {
78            port,
79            metadata: Metadata::default(),
80        };
81
82        ppk2.metadata = ppk2.get_metadata()?;
83        ppk2.set_power_mode(mode)?;
84        Ok(ppk2)
85    }
86
87    /// Send a raw command and return the result.
88    pub fn send_command(&mut self, command: Command) -> Result<Vec<u8>> {
89        self.port.write_all(&Vec::from_iter(command.bytes()))?;
90        // Doesn't allocate if expected response length is 0
91        let mut response = Vec::with_capacity(command.expected_response_len());
92        let mut buf = [0u8; 128];
93        while !command.response_complete(&response) {
94            let n = self.port.read(&mut buf)?;
95            response.extend_from_slice(&buf[..n]);
96        }
97        Ok(response)
98    }
99
100    fn try_get_metadata(&mut self) -> Result<Metadata> {
101        let response = self.send_command(Command::GetMetaData)?;
102        Metadata::from_bytes(&response)
103    }
104
105    /// Get the device metadata.
106    pub fn get_metadata(&mut self) -> Result<Metadata> {
107        let mut result: Result<Metadata> = Err(Error::Parse("Metadata".to_string()));
108
109        // Retry a few times, as the metadata command sometimes fails
110        for _ in 0..3 {
111            match self.try_get_metadata() {
112                Ok(metadata) => {
113                    result = Ok(metadata);
114                    break;
115                }
116                Err(e) => {
117                    tracing::warn!("Error fetching metadata: {:?}. Retrying..", e);
118                }
119            }
120        }
121
122        result
123    }
124
125    /// Enable or disable the device power.
126    pub fn set_device_power(&mut self, power: DevicePower) -> Result<()> {
127        self.send_command(Command::DeviceRunningSet(power))?;
128        Ok(())
129    }
130
131    /// Set the voltage of the device voltage source.
132    pub fn set_source_voltage(&mut self, vdd: SourceVoltage) -> Result<()> {
133        self.send_command(Command::RegulatorSet(vdd))?;
134        Ok(())
135    }
136
137    /// Start measurements. Returns a tuple of:
138    /// - [Ppk2<Measuring>],
139    /// - [Receiver] of [measurement::MeasurementMatch], and
140    /// - A closure that can be called to stop the measurement parsing pipeline and return the
141    ///   device.
142    pub fn start_measurement(
143        self,
144        sps: usize,
145    ) -> Result<(Receiver<MeasurementMatch>, impl FnOnce() -> Result<Self>)> {
146        self.start_measurement_matching(LogicPortPins::default(), sps)
147    }
148
149    /// Start measurements. Returns a tuple of:
150    /// - [Ppk2<Measuring>],
151    /// - [Receiver] of [measurement::Result], and
152    /// - A closure that can be called to stop the measurement parsing pipeline and return the
153    ///   device.
154    pub fn start_measurement_matching(
155        mut self,
156        pins: LogicPortPins,
157        sps: usize,
158    ) -> Result<(Receiver<MeasurementMatch>, impl FnOnce() -> Result<Self>)> {
159        // Stuff needed to communicate with the main thread
160        // ready allows main thread to signal worker when serial input buf is cleared.
161        let ready = Arc::new((Mutex::new(false), Condvar::new()));
162        // This channel is for sending measurements to the main thread.
163        let (meas_tx, meas_rx) = mpsc::channel::<MeasurementMatch>();
164        // This channel allows the main thread to notify that the worker thread can stop
165        // parsing data.
166        let (sig_tx, sig_rx) = mpsc::channel::<()>();
167
168        let task_ready = ready.clone();
169        let mut port = self.port.try_clone()?;
170        let metadata = self.metadata.clone();
171
172        let t = thread::spawn(move || {
173            let r = || -> Result<()> {
174                // Create an accumulator with the current device metadata
175                let mut accumulator = MeasurementAccumulator::new(metadata);
176                // First wait for main thread to clear
177                // serial port input buffer
178                let (lock, cvar) = &*task_ready;
179                let _l = cvar
180                    .wait_while(lock.lock().unwrap(), |ready| !*ready)
181                    .unwrap();
182
183                /* 4 bytes is the size of a single sample, and the PPK pushes 100,000 samples per second.
184                   Having size of `buf` at eg.1024 blocks port.read() until the buffer is full with 1024 bytes (128 samples).
185                   The measurement returned will be the average of the 128 samples. But we want to get every single sample when
186                   requested sps is 100,000. Hence, we set the buffer size to 4 bytes, and read the port in a loop,
187                   feeding the accumulator with the data.
188                */
189                let mut buf = [0u8; 4];
190                let mut measurement_buf = VecDeque::with_capacity(SPS_MAX);
191                let mut missed = 0;
192                loop {
193                    // Check whether the main thread has signaled
194                    // us to stop
195                    match sig_rx.try_recv() {
196                        Ok(_) => return Ok(()),
197                        Err(TryRecvError::Empty) => {}
198                        Err(e) => return Err(e.into()),
199                    }
200
201                    // Now we read chunks and feed them to the accumulator
202                    let n = port.read(&mut buf)?;
203                    missed += accumulator.feed_into(&buf[..n], &mut measurement_buf);
204                    let len = measurement_buf.len();
205                    if len >= SPS_MAX / sps {
206                        let measurement = measurement_buf.drain(..).combine_matching(missed, pins);
207                        meas_tx.send(measurement)?;
208                        missed = 0;
209                    }
210                }
211            };
212            let res = r();
213            if let Err(e) = &res {
214                tracing::error!("Error fetching measurements: {:?}", e);
215            };
216            res
217        });
218        self.port.clear(Input)?;
219
220        let (lock, cvar) = &*ready;
221        let mut ready = lock.lock().unwrap();
222        *ready = true;
223        cvar.notify_all();
224
225        self.send_command(Command::AverageStart)?;
226
227        let stop = move || {
228            sig_tx.send(())?;
229            t.join().expect("Data receive thread panicked")?;
230            self.send_command(Command::AverageStop)?;
231            Ok(self)
232        };
233
234        Ok((meas_rx, stop))
235    }
236
237    /// Reset the device, making the device unusable.
238    pub fn reset(mut self) -> Result<()> {
239        self.send_command(Command::Reset)?;
240        Ok(())
241    }
242
243    fn set_power_mode(&mut self, mode: MeasurementMode) -> Result<()> {
244        self.send_command(Command::SetPowerMode(mode))?;
245        Ok(())
246    }
247}
248
249/// Try to find the serial port the PPK2 is connected to.
250pub fn try_find_ppk2_port() -> Result<String> {
251    use serialport::SerialPortType::UsbPort;
252
253    Ok(serialport::available_ports()?
254        .into_iter()
255        .find(|p| match &p.port_type {
256            UsbPort(usb) => usb.vid == 0x1915 && usb.pid == 0xc00a,
257            _ => false,
258        })
259        .ok_or(Error::Ppk2NotFound)?
260        .port_name)
261}