modbus_relay/
rtu_transport.rs

1use std::time::{Duration, Instant};
2
3#[cfg(any(target_os = "linux", target_os = "macos"))]
4use std::os::unix::io::AsRawFd;
5
6#[cfg(any(target_os = "linux", target_os = "macos"))]
7use libc::{TIOCM_RTS, TIOCMGET, TIOCMSET};
8
9#[cfg(any(target_os = "linux", target_os = "macos"))]
10use serialport::TTYPort;
11
12use serialport::SerialPort;
13use tokio::sync::Mutex;
14use tracing::{info, trace};
15
16use crate::{RtsError, RtsType};
17
18use crate::{FrameErrorKind, IoOperation, RelayError, RtuConfig, TransportError};
19
20pub struct RtuTransport {
21    port: Mutex<Box<dyn SerialPort>>,
22    config: RtuConfig,
23    trace_frames: bool,
24
25    #[cfg(any(target_os = "linux", target_os = "macos"))]
26    raw_fd: i32,
27}
28
29impl RtuTransport {
30    pub fn new(config: &RtuConfig, trace_frames: bool) -> Result<Self, TransportError> {
31        info!("Opening serial port {}", config.serial_port_info());
32
33        // Explicitly open as TTYPort on Unix
34        #[cfg(any(target_os = "linux", target_os = "macos"))]
35        let tty_port: TTYPort = serialport::new(&config.device, config.baud_rate)
36            .data_bits(config.data_bits.into())
37            .parity(config.parity.into())
38            .stop_bits(config.stop_bits.into())
39            .timeout(config.serial_timeout)
40            .flow_control(serialport::FlowControl::None)
41            .open_native()
42            .map_err(|e| TransportError::Io {
43                operation: IoOperation::Configure,
44                details: format!("serial port {}", config.device),
45                source: std::io::Error::other(e.description),
46            })?;
47
48        #[cfg(any(target_os = "linux", target_os = "macos"))]
49        let raw_fd = tty_port.as_raw_fd();
50
51        #[cfg(any(target_os = "linux", target_os = "macos"))]
52        let port: Box<dyn SerialPort> = Box::new(tty_port);
53
54        #[cfg(not(any(target_os = "linux", target_os = "macos")))]
55        let port = serialport::new(&config.rtu_device, config.rtu_baud_rate)
56            .data_bits(config.data_bits.into())
57            .parity(config.parity.into())
58            .stop_bits(config.stop_bits.into())
59            .timeout(config.serial_timeout)
60            .flow_control(serialport::FlowControl::None)
61            .open()
62            .map_err(|e| TransportError::Io {
63                operation: IoOperation::Configure,
64                details: format!("serial port {}", config.rtu_device),
65                source: std::io::Error::new(std::io::ErrorKind::Other, e.description),
66            })?;
67
68        Ok(Self {
69            port: Mutex::new(port),
70            config: config.clone(),
71            trace_frames,
72            #[cfg(any(target_os = "linux", target_os = "macos"))]
73            raw_fd,
74        })
75    }
76
77    pub async fn close(&self) -> Result<(), TransportError> {
78        let port = self.port.lock().await;
79        port.clear(serialport::ClearBuffer::All)
80            .map_err(|e| TransportError::Io {
81                operation: IoOperation::Flush,
82                details: "Failed to clear buffers".to_string(),
83                source: std::io::Error::other(e.description),
84            })?;
85
86        #[cfg(unix)]
87        unsafe {
88            if libc::close(self.raw_fd) != 0 {
89                return Err(TransportError::Io {
90                    operation: IoOperation::Control,
91                    details: "Failed to close serial port".to_string(),
92                    source: std::io::Error::last_os_error(),
93                });
94            }
95        }
96
97        Ok(())
98    }
99
100    fn set_rts(&self, on: bool, trace_frames: bool) -> Result<(), TransportError> {
101        let rts_span = tracing::info_span!(
102            "rts_control",
103            signal = if on { "HIGH" } else { "LOW" },
104            delay_us = self.config.rts_delay_us,
105        );
106        let _enter = rts_span.enter();
107
108        unsafe {
109            let mut flags = 0i32;
110
111            // Get current flags
112            if libc::ioctl(self.raw_fd, TIOCMGET, &mut flags) < 0 {
113                let err = std::io::Error::last_os_error();
114                return Err(TransportError::Rts(RtsError::signal(format!(
115                    "Failed to get RTS flags: {} (errno: {})",
116                    err,
117                    err.raw_os_error().unwrap_or(-1)
118                ))));
119            }
120
121            // Modify RTS flag
122            if on {
123                flags |= TIOCM_RTS; // Set RTS HIGH
124            } else {
125                flags &= !TIOCM_RTS; // Set RTS LOW
126            }
127
128            // Set new flags
129            if libc::ioctl(self.raw_fd, TIOCMSET, &flags) < 0 {
130                let err = std::io::Error::last_os_error();
131                return Err(TransportError::Rts(RtsError::signal(format!(
132                    "Failed to set RTS flags: {} (errno: {})",
133                    err,
134                    err.raw_os_error().unwrap_or(-1)
135                ))));
136            }
137
138            if trace_frames {
139                trace!("RTS set to {}", if on { "HIGH" } else { "LOW" });
140            }
141        }
142
143        Ok(())
144    }
145
146    #[cfg(any(target_os = "linux", target_os = "macos"))]
147    fn tc_flush(&self) -> Result<(), TransportError> {
148        unsafe {
149            if libc::tcflush(self.raw_fd, libc::TCIOFLUSH) != 0 {
150                return Err(TransportError::Io {
151                    operation: IoOperation::Flush,
152                    details: format!(
153                        "Failed to flush serial port: {}",
154                        std::io::Error::last_os_error()
155                    ),
156                    source: std::io::Error::last_os_error(),
157                });
158            }
159        }
160        Ok(())
161    }
162
163    pub async fn transaction(
164        &self,
165        request: &[u8],
166        response: &mut [u8],
167    ) -> Result<usize, RelayError> {
168        if request.len() > self.config.max_frame_size as usize {
169            return Err(RelayError::frame(
170                FrameErrorKind::TooLong,
171                format!("Request frame too long: {} bytes", request.len()),
172                Some(request.to_vec()),
173            ));
174        }
175
176        let expected_size = response.len();
177
178        if self.trace_frames {
179            trace!("TX: {} bytes: {:02X?}", request.len(), request);
180            trace!("Expected response size: {} bytes", expected_size);
181        }
182
183        let transaction_start = Instant::now();
184
185        let result = tokio::time::timeout(self.config.transaction_timeout, async {
186            let mut port = self.port.lock().await;
187
188            if self.config.rts_type != RtsType::None {
189                if self.trace_frames {
190                    trace!("RTS -> TX mode");
191                }
192
193                self.set_rts(
194                    self.config.rts_type.to_signal_level(true),
195                    self.trace_frames,
196                )?;
197
198                if self.config.rts_delay_us > 0 {
199                    if self.trace_frames {
200                        trace!("RTS -> TX mode [waiting]");
201                    }
202                    tokio::time::sleep(Duration::from_micros(self.config.rts_delay_us)).await;
203                }
204            }
205
206            // Write request
207            if self.trace_frames {
208                trace!("Writing request");
209            }
210            port.write_all(request).map_err(|e| TransportError::Io {
211                operation: IoOperation::Write,
212                details: "Failed to write request".to_string(),
213                source: e,
214            })?;
215
216            port.flush().map_err(|e| TransportError::Io {
217                operation: IoOperation::Flush,
218                details: "Failed to flush write buffer".to_string(),
219                source: e,
220            })?;
221
222            if self.config.rts_type != RtsType::None {
223                if self.trace_frames {
224                    trace!("RTS -> RX mode");
225                }
226
227                self.set_rts(
228                    self.config.rts_type.to_signal_level(false),
229                    self.trace_frames,
230                )?;
231            }
232
233            if self.config.flush_after_write {
234                if self.trace_frames {
235                    trace!("RTS -> TX mode [flushing]");
236                }
237                self.tc_flush()?;
238            }
239
240            if self.config.rts_type != RtsType::None && self.config.rts_delay_us > 0 {
241                if self.trace_frames {
242                    trace!("RTS -> RX mode [waiting]");
243                }
244                tokio::time::sleep(Duration::from_micros(self.config.rts_delay_us)).await;
245            }
246
247            // Read response
248            if self.trace_frames {
249                trace!("Reading response (expecting {} bytes)", expected_size);
250            }
251
252            const MAX_TIMEOUTS: u8 = 3;
253            let mut total_bytes = 0;
254            let mut consecutive_timeouts = 0;
255            let inter_byte_timeout = Duration::from_millis(100);
256            let mut last_read_time = tokio::time::Instant::now();
257
258            while total_bytes < expected_size {
259                match port.read(&mut response[total_bytes..]) {
260                    Ok(0) => {
261                        if total_bytes > 0 {
262                            let elapsed = last_read_time.elapsed();
263                            if elapsed >= inter_byte_timeout {
264                                trace!("Inter-byte timeout reached with {} bytes", total_bytes);
265                                break;
266                            }
267                        }
268                        tokio::task::yield_now().await;
269                    }
270                    Ok(n) => {
271                        if self.trace_frames {
272                            trace!(
273                                "Read {} bytes: {:02X?}",
274                                n,
275                                &response[total_bytes..total_bytes + n]
276                            );
277                        }
278                        total_bytes += n;
279                        last_read_time = tokio::time::Instant::now();
280                        consecutive_timeouts = 0;
281
282                        if total_bytes >= expected_size {
283                            if self.trace_frames {
284                                trace!("Received complete response");
285                            }
286                            break;
287                        }
288                    }
289                    Err(e) if e.kind() == std::io::ErrorKind::TimedOut => {
290                        if total_bytes > 0 {
291                            let elapsed = last_read_time.elapsed();
292                            if elapsed >= inter_byte_timeout {
293                                trace!("Inter-byte timeout reached after timeout");
294                                break;
295                            }
296                        }
297                        consecutive_timeouts += 1;
298                        if consecutive_timeouts >= MAX_TIMEOUTS {
299                            if total_bytes == 0 {
300                                return Err(TransportError::NoResponse {
301                                    attempts: consecutive_timeouts,
302                                    elapsed: transaction_start.elapsed(),
303                                });
304                            }
305                            trace!("Max timeouts reached with {} bytes", total_bytes);
306                            break;
307                        }
308                        tokio::task::yield_now().await;
309                    }
310                    Err(e) => {
311                        return Err(TransportError::Io {
312                            operation: IoOperation::Read,
313                            details: "Failed to read response".to_string(),
314                            source: e,
315                        });
316                    }
317                }
318            }
319
320            if total_bytes == 0 {
321                return Err(TransportError::NoResponse {
322                    attempts: consecutive_timeouts,
323                    elapsed: transaction_start.elapsed(),
324                });
325            }
326
327            // Verify minimum response size
328            if total_bytes < 3 {
329                return Err(TransportError::Io {
330                    operation: IoOperation::Read,
331                    details: format!("Response too short: {} bytes", total_bytes),
332                    source: std::io::Error::new(
333                        std::io::ErrorKind::InvalidData,
334                        "Response too short",
335                    ),
336                });
337            }
338
339            if self.trace_frames {
340                trace!(
341                    "RX: {} bytes: {:02X?}",
342                    total_bytes,
343                    &response[..total_bytes],
344                );
345            }
346
347            Ok(total_bytes)
348        })
349        .await
350        .map_err(|elapsed| TransportError::Timeout {
351            elapsed: transaction_start.elapsed(),
352            limit: self.config.transaction_timeout,
353            source: elapsed,
354        })?;
355
356        Ok(result?)
357    }
358}