modbus_rtu/master/
queued.rs

1//! Async Modbus RTU master that queues requests and drives a single worker task.
2//! Useful when multiple async callers need serialized access to one serial link.
3use std::sync::Arc;
4use tokio::{io::{AsyncReadExt, AsyncWriteExt}, sync::{mpsc, oneshot}, task::JoinHandle, time::Instant};
5use tokio_serial::SerialPortBuilderExt;
6use crate::{Function, Request, Response};
7use serialport::SerialPort;
8
9
10/// Multi-producer async master backed by a single worker task.
11pub struct QueuedMaster {
12    handle: JoinHandle<()>,
13    sender: mpsc::Sender<Job>,
14}
15
16/// Packet of work sent into the worker loop.
17struct Job {
18    request: OwnedRequest,
19    baud_rate: u32,
20    respond_to: oneshot::Sender<Result<Response, crate::error::Error>>,
21}
22
23/// Owned copy of a request so the worker outlives the caller borrow.
24struct OwnedRequest {
25    modbus_id: u8,
26    function: Function,
27    timeout: core::time::Duration,
28}
29
30impl OwnedRequest {
31    /// Clone the borrowed request into an owned form.
32    fn from_borrowed(req: &Request<'_>) -> Self {
33        Self {
34            modbus_id: req.modbus_id(),
35            function: req.function().clone(),
36            timeout: req.timeout(),
37        }
38    }
39
40    /// Rebuild a borrowed request view from the owned pieces.
41    fn as_request(&self) -> Request<'_> {
42        Request::new(self.modbus_id, &self.function, self.timeout)
43    }
44}
45
46impl QueuedMaster {
47    /// Build a queued master that spawns a worker task for one serial port.
48    ///
49    /// `buffer` is the queue depth of the internal MPSC channel; requests are
50    /// buffered up to this limit. Passing 0 will panic in
51    /// `tokio::sync::mpsc::channel`.
52    /// 
53    pub async fn new_rs485(path: &str, baud_rate: u32, buffer: usize) -> tokio_serial::Result<Arc<Self>> {
54        let port = tokio_serial::new(path, baud_rate)
55            .data_bits(tokio_serial::DataBits::Eight)
56            .parity(tokio_serial::Parity::None)
57            .stop_bits(tokio_serial::StopBits::One)
58            .timeout(Self::idle_time_rs485(baud_rate))
59            .open_native_async()?;
60        let (sender, receiver) = mpsc::channel::<Job>(buffer);
61        let handle = tokio::task::spawn(Self::task(port, baud_rate, receiver));
62        Ok(Arc::new(Self {
63            handle,
64            sender,
65        }))
66    }
67
68    /// Enqueue a request and wait for its response (or a broadcast ack).
69    pub async fn send(&self, req: &Request<'_>, baud_rate: u32) -> Result<Response, crate::error::Error> {
70        let (sender, receiver) = oneshot::channel::<Result<Response, crate::error::Error>>();
71        let job = Job {
72            request: OwnedRequest::from_borrowed(req),
73            baud_rate,
74            respond_to: sender,
75        };
76        self.sender
77            .send(job)
78            .await
79            .map_err(|_| crate::error::Error::IO(std::io::ErrorKind::BrokenPipe.into()))?;
80        receiver
81            .await
82            .map_err(|_| crate::error::Error::IO(std::io::ErrorKind::BrokenPipe.into()))?
83    }
84
85    /// Worker loop that serializes access to the serial port.
86    async fn task(mut port: tokio_serial::SerialStream, baud_rate: u32, mut receiver: mpsc::Receiver<Job>) {
87        let mut baud_rate = baud_rate;
88        let mut last_tx: Instant = Instant::now();
89        while let Some(Job { request, baud_rate: req_baud_rate, respond_to }) = receiver.recv().await {
90            let req = request.as_request();
91            if req_baud_rate != baud_rate {
92                if let Err(e) = port.set_baud_rate(req_baud_rate) {
93                    let _ = respond_to.send(Err(crate::error::Error::IO(e.into())));
94                    continue;
95                }
96                baud_rate = req_baud_rate;
97            }
98
99            let idle_until = last_tx + Self::idle_time_rs485(baud_rate);
100            let now = tokio::time::Instant::now();
101            if idle_until > now {
102                tokio::time::sleep_until(idle_until).await;
103            }
104
105            let frame = match req.to_bytes() {
106                Ok(frame) => frame,
107                Err(e) => {
108                    let _ = respond_to.send(Err(crate::error::Error::Request(e)));
109                    continue;
110                },
111            };
112
113            if let Err(e) = port.clear(serialport::ClearBuffer::Output) {
114                let _ = respond_to.send(Err(crate::error::Error::IO(e.into())));
115                continue;
116            }
117            if let Err(e) = Self::write(&mut port, &frame).await {
118                let _ = respond_to.send(Err(e));
119                continue;
120            }
121            last_tx = tokio::time::Instant::now();
122
123            if req.is_broadcasting() {
124                let _ = respond_to.send(Ok(Response::Success));
125                continue;
126            }
127
128            let post_tx_idle = Self::idle_time_rs485(baud_rate);
129            tokio::time::sleep(post_tx_idle).await;
130            let mut buf: [u8; 256] = [0; 256];
131            let len = match Self::read(&mut port, &mut buf, req.timeout(), req.function().expected_len()).await {
132                Ok(len) => len,
133                Err(e) => {
134                    let _ = respond_to.send(Err(e));
135                    continue;
136                },
137            };
138            if len == 0 {
139                let _ = respond_to.send(Err(crate::error::Error::IO(std::io::ErrorKind::TimedOut.into())));
140                continue;
141            }
142            let res = Response::from_bytes(&req, &buf[0..len]).map_err(|e| crate::error::Error::Response(e));
143            let _ = respond_to.send(res);
144        }
145    }
146
147    /// Writes a Modbus frame to the port and flushes it.
148    async fn write(port: &mut tokio_serial::SerialStream, frame: &[u8]) -> Result<(), crate::error::Error> {
149        port
150            .write_all(frame)
151            .await
152            .map_err(|e| crate::error::Error::IO(e))?;
153        port
154            .flush()
155            .await
156            .map_err(|e| crate::error::Error::IO(e))?;
157        
158        Ok(())
159    }
160
161    /// Reads bytes until the slave stops responding or `buf` fills up.
162    async fn read(
163        port: &mut tokio_serial::SerialStream,
164        buf: &mut [u8],
165        timeout: core::time::Duration,
166        expected_len: usize,
167    ) -> Result<usize, crate::error::Error> {
168        let mut len: usize = 0;
169        let deadline = tokio::time::Instant::now() + timeout;
170        loop {
171            let now = tokio::time::Instant::now();
172            if now >= deadline {
173                break;
174            }
175            let remaining = deadline.saturating_duration_since(now);
176            let read_res =
177                tokio::time::timeout(remaining, port.read(&mut buf[len..])).await;
178            let n = match read_res {
179                Ok(Ok(n)) => n,
180                Ok(Err(ref e)) if e.kind() == std::io::ErrorKind::TimedOut => {
181                    if len == 0 {
182                        continue;
183                    }
184                    if len >= 5 && buf[1] & 0x80 != 0 {
185                        break;
186                    }
187                    if len < expected_len {
188                        continue;
189                    }
190                    break
191                }
192                Ok(Err(e)) => return Err(crate::error::Error::IO(e)),
193                Err(_) => break,
194            };
195            len += n;
196            if len >= buf.len() {
197                break;
198            }
199        }
200        Ok(len)
201    }
202
203    /// Computes the Modbus RTU T3.5 idle time for a link running 8N1 encoding.
204    fn idle_time_rs485(baud_rate: u32) -> core::time::Duration {
205        const BITS_PER_CHAR: f64 = 10.0;
206        let seconds = 3.5 * BITS_PER_CHAR / baud_rate as f64;
207        core::time::Duration::from_secs_f64(seconds)
208    }
209}
210
211
212impl Drop for QueuedMaster {
213    fn drop(&mut self) {
214        self.handle.abort();
215    }
216}