Skip to main content

ut325f_rs/
meter.rs

1use anyhow::Result;
2use anyhow::anyhow;
3use std::time::Duration;
4use tokio::io::AsyncReadExt;
5use tokio::time;
6use tokio_serial::{SerialPortBuilderExt, SerialStream};
7
8use crate::reading::Reading;
9
10pub struct Meter {
11    _sync_timeout: Duration,
12    port: String,
13    serial: Option<SerialStream>,
14}
15
16impl Meter {
17    pub fn new(port: String) -> Self {
18        Meter {
19            _sync_timeout: Duration::from_secs(5),
20            port,
21            serial: None,
22        }
23    }
24
25    pub async fn open(&mut self) -> Result<()> {
26        let builder = tokio_serial::new(&self.port, 115200)
27            .data_bits(tokio_serial::DataBits::Eight)
28            .parity(tokio_serial::Parity::None)
29            .stop_bits(tokio_serial::StopBits::One)
30            .flow_control(tokio_serial::FlowControl::None)
31            .timeout(Duration::from_secs(1));
32
33        match builder.open_native_async() {
34            Ok(port) => {
35                self.serial = Some(port);
36                self.clear_buffer().await?;
37                Ok(())
38            }
39            Err(e) => Err(anyhow!("Failed to open serial port '{}': {}", self.port, e)),
40        }
41    }
42
43    async fn clear_buffer(&mut self) -> Result<()> {
44        for _ in 0..3 {
45            match self.read().await {
46                Ok(_) => (),
47                Err(ref e) if e.to_string().contains("TimedOut") => {
48                    // Ignore timeouts during clearing
49                }
50                Err(e) => eprintln!("Warning: Initial read error: {}", e),
51            }
52            tokio::time::sleep(Duration::from_millis(100)).await;
53        }
54        Ok(())
55    }
56
57    pub async fn read(&mut self) -> Result<Reading> {
58        let mut serial = self
59            .serial
60            .as_mut()
61            .ok_or_else(|| anyhow!("Serial port is not open"))?;
62        let mut sync_buf = vec![0u8; Reading::N_SYNC_BYTES];
63        let mut rest_buf = vec![0u8; Reading::N_BYTES - Reading::N_SYNC_BYTES];
64
65        loop {
66            read_with_timeout(&mut serial, &mut sync_buf, self._sync_timeout).await?;
67            if sync_buf == Reading::SYNC {
68                break;
69            }
70        }
71        read_with_timeout(&mut serial, &mut rest_buf, self._sync_timeout).await?;
72
73        let mut combined = sync_buf;
74        combined.extend_from_slice(&rest_buf);
75        let reading_array: [u8; Reading::N_BYTES] = combined.try_into().map_err(|v: Vec<u8>| {
76            anyhow!(
77                "Error converting Vec<u8> to [u8; {}]: {:?}",
78                Reading::N_BYTES,
79                v
80            )
81        })?;
82
83        Reading::parse(&reading_array)
84    }
85
86    pub async fn close(&mut self) -> Result<()> {
87        self.serial.take();
88        Ok(())
89    }
90}
91
92async fn read_with_timeout<R>(mut reader: R, buf: &mut [u8], timeout: Duration) -> Result<()>
93where
94    R: tokio::io::AsyncRead + Unpin,
95{
96    match time::timeout(timeout, reader.read_exact(buf)).await {
97        Ok(Ok(_)) => Ok(()),
98        Ok(Err(e)) => Err(anyhow!("Error reading data: {}", e)),
99        Err(_) => Err(anyhow!("Timeout reading data")),
100    }
101}