modbus_rtu/master/
asynced.rs1use crate::{Request, Response};
3use tokio::io::{AsyncReadExt, AsyncWriteExt};
4use tokio_serial::SerialPortBuilderExt;
5use serialport::SerialPort;
6
7
8#[derive(Debug)]
10pub struct AsyncMaster {
11 port: tokio_serial::SerialStream,
13
14 last_tx: tokio::time::Instant,
16
17 baud_rate: u32,
19}
20
21
22impl AsyncMaster {
23 pub fn new_rs485(path: &str, baud_rate: u32) -> serialport::Result<Self> {
28 let port = tokio_serial::new(path, baud_rate)
29 .data_bits(tokio_serial::DataBits::Eight)
30 .parity(tokio_serial::Parity::None)
31 .stop_bits(tokio_serial::StopBits::One)
32 .timeout(Self::idle_time_rs485(baud_rate))
33 .open_native_async()?;
34 Ok(Self {
35 port,
36 last_tx: tokio::time::Instant::now() - Self::idle_time_rs485(baud_rate),
37 baud_rate,
38 })
39 }
40
41 pub fn baud_rate(&self) -> u32 {
43 self.baud_rate
44 }
45
46 pub fn set_baudrate(&mut self, baud_rate: u32) -> serialport::Result<()> {
48 self.port.set_baud_rate(baud_rate)?;
49 self.port.set_timeout(Self::idle_time_rs485(baud_rate))?;
50 self.baud_rate = baud_rate;
51 self.last_tx = tokio::time::Instant::now();
52 Ok(())
53 }
54
55 pub async fn send(&mut self, req: &Request<'_>) -> Result<Response, crate::error::Error> {
60 self.wait_for_idle_gap().await;
61 let frame = req
62 .to_bytes()
63 .map_err(|e| crate::error::Error::Request(e))?;
64 self.port
65 .clear(serialport::ClearBuffer::Output)
66 .map_err(|e| crate::error::Error::IO(e.into()))?;
67 self.write(&frame).await?;
68 if req.is_broadcasting() {
69 return Ok(Response::Success);
70 }
71 let post_tx_idle = Self::idle_time_rs485(self.baud_rate);
72 tokio::time::sleep(post_tx_idle).await;
73 let mut buf: [u8; 256] = [0; 256];
74 let len = self
75 .read(&mut buf, req.timeout(), req.function().expected_len())
76 .await?;
77 if len == 0 {
78 return Err(crate::error::Error::IO(
79 std::io::ErrorKind::TimedOut.into(),
80 ));
81 }
82 Response::from_bytes(req, &buf[0..len]).map_err(|e| crate::error::Error::Response(e))
83 }
84
85 async fn wait_for_idle_gap(&self) {
87 let idle_until = self.last_tx + Self::idle_time_rs485(self.baud_rate);
88 let now = tokio::time::Instant::now();
89 if idle_until > now {
90 tokio::time::sleep_until(idle_until).await;
91 }
92 }
93
94 async fn write(&mut self, frame: &[u8]) -> Result<(), crate::error::Error> {
96 self.port
97 .write_all(frame)
98 .await
99 .map_err(|e| crate::error::Error::IO(e))?;
100 self.port
101 .flush()
102 .await
103 .map_err(|e| crate::error::Error::IO(e))?;
104 self.last_tx = tokio::time::Instant::now();
105 Ok(())
106 }
107
108 async fn read(
110 &mut self,
111 buf: &mut [u8],
112 timeout: core::time::Duration,
113 expected_len: usize,
114 ) -> Result<usize, crate::error::Error> {
115 let mut len: usize = 0;
116 let deadline = tokio::time::Instant::now() + timeout;
117 loop {
118 let now = tokio::time::Instant::now();
119 if now >= deadline {
120 break;
121 }
122 let remaining = deadline.saturating_duration_since(now);
123 let read_res =
124 tokio::time::timeout(remaining, self.port.read(&mut buf[len..])).await;
125 let n = match read_res {
126 Ok(Ok(n)) => n,
127 Ok(Err(ref e)) if e.kind() == std::io::ErrorKind::TimedOut => {
128 if len == 0 {
129 continue;
130 }
131 if len >= 5 && buf[1] & 0x80 != 0 {
132 break;
133 }
134 if len < expected_len {
135 continue;
136 }
137 break
138 }
139 Ok(Err(e)) => return Err(crate::error::Error::IO(e)),
140 Err(_) => break,
141 };
142 len += n;
143 if len >= buf.len() {
144 break;
145 }
146 }
147 Ok(len)
148 }
149
150 fn idle_time_rs485(baud_rate: u32) -> core::time::Duration {
152 const BITS_PER_CHAR: f64 = 10.0;
153 let seconds = 3.5 * BITS_PER_CHAR / baud_rate as f64;
154 core::time::Duration::from_secs_f64(seconds)
155 }
156}