modbus_rtu/master/
asynced.rs

1//! Async Modbus RTU master backed by the `tokio-serial` crate.
2use crate::{Request, Response};
3use tokio::io::{AsyncReadExt, AsyncWriteExt};
4use tokio_serial::SerialPortBuilderExt;
5use serialport::SerialPort;
6
7
8/// Async Modbus RTU master that honors Modbus idle timing rules between frames.
9#[derive(Debug)]
10pub struct AsyncMaster {
11    /// Serial port handle used for request/response traffic.
12    port: tokio_serial::SerialStream,
13
14    /// Timestamp of the last transmitted frame, used to honor the 3.5-char gap.
15    last_tx: tokio::time::Instant,
16
17    /// Cached baud rate so higher-level code can inspect the active speed.
18    baud_rate: u32,
19}
20
21
22impl AsyncMaster {
23    /// Builds a master configured for an RS-485 style setup (8N1, async I/O).
24    ///
25    /// The port timeout is pinned to the Modbus RTU silent interval (T3.5) for
26    /// the supplied baud rate so that the reader can detect frame boundaries.
27    pub fn new_rs485(path: &str, baud_rate: u32) -> serialport::Result<Self> {
28        let port = tokio_serial::new(path, baud_rate)
29            .data_bits(tokio_serial::DataBits::Eight)
30            .parity(tokio_serial::Parity::None)
31            .stop_bits(tokio_serial::StopBits::One)
32            .timeout(Self::idle_time_rs485(baud_rate))
33            .open_native_async()?;
34        Ok(Self {
35            port,
36            last_tx: tokio::time::Instant::now() - Self::idle_time_rs485(baud_rate),
37            baud_rate,
38        })
39    }
40
41    /// Returns the baud rate currently configured on the serial link.
42    pub fn baud_rate(&self) -> u32 {
43        self.baud_rate
44    }
45
46    /// Updates the serial baud rate and matching Modbus idle timeout.
47    pub fn set_baudrate(&mut self, baud_rate: u32) -> serialport::Result<()> {
48        self.port.set_baud_rate(baud_rate)?;
49        self.port.set_timeout(Self::idle_time_rs485(baud_rate))?;
50        self.baud_rate = baud_rate;
51        self.last_tx = tokio::time::Instant::now();
52        Ok(())
53    }
54
55    /// Sends a Modbus RTU request and waits for the corresponding response.
56    ///
57    /// Broadcast requests return immediately after the frame is flushed because
58    /// the Modbus RTU spec forbids responses to slave id 0.
59    pub async fn send(&mut self, req: &Request<'_>) -> Result<Response, crate::error::Error> {
60        self.wait_for_idle_gap().await;
61        let frame = req
62            .to_bytes()
63            .map_err(|e| crate::error::Error::Request(e))?;
64        self.port
65            .clear(serialport::ClearBuffer::Output)
66            .map_err(|e| crate::error::Error::IO(e.into()))?;
67        self.write(&frame).await?;
68        if req.is_broadcasting() {
69            return Ok(Response::Success);
70        }
71        let post_tx_idle = Self::idle_time_rs485(self.baud_rate);
72        tokio::time::sleep(post_tx_idle).await;
73        let mut buf: [u8; 256] = [0; 256];
74        let len = self
75            .read(&mut buf, req.timeout(), req.function().expected_len())
76            .await?;
77        if len == 0 {
78            return Err(crate::error::Error::IO(
79                std::io::ErrorKind::TimedOut.into(),
80            ));
81        }
82        Response::from_bytes(req, &buf[0..len]).map_err(|e| crate::error::Error::Response(e))
83    }
84
85    /// Waits for the Modbus silent interval to elapse before transmitting.
86    async fn wait_for_idle_gap(&self) {
87        let idle_until = self.last_tx + Self::idle_time_rs485(self.baud_rate);
88        let now = tokio::time::Instant::now();
89        if idle_until > now {
90            tokio::time::sleep_until(idle_until).await;
91        }
92    }
93
94    /// Writes a Modbus frame to the serial port and records the transmit instant.
95    async fn write(&mut self, frame: &[u8]) -> Result<(), crate::error::Error> {
96        self.port
97            .write_all(frame)
98            .await
99            .map_err(|e| crate::error::Error::IO(e))?;
100        self.port
101            .flush()
102            .await
103            .map_err(|e| crate::error::Error::IO(e))?;
104        self.last_tx = tokio::time::Instant::now();
105        Ok(())
106    }
107
108    /// Reads bytes until the slave stops responding or `buf` fills up.
109    async fn read(
110        &mut self,
111        buf: &mut [u8],
112        timeout: core::time::Duration,
113        expected_len: usize,
114    ) -> Result<usize, crate::error::Error> {
115        let mut len: usize = 0;
116        let deadline = tokio::time::Instant::now() + timeout;
117        loop {
118            let now = tokio::time::Instant::now();
119            if now >= deadline {
120                break;
121            }
122            let remaining = deadline.saturating_duration_since(now);
123            let read_res =
124                tokio::time::timeout(remaining, self.port.read(&mut buf[len..])).await;
125            let n = match read_res {
126                Ok(Ok(n)) => n,
127                Ok(Err(ref e)) if e.kind() == std::io::ErrorKind::TimedOut => {
128                    if len == 0 {
129                        continue;
130                    }
131                    if len >= 5 && buf[1] & 0x80 != 0 {
132                        break;
133                    }
134                    if len < expected_len {
135                        continue;
136                    }
137                    break
138                }
139                Ok(Err(e)) => return Err(crate::error::Error::IO(e)),
140                Err(_) => break,
141            };
142            len += n;
143            if len >= buf.len() {
144                break;
145            }
146        }
147        Ok(len)
148    }
149
150    /// Computes the Modbus RTU T3.5 idle time for a link running 8N1 encoding.
151    fn idle_time_rs485(baud_rate: u32) -> core::time::Duration {
152        const BITS_PER_CHAR: f64 = 10.0;
153        let seconds = 3.5 * BITS_PER_CHAR / baud_rate as f64;
154        core::time::Duration::from_secs_f64(seconds)
155    }
156}