modbus_rtu/master/
queued.rs1use std::sync::Arc;
4use tokio::{io::{AsyncReadExt, AsyncWriteExt}, sync::{mpsc, oneshot}, task::JoinHandle, time::Instant};
5use tokio_serial::SerialPortBuilderExt;
6use crate::{Function, Request, Response};
7use serialport::SerialPort;
8
9
10pub struct QueuedMaster {
12 handle: JoinHandle<()>,
13 sender: mpsc::Sender<Job>,
14}
15
16struct Job {
18 request: OwnedRequest,
19 baud_rate: u32,
20 respond_to: oneshot::Sender<Result<Response, crate::error::Error>>,
21}
22
23struct OwnedRequest {
25 modbus_id: u8,
26 function: Function,
27 timeout: core::time::Duration,
28}
29
30impl OwnedRequest {
31 fn from_borrowed(req: &Request<'_>) -> Self {
33 Self {
34 modbus_id: req.modbus_id(),
35 function: req.function().clone(),
36 timeout: req.timeout(),
37 }
38 }
39
40 fn as_request(&self) -> Request<'_> {
42 Request::new(self.modbus_id, &self.function, self.timeout)
43 }
44}
45
46impl QueuedMaster {
47 pub async fn new_rs485(path: &str, baud_rate: u32, buffer: usize) -> tokio_serial::Result<Arc<Self>> {
54 let port = tokio_serial::new(path, baud_rate)
55 .data_bits(tokio_serial::DataBits::Eight)
56 .parity(tokio_serial::Parity::None)
57 .stop_bits(tokio_serial::StopBits::One)
58 .timeout(Self::idle_time_rs485(baud_rate))
59 .open_native_async()?;
60 let (sender, receiver) = mpsc::channel::<Job>(buffer);
61 let handle = tokio::task::spawn(Self::task(port, baud_rate, receiver));
62 Ok(Arc::new(Self {
63 handle,
64 sender,
65 }))
66 }
67
68 pub async fn send(&self, req: &Request<'_>, baud_rate: u32) -> Result<Response, crate::error::Error> {
70 let (sender, receiver) = oneshot::channel::<Result<Response, crate::error::Error>>();
71 let job = Job {
72 request: OwnedRequest::from_borrowed(req),
73 baud_rate,
74 respond_to: sender,
75 };
76 self.sender
77 .send(job)
78 .await
79 .map_err(|_| crate::error::Error::IO(std::io::ErrorKind::BrokenPipe.into()))?;
80 receiver
81 .await
82 .map_err(|_| crate::error::Error::IO(std::io::ErrorKind::BrokenPipe.into()))?
83 }
84
85 async fn task(mut port: tokio_serial::SerialStream, baud_rate: u32, mut receiver: mpsc::Receiver<Job>) {
87 let mut baud_rate = baud_rate;
88 let mut last_tx: Instant = Instant::now();
89 while let Some(Job { request, baud_rate: req_baud_rate, respond_to }) = receiver.recv().await {
90 let req = request.as_request();
91 if req_baud_rate != baud_rate {
92 if let Err(e) = port.set_baud_rate(req_baud_rate) {
93 let _ = respond_to.send(Err(crate::error::Error::IO(e.into())));
94 continue;
95 }
96 baud_rate = req_baud_rate;
97 }
98
99 let idle_until = last_tx + Self::idle_time_rs485(baud_rate);
100 let now = tokio::time::Instant::now();
101 if idle_until > now {
102 tokio::time::sleep_until(idle_until).await;
103 }
104
105 let frame = match req.to_bytes() {
106 Ok(frame) => frame,
107 Err(e) => {
108 let _ = respond_to.send(Err(crate::error::Error::Request(e)));
109 continue;
110 },
111 };
112
113 if let Err(e) = port.clear(serialport::ClearBuffer::Output) {
114 let _ = respond_to.send(Err(crate::error::Error::IO(e.into())));
115 continue;
116 }
117 if let Err(e) = Self::write(&mut port, &frame).await {
118 let _ = respond_to.send(Err(e));
119 continue;
120 }
121 last_tx = tokio::time::Instant::now();
122
123 if req.is_broadcasting() {
124 let _ = respond_to.send(Ok(Response::Success));
125 continue;
126 }
127
128 let post_tx_idle = Self::idle_time_rs485(baud_rate);
129 tokio::time::sleep(post_tx_idle).await;
130 let mut buf: [u8; 256] = [0; 256];
131 let len = match Self::read(&mut port, &mut buf, req.timeout(), req.function().expected_len()).await {
132 Ok(len) => len,
133 Err(e) => {
134 let _ = respond_to.send(Err(e));
135 continue;
136 },
137 };
138 if len == 0 {
139 let _ = respond_to.send(Err(crate::error::Error::IO(std::io::ErrorKind::TimedOut.into())));
140 continue;
141 }
142 let res = Response::from_bytes(&req, &buf[0..len]).map_err(|e| crate::error::Error::Response(e));
143 let _ = respond_to.send(res);
144 }
145 }
146
147 async fn write(port: &mut tokio_serial::SerialStream, frame: &[u8]) -> Result<(), crate::error::Error> {
149 port
150 .write_all(frame)
151 .await
152 .map_err(|e| crate::error::Error::IO(e))?;
153 port
154 .flush()
155 .await
156 .map_err(|e| crate::error::Error::IO(e))?;
157
158 Ok(())
159 }
160
161 async fn read(
163 port: &mut tokio_serial::SerialStream,
164 buf: &mut [u8],
165 timeout: core::time::Duration,
166 expected_len: usize,
167 ) -> Result<usize, crate::error::Error> {
168 let mut len: usize = 0;
169 let deadline = tokio::time::Instant::now() + timeout;
170 loop {
171 let now = tokio::time::Instant::now();
172 if now >= deadline {
173 break;
174 }
175 let remaining = deadline.saturating_duration_since(now);
176 let read_res =
177 tokio::time::timeout(remaining, port.read(&mut buf[len..])).await;
178 let n = match read_res {
179 Ok(Ok(n)) => n,
180 Ok(Err(ref e)) if e.kind() == std::io::ErrorKind::TimedOut => {
181 if len == 0 {
182 continue;
183 }
184 if len >= 5 && buf[1] & 0x80 != 0 {
185 break;
186 }
187 if len < expected_len {
188 continue;
189 }
190 break
191 }
192 Ok(Err(e)) => return Err(crate::error::Error::IO(e)),
193 Err(_) => break,
194 };
195 len += n;
196 if len >= buf.len() {
197 break;
198 }
199 }
200 Ok(len)
201 }
202
/// Returns the time needed to transmit 3.5 characters at `baud_rate`,
/// counting 10 bits per character.
///
/// A `baud_rate` of `0` yields `Duration::ZERO`: the division would produce an
/// infinite value, which `Duration::from_secs_f64` rejects by panicking.
fn idle_time_rs485(baud_rate: u32) -> core::time::Duration {
    const BITS_PER_CHAR: f64 = 10.0;
    const SILENT_CHARS: f64 = 3.5;

    if baud_rate == 0 {
        // No character time can be derived from a zero baud rate.
        return core::time::Duration::ZERO;
    }
    let seconds = SILENT_CHARS * BITS_PER_CHAR / f64::from(baud_rate);
    core::time::Duration::from_secs_f64(seconds)
}
209}
210
211
212impl Drop for QueuedMaster {
213 fn drop(&mut self) {
214 self.handle.abort();
215 }
216}