1use anyhow::Result;
2use anyhow::anyhow;
3use std::time::Duration;
4use tokio::io::AsyncReadExt;
5use tokio::time;
6use tokio_serial::{SerialPortBuilderExt, SerialStream};
7
8use crate::reading::Reading;
9
10pub struct Meter {
11 _sync_timeout: Duration,
12 port: String,
13 serial: Option<SerialStream>,
14}
15
16impl Meter {
17 pub fn new(port: String) -> Self {
18 Meter {
19 _sync_timeout: Duration::from_secs(5),
20 port,
21 serial: None,
22 }
23 }
24
25 pub async fn open(&mut self) -> Result<()> {
26 let builder = tokio_serial::new(&self.port, 115200)
27 .data_bits(tokio_serial::DataBits::Eight)
28 .parity(tokio_serial::Parity::None)
29 .stop_bits(tokio_serial::StopBits::One)
30 .flow_control(tokio_serial::FlowControl::None)
31 .timeout(Duration::from_secs(1));
32
33 match builder.open_native_async() {
34 Ok(port) => {
35 self.serial = Some(port);
36 self.clear_buffer().await?;
37 Ok(())
38 }
39 Err(e) => Err(anyhow!("Failed to open serial port '{}': {}", self.port, e)),
40 }
41 }
42
43 async fn clear_buffer(&mut self) -> Result<()> {
44 for _ in 0..3 {
45 match self.read().await {
46 Ok(_) => (),
47 Err(ref e) if e.to_string().contains("TimedOut") => {
48 }
50 Err(e) => eprintln!("Warning: Initial read error: {}", e),
51 }
52 tokio::time::sleep(Duration::from_millis(100)).await;
53 }
54 Ok(())
55 }
56
57 pub async fn read(&mut self) -> Result<Reading> {
58 let mut serial = self
59 .serial
60 .as_mut()
61 .ok_or_else(|| anyhow!("Serial port is not open"))?;
62 let mut sync_buf = vec![0u8; Reading::N_SYNC_BYTES];
63 let mut rest_buf = vec![0u8; Reading::N_BYTES - Reading::N_SYNC_BYTES];
64
65 loop {
66 read_with_timeout(&mut serial, &mut sync_buf, self._sync_timeout).await?;
67 if sync_buf == Reading::SYNC {
68 break;
69 }
70 }
71 read_with_timeout(&mut serial, &mut rest_buf, self._sync_timeout).await?;
72
73 let mut combined = sync_buf;
74 combined.extend_from_slice(&rest_buf);
75 let reading_array: [u8; Reading::N_BYTES] = combined.try_into().map_err(|v: Vec<u8>| {
76 anyhow!(
77 "Error converting Vec<u8> to [u8; {}]: {:?}",
78 Reading::N_BYTES,
79 v
80 )
81 })?;
82
83 Reading::parse(&reading_array)
84 }
85
86 pub async fn close(&mut self) -> Result<()> {
87 self.serial.take();
88 Ok(())
89 }
90}
91
92async fn read_with_timeout<R>(mut reader: R, buf: &mut [u8], timeout: Duration) -> Result<()>
93where
94 R: tokio::io::AsyncRead + Unpin,
95{
96 match time::timeout(timeout, reader.read_exact(buf)).await {
97 Ok(Ok(_)) => Ok(()),
98 Ok(Err(e)) => Err(anyhow!("Error reading data: {}", e)),
99 Err(_) => Err(anyhow!("Timeout reading data")),
100 }
101}