1use std::time::{Duration, Instant};
2
3#[cfg(any(target_os = "linux", target_os = "macos"))]
4use std::os::unix::io::AsRawFd;
5
6#[cfg(any(target_os = "linux", target_os = "macos"))]
7use libc::{TIOCM_RTS, TIOCMGET, TIOCMSET};
8
9#[cfg(any(target_os = "linux", target_os = "macos"))]
10use serialport::TTYPort;
11
12use serialport::SerialPort;
13use tokio::sync::Mutex;
14use tracing::{info, trace};
15
16use crate::{RtsError, RtsType};
17
18use crate::{FrameErrorKind, IoOperation, RelayError, RtuConfig, TransportError};
19
20pub struct RtuTransport {
21 port: Mutex<Box<dyn SerialPort>>,
22 config: RtuConfig,
23 trace_frames: bool,
24
25 #[cfg(any(target_os = "linux", target_os = "macos"))]
26 raw_fd: i32,
27}
28
29impl RtuTransport {
30 pub fn new(config: &RtuConfig, trace_frames: bool) -> Result<Self, TransportError> {
31 info!("Opening serial port {}", config.serial_port_info());
32
33 #[cfg(any(target_os = "linux", target_os = "macos"))]
35 let tty_port: TTYPort = serialport::new(&config.device, config.baud_rate)
36 .data_bits(config.data_bits.into())
37 .parity(config.parity.into())
38 .stop_bits(config.stop_bits.into())
39 .timeout(config.serial_timeout)
40 .flow_control(serialport::FlowControl::None)
41 .open_native()
42 .map_err(|e| TransportError::Io {
43 operation: IoOperation::Configure,
44 details: format!("serial port {}", config.device),
45 source: std::io::Error::other(e.description),
46 })?;
47
48 #[cfg(any(target_os = "linux", target_os = "macos"))]
49 let raw_fd = tty_port.as_raw_fd();
50
51 #[cfg(any(target_os = "linux", target_os = "macos"))]
52 let port: Box<dyn SerialPort> = Box::new(tty_port);
53
54 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
55 let port = serialport::new(&config.rtu_device, config.rtu_baud_rate)
56 .data_bits(config.data_bits.into())
57 .parity(config.parity.into())
58 .stop_bits(config.stop_bits.into())
59 .timeout(config.serial_timeout)
60 .flow_control(serialport::FlowControl::None)
61 .open()
62 .map_err(|e| TransportError::Io {
63 operation: IoOperation::Configure,
64 details: format!("serial port {}", config.rtu_device),
65 source: std::io::Error::new(std::io::ErrorKind::Other, e.description),
66 })?;
67
68 Ok(Self {
69 port: Mutex::new(port),
70 config: config.clone(),
71 trace_frames,
72 #[cfg(any(target_os = "linux", target_os = "macos"))]
73 raw_fd,
74 })
75 }
76
77 pub async fn close(&self) -> Result<(), TransportError> {
78 let port = self.port.lock().await;
79 port.clear(serialport::ClearBuffer::All)
80 .map_err(|e| TransportError::Io {
81 operation: IoOperation::Flush,
82 details: "Failed to clear buffers".to_string(),
83 source: std::io::Error::other(e.description),
84 })?;
85
86 #[cfg(unix)]
87 unsafe {
88 if libc::close(self.raw_fd) != 0 {
89 return Err(TransportError::Io {
90 operation: IoOperation::Control,
91 details: "Failed to close serial port".to_string(),
92 source: std::io::Error::last_os_error(),
93 });
94 }
95 }
96
97 Ok(())
98 }
99
100 fn set_rts(&self, on: bool, trace_frames: bool) -> Result<(), TransportError> {
101 let rts_span = tracing::info_span!(
102 "rts_control",
103 signal = if on { "HIGH" } else { "LOW" },
104 delay_us = self.config.rts_delay_us,
105 );
106 let _enter = rts_span.enter();
107
108 unsafe {
109 let mut flags = 0i32;
110
111 if libc::ioctl(self.raw_fd, TIOCMGET, &mut flags) < 0 {
113 let err = std::io::Error::last_os_error();
114 return Err(TransportError::Rts(RtsError::signal(format!(
115 "Failed to get RTS flags: {} (errno: {})",
116 err,
117 err.raw_os_error().unwrap_or(-1)
118 ))));
119 }
120
121 if on {
123 flags |= TIOCM_RTS; } else {
125 flags &= !TIOCM_RTS; }
127
128 if libc::ioctl(self.raw_fd, TIOCMSET, &flags) < 0 {
130 let err = std::io::Error::last_os_error();
131 return Err(TransportError::Rts(RtsError::signal(format!(
132 "Failed to set RTS flags: {} (errno: {})",
133 err,
134 err.raw_os_error().unwrap_or(-1)
135 ))));
136 }
137
138 if trace_frames {
139 trace!("RTS set to {}", if on { "HIGH" } else { "LOW" });
140 }
141 }
142
143 Ok(())
144 }
145
146 #[cfg(any(target_os = "linux", target_os = "macos"))]
147 fn tc_flush(&self) -> Result<(), TransportError> {
148 unsafe {
149 if libc::tcflush(self.raw_fd, libc::TCIOFLUSH) != 0 {
150 return Err(TransportError::Io {
151 operation: IoOperation::Flush,
152 details: format!(
153 "Failed to flush serial port: {}",
154 std::io::Error::last_os_error()
155 ),
156 source: std::io::Error::last_os_error(),
157 });
158 }
159 }
160 Ok(())
161 }
162
163 pub async fn transaction(
164 &self,
165 request: &[u8],
166 response: &mut [u8],
167 ) -> Result<usize, RelayError> {
168 if request.len() > self.config.max_frame_size as usize {
169 return Err(RelayError::frame(
170 FrameErrorKind::TooLong,
171 format!("Request frame too long: {} bytes", request.len()),
172 Some(request.to_vec()),
173 ));
174 }
175
176 let expected_size = response.len();
177
178 if self.trace_frames {
179 trace!("TX: {} bytes: {:02X?}", request.len(), request);
180 trace!("Expected response size: {} bytes", expected_size);
181 }
182
183 let transaction_start = Instant::now();
184
185 let result = tokio::time::timeout(self.config.transaction_timeout, async {
186 let mut port = self.port.lock().await;
187
188 if self.config.rts_type != RtsType::None {
189 if self.trace_frames {
190 trace!("RTS -> TX mode");
191 }
192
193 self.set_rts(
194 self.config.rts_type.to_signal_level(true),
195 self.trace_frames,
196 )?;
197
198 if self.config.rts_delay_us > 0 {
199 if self.trace_frames {
200 trace!("RTS -> TX mode [waiting]");
201 }
202 tokio::time::sleep(Duration::from_micros(self.config.rts_delay_us)).await;
203 }
204 }
205
206 if self.trace_frames {
208 trace!("Writing request");
209 }
210 port.write_all(request).map_err(|e| TransportError::Io {
211 operation: IoOperation::Write,
212 details: "Failed to write request".to_string(),
213 source: e,
214 })?;
215
216 port.flush().map_err(|e| TransportError::Io {
217 operation: IoOperation::Flush,
218 details: "Failed to flush write buffer".to_string(),
219 source: e,
220 })?;
221
222 if self.config.rts_type != RtsType::None {
223 if self.trace_frames {
224 trace!("RTS -> RX mode");
225 }
226
227 self.set_rts(
228 self.config.rts_type.to_signal_level(false),
229 self.trace_frames,
230 )?;
231 }
232
233 if self.config.flush_after_write {
234 if self.trace_frames {
235 trace!("RTS -> TX mode [flushing]");
236 }
237 self.tc_flush()?;
238 }
239
240 if self.config.rts_type != RtsType::None && self.config.rts_delay_us > 0 {
241 if self.trace_frames {
242 trace!("RTS -> RX mode [waiting]");
243 }
244 tokio::time::sleep(Duration::from_micros(self.config.rts_delay_us)).await;
245 }
246
247 if self.trace_frames {
249 trace!("Reading response (expecting {} bytes)", expected_size);
250 }
251
252 const MAX_TIMEOUTS: u8 = 3;
253 let mut total_bytes = 0;
254 let mut consecutive_timeouts = 0;
255 let inter_byte_timeout = Duration::from_millis(100);
256 let mut last_read_time = tokio::time::Instant::now();
257
258 while total_bytes < expected_size {
259 match port.read(&mut response[total_bytes..]) {
260 Ok(0) => {
261 if total_bytes > 0 {
262 let elapsed = last_read_time.elapsed();
263 if elapsed >= inter_byte_timeout {
264 trace!("Inter-byte timeout reached with {} bytes", total_bytes);
265 break;
266 }
267 }
268 tokio::task::yield_now().await;
269 }
270 Ok(n) => {
271 if self.trace_frames {
272 trace!(
273 "Read {} bytes: {:02X?}",
274 n,
275 &response[total_bytes..total_bytes + n]
276 );
277 }
278 total_bytes += n;
279 last_read_time = tokio::time::Instant::now();
280 consecutive_timeouts = 0;
281
282 if total_bytes >= expected_size {
283 if self.trace_frames {
284 trace!("Received complete response");
285 }
286 break;
287 }
288 }
289 Err(e) if e.kind() == std::io::ErrorKind::TimedOut => {
290 if total_bytes > 0 {
291 let elapsed = last_read_time.elapsed();
292 if elapsed >= inter_byte_timeout {
293 trace!("Inter-byte timeout reached after timeout");
294 break;
295 }
296 }
297 consecutive_timeouts += 1;
298 if consecutive_timeouts >= MAX_TIMEOUTS {
299 if total_bytes == 0 {
300 return Err(TransportError::NoResponse {
301 attempts: consecutive_timeouts,
302 elapsed: transaction_start.elapsed(),
303 });
304 }
305 trace!("Max timeouts reached with {} bytes", total_bytes);
306 break;
307 }
308 tokio::task::yield_now().await;
309 }
310 Err(e) => {
311 return Err(TransportError::Io {
312 operation: IoOperation::Read,
313 details: "Failed to read response".to_string(),
314 source: e,
315 });
316 }
317 }
318 }
319
320 if total_bytes == 0 {
321 return Err(TransportError::NoResponse {
322 attempts: consecutive_timeouts,
323 elapsed: transaction_start.elapsed(),
324 });
325 }
326
327 if total_bytes < 3 {
329 return Err(TransportError::Io {
330 operation: IoOperation::Read,
331 details: format!("Response too short: {} bytes", total_bytes),
332 source: std::io::Error::new(
333 std::io::ErrorKind::InvalidData,
334 "Response too short",
335 ),
336 });
337 }
338
339 if self.trace_frames {
340 trace!(
341 "RX: {} bytes: {:02X?}",
342 total_bytes,
343 &response[..total_bytes],
344 );
345 }
346
347 Ok(total_bytes)
348 })
349 .await
350 .map_err(|elapsed| TransportError::Timeout {
351 elapsed: transaction_start.elapsed(),
352 limit: self.config.transaction_timeout,
353 source: elapsed,
354 })?;
355
356 Ok(result?)
357 }
358}