Skip to main content

mbus_network/management/
std_transport.rs

1use std::io::{self, Read, Write};
2use std::net::{TcpStream, ToSocketAddrs};
3use std::time::Duration;
4
5use heapless::Vec;
6use mbus_core::data_unit::common::MAX_ADU_FRAME_LEN;
7use mbus_core::transport::{ModbusConfig, Transport, TransportError, TransportType};
8
// Error-level logging: forwards to `log::error!` when the `logging` feature is enabled.
#[cfg(feature = "logging")]
macro_rules! transport_log_error {
    ($($fmt:tt)*) => (
        log::error!($($fmt)*)
    );
}

// Without the `logging` feature nothing is emitted; the arguments are still routed
// through `format_args!` so the format string and its operands are checked and used.
#[cfg(not(feature = "logging"))]
macro_rules! transport_log_error {
    ($($fmt:tt)*) => ({
        let _ = core::format_args!($($fmt)*);
    });
}
22
// Warning-level logging: forwards to `log::warn!` when the `logging` feature is enabled.
#[cfg(feature = "logging")]
macro_rules! transport_log_warn {
    ($($fmt:tt)*) => (
        log::warn!($($fmt)*)
    );
}

// Without the `logging` feature nothing is emitted; the arguments are still routed
// through `format_args!` so the format string and its operands are checked and used.
#[cfg(not(feature = "logging"))]
macro_rules! transport_log_warn {
    ($($fmt:tt)*) => ({
        let _ = core::format_args!($($fmt)*);
    });
}
36
// Debug-level logging: forwards to `log::debug!` when the `logging` feature is enabled.
#[cfg(feature = "logging")]
macro_rules! transport_log_debug {
    ($($fmt:tt)*) => (
        log::debug!($($fmt)*)
    );
}

// Without the `logging` feature nothing is emitted; the arguments are still routed
// through `format_args!` so the format string and its operands are checked and used.
#[cfg(not(feature = "logging"))]
macro_rules! transport_log_debug {
    ($($fmt:tt)*) => ({
        let _ = core::format_args!($($fmt)*);
    });
}
50
51/// A concrete implementation of `ModbusTcpTransport` using `std::net::TcpStream`.
52///
53/// This struct manages a standard TCP connection for Modbus TCP communication.
54#[derive(Debug, Default)]
55pub struct StdTcpTransport {
56    stream: Option<TcpStream>,
57}
58
59impl StdTcpTransport {
60    /// Creates a new `StdTcpTransport` instance.
61    ///
62    /// Initially, there is no active connection.
63    ///
64    /// # Arguments
65    /// * `config` - The `ModbusConfig` to use for this transport.
66    ///
67    /// # Returns
68    /// A new `StdTcpTransport` instance with the provided configuration and no active connection.
69    pub fn new() -> Self {
70        Self { stream: None }
71    }
72
73    /// Helper function to convert `std::io::Error` to `TransportError`.
74    ///
75    /// This maps common I/O error kinds to specific Modbus transport errors.
76    fn map_io_error(err: io::Error) -> TransportError {
77        match err.kind() {
78            io::ErrorKind::ConnectionRefused | io::ErrorKind::NotFound => {
79                TransportError::ConnectionFailed
80            }
81            io::ErrorKind::BrokenPipe
82            | io::ErrorKind::ConnectionReset
83            | io::ErrorKind::UnexpectedEof => TransportError::ConnectionClosed,
84            io::ErrorKind::WouldBlock | io::ErrorKind::TimedOut => TransportError::Timeout,
85            _ => TransportError::IoError,
86        }
87    }
88}
89
90impl Transport for StdTcpTransport {
91    type Error = TransportError;
92
93    /// Establishes a TCP connection to the specified remote address.
94    ///
95    /// # Arguments
96    /// * `addr` - The address of the Modbus TCP server (e.g., "192.168.1.1:502").
97    /// * `config` - The `ModbusTcpConfig` containing the host and port of the Modbus TCP server.
98    ///
99    /// # Returns
100    /// `Ok(())` if the connection is successfully established, or an error otherwise.
101    fn connect(&mut self, config: &ModbusConfig) -> Result<(), Self::Error> {
102        let config = match config {
103            ModbusConfig::Tcp(c) => c,
104            _ => return Err(TransportError::Unexpected),
105        };
106
107        let connection_timeout = Duration::from_millis(config.connection_timeout_ms as u64);
108        let response_timeout = Duration::from_millis(config.response_timeout_ms as u64);
109
110        // Resolve the host and port to socket addresses.
111        // For a single connection, we will only attempt to connect to the first resolved address.
112        let mut addrs_iter = (config.host.as_str(), config.port)
113            .to_socket_addrs()
114            .map_err(|e| {
115                transport_log_error!("DNS resolution failed: {:?}", e);
116                TransportError::ConnectionFailed
117            })?;
118
119        // Take only the first address, as per the requirement for a single connection.
120        let addr = addrs_iter.next().ok_or_else(|| {
121            transport_log_error!("No valid address found for host:port combination.");
122            TransportError::ConnectionFailed
123        })?;
124
125        transport_log_debug!("Trying address: {:?}", addr);
126
127        match TcpStream::connect_timeout(&addr, connection_timeout) {
128            Ok(stream) => {
129                // These operations are best-effort and their failure is not critical for the connection itself.
130                // Errors are logged but not propagated to avoid disrupting the connection flow.
131                stream
132                    .set_read_timeout(Some(response_timeout))
133                    .unwrap_or_else(|e| transport_log_warn!("Failed to set read timeout: {:?}", e));
134                stream
135                    .set_write_timeout(Some(response_timeout))
136                    .unwrap_or_else(|e| {
137                        transport_log_warn!("Failed to set write timeout: {:?}", e)
138                    });
139                stream
140                    .set_nodelay(true)
141                    .unwrap_or_else(|e| transport_log_warn!("Failed to set no-delay: {:?}", e));
142
143                self.stream = Some(stream); // Store the connected stream
144                Ok(()) // Connection successful
145            }
146            Err(e) => {
147                transport_log_error!("Connect failed: {:?}", e);
148                Err(TransportError::ConnectionFailed) // Connection failed for this single address
149            }
150        }
151    }
152
153    /// Closes the active TCP connection.
154    ///
155    /// If no connection is active, this operation does nothing and returns `Ok(())`.
156    fn disconnect(&mut self) -> Result<(), Self::Error> {
157        // Taking the stream out of the Option will drop it,
158        // which in turn closes the underlying TCP connection.
159        if let Some(stream) = self.stream.take() {
160            drop(stream);
161        }
162        Ok(())
163    }
164
165    /// Sends a Modbus Application Data Unit (ADU) over the TCP connection.
166    ///
167    /// # Arguments
168    /// * `adu` - The byte slice representing the ADU to send.
169    ///
170    /// # Returns
171    /// `Ok(())` if the ADU is successfully sent, or an error otherwise.
172    fn send(&mut self, adu: &[u8]) -> Result<(), Self::Error> {
173        let stream = self
174            .stream
175            .as_mut()
176            .ok_or(TransportError::ConnectionClosed)?;
177
178        let result = stream.write_all(adu).and_then(|()| stream.flush());
179
180        if let Err(err) = result {
181            let transport_error = Self::map_io_error(err);
182            if transport_error == TransportError::ConnectionClosed {
183                self.stream = None;
184            }
185            return Err(transport_error);
186        }
187
188        Ok(())
189    }
190
191    /// Receives available Modbus bytes from the TCP connection.
192    ///
193    /// This implementation performs a single, non-blocking read operation. It retrieves
194    /// whatever bytes are currently available in the socket buffer up to the maximum ADU size.
195    /// The higher-level Modbus Client Services (`ClientServices::ingest_frame`) is responsible
196    /// for buffering and isolating the complete frames.
197    ///
198    /// # Returns
199    /// `Ok(Vec<u8, MAX_ADU_FRAME_LEN>)` containing the received bytes, or an error otherwise.
200    fn recv(&mut self) -> Result<Vec<u8, MAX_ADU_FRAME_LEN>, Self::Error> {
201        let stream = self
202            .stream
203            .as_mut()
204            .ok_or(TransportError::ConnectionClosed)?;
205
206        // Temporarily set the stream to non-blocking to immediately return available bytes
207        // without blocking the caller's polling loop.
208        let _ = stream.set_nonblocking(true);
209
210        let mut temp_buf = [0u8; MAX_ADU_FRAME_LEN];
211        let read_result = stream.read(&mut temp_buf);
212
213        // Restore the stream to blocking mode so that `send` (which uses `write_all`)
214        // continues to respect the configured timeouts without failing prematurely on a full buffer.
215        let _ = stream.set_nonblocking(false);
216
217        match read_result {
218            Ok(0) => {
219                // A read of 0 bytes indicates the peer gracefully closed the TCP connection.
220                self.stream = None;
221                Err(TransportError::ConnectionClosed)
222            }
223            Ok(n) => {
224                let mut buffer = Vec::new();
225                // Copy the read bytes into our heapless vector.
226                if buffer.extend_from_slice(&temp_buf[..n]).is_err() {
227                    return Err(TransportError::BufferTooSmall);
228                }
229                Ok(buffer)
230            }
231            Err(e) => {
232                let err = Self::map_io_error(e);
233                if err == TransportError::ConnectionClosed {
234                    self.stream = None;
235                }
236                // WouldBlock gets mapped to TransportError::Timeout, signaling no data is currently ready.
237                Err(err)
238            }
239        }
240    }
241
    /// Checks if the transport is currently connected to a remote host.
    ///
    /// This is a best-effort check: it only reports whether a `TcpStream` is currently
    /// held in `self.stream` and performs no I/O on the socket.
    ///
    /// # Returns
    /// `true` if a stream is stored, `false` otherwise.
    fn is_connected(&self) -> bool {
        self.stream.is_some()
    }
248
    /// Returns the type of transport.
    ///
    /// # Returns
    /// Always `TransportType::StdTcp`.
    fn transport_type(&self) -> TransportType {
        TransportType::StdTcp
    }
253}
254
// Test-only accessor, compiled only under `cfg(test)`.
#[cfg(test)]
impl StdTcpTransport {
    /// Returns a mutable reference to the held `TcpStream`, or `None` when no
    /// stream is stored.
    pub fn stream_mut(&mut self) -> Option<&mut TcpStream> {
        self.stream.as_mut()
    }
}
261
262#[cfg(test)]
263mod tests {
264    use super::super::std_transport::StdTcpTransport;
265    use mbus_core::transport::{ModbusConfig, ModbusTcpConfig, Transport, TransportError};
266    use std::io::{self, Read, Write};
267    use std::net::TcpListener;
268    use std::sync::mpsc;
269    use std::thread;
270    use std::time::Duration;
271
272    /// Helper function to create a TcpListener on an available port.
273    /// This listener is then passed to the server thread.
274    fn create_test_listener() -> TcpListener {
275        TcpListener::bind("127.0.0.1:0").expect("Failed to bind to an available port")
276    }
277
278    /// Helper function to extract host and port from a SocketAddr.
279    fn get_host_port(addr: std::net::SocketAddr) -> u16 {
280        addr.port()
281    }
282
283    /// Test case: `StdTcpTransport::new` creates an instance with no active connection.
284    #[test]
285    fn test_new_std_tcp_transport() {
286        let transport = StdTcpTransport::new();
287        assert!(!transport.is_connected());
288    }
289
290    /// Test case: `connect` successfully establishes a TCP connection.
291    ///
292    /// A mock server is set up to accept a single connection.
293    #[test]
294    fn test_connect_success() {
295        let listener = create_test_listener();
296        let addr = listener.local_addr().unwrap();
297        let (tx, rx) = mpsc::channel();
298
299        let server_handle = thread::spawn(move || {
300            tx.send(()).expect("Failed to send server ready signal"); // Signal that the listener is ready
301            // Accept one connection and then close
302            let _ = listener.accept().unwrap();
303        });
304
305        rx.recv().expect("Failed to receive server ready signal"); // Wait for the server to be ready
306
307        let mut transport = StdTcpTransport::new();
308        let port = get_host_port(addr);
309        let config = ModbusConfig::Tcp(ModbusTcpConfig::new("127.0.0.1", port).unwrap());
310        let result = transport.connect(&config);
311        assert!(result.is_ok());
312        assert!(transport.is_connected());
313
314        server_handle.join().unwrap();
315    }
316
317    /// Test case: `connect` fails with an invalid address string.
318    #[test]
319    fn test_connect_failure_invalid_addr() {
320        let mut transport = StdTcpTransport::new();
321        let config = ModbusConfig::Tcp(ModbusTcpConfig::new("invalid-address", 502).unwrap()); // Invalid host, but short enough
322        let result = transport.connect(&config);
323        assert!(result.is_err());
324        assert_eq!(result.unwrap_err(), TransportError::ConnectionFailed);
325        assert!(!transport.is_connected());
326    }
327
328    /// Test case: `connect` fails when the server actively refuses the connection.
329    ///
330    /// This is simulated by trying to connect to a port where no server is listening.
331    #[test]
332    fn test_connect_failure_connection_refused() {
333        // We don't start a server, so the port will be refused
334        let listener = create_test_listener(); // Just to get an unused port
335        let port = listener.local_addr().unwrap().port();
336        drop(listener); // Explicitly drop the listener to ensure the port is free
337        let mut transport = StdTcpTransport::new();
338        let config = ModbusConfig::Tcp(ModbusTcpConfig::new("127.0.0.1", port).unwrap());
339        let result = transport.connect(&config);
340        assert!(result.is_err());
341        assert_eq!(result.unwrap_err(), TransportError::ConnectionFailed);
342        assert!(!transport.is_connected());
343    }
344
345    /// Test case: `disconnect` closes an active connection.
346    #[test]
347    fn test_disconnect() {
348        let listener = create_test_listener();
349        let addr = listener.local_addr().unwrap();
350        let (tx, rx) = mpsc::channel();
351
352        let server_handle = thread::spawn(move || {
353            tx.send(()).expect("Failed to send server ready signal");
354            let _ = listener.accept().unwrap(); // Just accept and hold
355        });
356
357        rx.recv().expect("Failed to receive server ready signal");
358
359        let mut transport = StdTcpTransport::new();
360        let port = get_host_port(addr);
361        let config = ModbusConfig::Tcp(ModbusTcpConfig::new("127.0.0.1", port).unwrap());
362        transport.connect(&config).unwrap();
363        assert!(transport.is_connected());
364
365        let result = transport.disconnect();
366        assert!(result.is_ok());
367        assert!(!transport.is_connected());
368
369        server_handle.join().unwrap();
370    }
371
372    /// Test case: `send` successfully transmits data over an active connection.
373    ///
374    /// A mock server receives the data and verifies it.
375    #[test]
376    fn test_send_success() {
377        let listener = create_test_listener();
378        let addr = listener.local_addr().unwrap();
379        let (tx, rx) = mpsc::channel();
380        let test_data = [0x01, 0x02, 0x03, 0x04];
381
382        let server_handle = thread::spawn(move || {
383            tx.send(()).expect("Failed to send server ready signal");
384            let (mut stream, _) = listener.accept().unwrap();
385            let mut buf = [0; 4];
386            stream.read_exact(&mut buf).unwrap();
387            assert_eq!(buf, test_data);
388        });
389
390        rx.recv().expect("Failed to receive server ready signal");
391
392        let mut transport = StdTcpTransport::new();
393        let port = get_host_port(addr);
394        let config = ModbusConfig::Tcp(ModbusTcpConfig::new("127.0.0.1", port).unwrap());
395        transport.connect(&config).unwrap();
396
397        let result = transport.send(&test_data);
398        assert!(result.is_ok());
399
400        server_handle.join().unwrap();
401    }
402
403    /// Test case: `send` fails when the transport is not connected.
404    #[test]
405    fn test_send_failure_not_connected() {
406        let mut transport = StdTcpTransport::new();
407        let test_data = [0x01, 0x02];
408        let result = transport.send(&test_data);
409        assert!(result.is_err());
410        assert_eq!(result.unwrap_err(), TransportError::ConnectionClosed);
411    }
412
413    /// Test case: `recv` successfully receives a complete Modbus ADU.
414    ///
415    /// A mock server sends a predefined valid ADU.
416    #[test]
417    fn test_recv_success_full_adu() {
418        let listener = create_test_listener();
419        let addr = listener.local_addr().unwrap();
420        let (tx, rx) = mpsc::channel();
421        // Example ADU: TID=0x0001, PID=0x0000, Length=0x0003 (Unit ID + FC + 1 data byte), UnitID=0x01, FC=0x03, Data=0x00
422        let adu_to_send = [0x00, 0x01, 0x00, 0x00, 0x00, 0x03, 0x01, 0x03, 0x00];
423
424        let server_handle = thread::spawn(move || {
425            tx.send(()).expect("Failed to send server ready signal");
426            let (mut stream, _) = listener.accept().unwrap();
427            stream.write_all(&adu_to_send).unwrap();
428        });
429
430        rx.recv().expect("Failed to receive server ready signal");
431
432        let mut transport = StdTcpTransport::new();
433        let port = get_host_port(addr);
434        let config = ModbusConfig::Tcp(ModbusTcpConfig::new("127.0.0.1", port).unwrap());
435
436        transport.connect(&config).unwrap();
437
438        // The non-blocking receiver might fetch fragments if testing fast enough, so
439        // we buffer and await until the full transmission is verified in the test.
440        let mut combined_adu = std::vec::Vec::new();
441        for _ in 0..50 {
442            match transport.recv() {
443                Ok(bytes) => {
444                    combined_adu.extend_from_slice(&bytes);
445                    if combined_adu.len() == adu_to_send.len() {
446                        break;
447                    }
448                }
449                Err(TransportError::Timeout) => {
450                    std::thread::sleep(Duration::from_millis(10));
451                }
452                Err(e) => panic!("Unexpected error: {:?}", e),
453            }
454        }
455        assert_eq!(combined_adu.as_slice(), adu_to_send);
456
457        server_handle.join().unwrap();
458    }
459
460    /// Test case: `recv` fails when the transport is not connected.
461    #[test]
462    fn test_recv_failure_not_connected() {
463        let mut transport = StdTcpTransport::new();
464        let result = transport.recv();
465        assert!(result.is_err());
466        assert_eq!(result.unwrap_err(), TransportError::ConnectionClosed);
467    }
468
469    /// Test case: `recv` fails when the peer closes the connection prematurely during header read.
470    #[test]
471    fn test_recv_failure_connection_closed_prematurely_header() {
472        let listener = create_test_listener();
473        let addr = listener.local_addr().unwrap();
474        let (tx, rx) = mpsc::channel();
475        // Send only part of the MBAP header (e.g., 3 bytes instead of 7)
476        let partial_adu = [0x00, 0x01, 0x00];
477
478        let server_handle = thread::spawn(move || {
479            tx.send(()).expect("Failed to send server ready signal");
480            let (mut stream, _) = listener.accept().unwrap();
481            stream.write_all(&partial_adu).unwrap();
482            // Server closes connection after sending partial data
483        });
484
485        rx.recv().expect("Failed to receive server ready signal");
486
487        let mut transport = StdTcpTransport::new();
488        let port = get_host_port(addr);
489        let config = ModbusConfig::Tcp(ModbusTcpConfig::new("127.0.0.1", port).unwrap());
490        transport.connect(&config).unwrap();
491
492        let mut result = transport.recv();
493        for _ in 0..50 {
494            if let Err(TransportError::Timeout) = result {
495                std::thread::sleep(Duration::from_millis(10));
496                result = transport.recv();
497            } else if let Ok(_) = result {
498                result = transport.recv();
499            } else {
500                break;
501            }
502        }
503        assert!(result.is_err());
504        assert_eq!(result.unwrap_err(), TransportError::ConnectionClosed);
505
506        server_handle.join().unwrap();
507    }
508
509    /// Test case: `recv` fails when the peer closes the connection prematurely after header but before full PDU.
510    #[test]
511    fn test_recv_failure_connection_closed_prematurely_pdu() {
512        let listener = create_test_listener();
513        let addr = listener.local_addr().unwrap();
514        let (tx, rx) = mpsc::channel();
515        // Valid MBAP header indicating a PDU length, but then send less than expected
516        // TID=0x0001, PID=0x0000, Length=0x0005 (Unit ID + FC + 3 data bytes), UnitID=0x01, FC=0x03
517        let partial_adu = [0x00, 0x01, 0x00, 0x00, 0x00, 0x05, 0x01, 0x03]; // 8 bytes sent, but 11 expected
518
519        let server_handle = thread::spawn(move || {
520            tx.send(()).expect("Failed to send server ready signal");
521            let (mut stream, _) = listener.accept().unwrap();
522            stream.write_all(&partial_adu).unwrap();
523            // Server closes connection after sending partial PDU data
524        });
525
526        rx.recv().expect("Failed to receive server ready signal");
527
528        let mut transport = StdTcpTransport::new();
529        let port = get_host_port(addr);
530        let config = ModbusConfig::Tcp(ModbusTcpConfig::new("127.0.0.1", port).unwrap());
531        transport.connect(&config).unwrap();
532
533        let mut result = transport.recv();
534        for _ in 0..50 {
535            if let Err(TransportError::Timeout) = result {
536                std::thread::sleep(Duration::from_millis(10));
537                result = transport.recv();
538            } else if let Ok(_) = result {
539                result = transport.recv();
540            } else {
541                break;
542            }
543        }
544        assert!(result.is_err());
545        assert_eq!(result.unwrap_err(), TransportError::ConnectionClosed);
546
547        server_handle.join().unwrap();
548    }
549
550    /// Test case: `recv` times out if no data is received within the specified duration.
551    #[test]
552    fn test_recv_timeout() {
553        let listener = create_test_listener();
554        let addr = listener.local_addr().unwrap();
555        let (tx, rx) = mpsc::channel();
556
557        let server_handle = thread::spawn(move || {
558            tx.send(()).expect("Failed to send server ready signal");
559            let (_stream, _) = listener.accept().unwrap();
560            // Server accepts connection but sends no data, causing client to timeout
561            thread::sleep(Duration::from_secs(5)); // Ensure client times out first
562        });
563
564        rx.recv().expect("Failed to receive server ready signal");
565
566        let mut transport = StdTcpTransport::new(); // Very short timeout for test
567        let port = get_host_port(addr);
568        let mut tcp_config = ModbusTcpConfig::new("127.0.0.1", port).unwrap();
569        tcp_config.response_timeout_ms = 100; // Set short response timeout for test
570        let config = ModbusConfig::Tcp(tcp_config);
571        transport.connect(&config).unwrap();
572
573        let result = transport.recv();
574        assert!(result.is_err());
575        assert_eq!(result.unwrap_err(), TransportError::Timeout);
576
577        server_handle.join().unwrap();
578    }
579
580    /// Test case: `is_connected` returns true when connected and false when disconnected.
581    #[test]
582    fn test_is_connected() {
583        let listener = create_test_listener();
584        let addr = listener.local_addr().unwrap();
585        let (tx, rx) = mpsc::channel();
586
587        let server_handle = thread::spawn(move || {
588            tx.send(()).expect("Failed to send server ready signal");
589            let (_stream, _) = listener.accept().unwrap();
590            thread::sleep(Duration::from_millis(500)); // Keep connection open briefly
591        });
592
593        rx.recv().expect("Failed to receive server ready signal");
594
595        let mut transport = StdTcpTransport::new();
596        let port = get_host_port(addr);
597        assert!(!transport.is_connected());
598
599        let config = ModbusConfig::Tcp(ModbusTcpConfig::new("127.0.0.1", port).unwrap());
600        transport.connect(&config).unwrap();
601
602        assert!(transport.is_connected());
603
604        transport.disconnect().unwrap();
605        assert!(!transport.is_connected());
606
607        server_handle.join().unwrap();
608    }
609
610    /// Test case: `map_io_error` correctly maps various `io::Error` kinds to `TransportError`.
611    #[test]
612    fn test_map_io_error() {
613        // ConnectionRefused
614        let err = io::Error::new(io::ErrorKind::ConnectionRefused, "test");
615        assert_eq!(
616            StdTcpTransport::map_io_error(err),
617            TransportError::ConnectionFailed
618        );
619
620        // NotFound (often used for address resolution issues)
621        let err = io::Error::new(io::ErrorKind::NotFound, "test");
622        assert_eq!(
623            StdTcpTransport::map_io_error(err),
624            TransportError::ConnectionFailed
625        );
626
627        // BrokenPipe
628        let err = io::Error::new(io::ErrorKind::BrokenPipe, "test");
629        assert_eq!(
630            StdTcpTransport::map_io_error(err),
631            TransportError::ConnectionClosed
632        );
633
634        // ConnectionReset
635        let err = io::Error::new(io::ErrorKind::ConnectionReset, "test");
636        assert_eq!(
637            StdTcpTransport::map_io_error(err),
638            TransportError::ConnectionClosed
639        );
640
641        // UnexpectedEof
642        let err = io::Error::new(io::ErrorKind::UnexpectedEof, "test");
643        assert_eq!(
644            StdTcpTransport::map_io_error(err),
645            TransportError::ConnectionClosed
646        );
647
648        // WouldBlock
649        let err = io::Error::new(io::ErrorKind::WouldBlock, "test");
650        assert_eq!(StdTcpTransport::map_io_error(err), TransportError::Timeout);
651
652        // TimedOut
653        let err = io::Error::new(io::ErrorKind::TimedOut, "test");
654        assert_eq!(StdTcpTransport::map_io_error(err), TransportError::Timeout);
655
656        // Other I/O errors
657        let err = io::Error::new(io::ErrorKind::PermissionDenied, "test");
658        assert_eq!(StdTcpTransport::map_io_error(err), TransportError::IoError);
659    }
660
661    /// Test case: `connect` with a custom timeout.
662    #[test]
663    fn test_connect_with_custom_timeout() {
664        let listener = create_test_listener();
665        let addr = listener.local_addr().unwrap();
666        let (tx, rx) = mpsc::channel();
667
668        let server_handle = thread::spawn(move || {
669            tx.send(()).expect("Failed to send server ready signal");
670            let _ = listener.accept().unwrap();
671        });
672
673        rx.recv().expect("Failed to receive server ready signal");
674
675        let mut transport = StdTcpTransport::new(); // Custom timeout
676        let port = get_host_port(addr);
677        let mut tcp_config = ModbusTcpConfig::new("127.0.0.1", port).unwrap();
678        tcp_config.connection_timeout_ms = 500; // Set custom connection timeout for test
679        let config = ModbusConfig::Tcp(tcp_config);
680        let result = transport.connect(&config);
681        assert!(result.is_ok());
682        assert!(transport.is_connected());
683
684        server_handle.join().unwrap();
685    }
686
687    /// Test case: `connect` with no timeout specified (uses default).
688    #[test]
689    fn test_connect_with_no_timeout() {
690        let listener = create_test_listener();
691        let addr = listener.local_addr().unwrap();
692        let (tx, rx) = mpsc::channel();
693
694        let server_handle = thread::spawn(move || {
695            tx.send(()).expect("Failed to send server ready signal");
696            let _ = listener.accept().unwrap();
697        });
698
699        rx.recv().expect("Failed to receive server ready signal");
700
701        let mut transport = StdTcpTransport::new(); // No timeout
702        let port = get_host_port(addr);
703        let mut tcp_config = ModbusTcpConfig::new("127.0.0.1", port).unwrap();
704        tcp_config.connection_timeout_ms = 500; // No timeout
705        let config = ModbusConfig::Tcp(tcp_config);
706        let result = transport.connect(&config);
707        assert!(result.is_ok());
708        assert!(transport.is_connected());
709
710        server_handle.join().unwrap();
711    }
712
713    /// Test case: `send` fails if the connection is reset by the peer.
714    #[test]
715    fn test_send_failure_connection_reset() {
716        let listener = create_test_listener();
717        let addr = listener.local_addr().unwrap();
718        let (tx, rx) = mpsc::channel();
719        let test_data = [0x01, 0x02, 0x03, 0x04];
720
721        let server_handle = thread::spawn(move || {
722            tx.send(()).expect("Failed to send server ready signal");
723            let (stream, _) = listener.accept().unwrap();
724            drop(stream); // Immediately close the stream after accepting
725        });
726
727        rx.recv().expect("Failed to receive server ready signal");
728
729        let mut transport = StdTcpTransport::new();
730        let port = get_host_port(addr);
731        let config = ModbusConfig::Tcp(ModbusTcpConfig::new("127.0.0.1", port).unwrap());
732
733        transport.connect(&config).unwrap();
734
735        assert!(transport.is_connected());
736
737        // Attempt a receive operation to force the client's TcpStream to detect the peer's closure.
738        // This should result in TransportError::ConnectionClosed and update the transport's state.
739        let mut recv_result = transport.recv();
740        for _ in 0..50 {
741            if let Err(TransportError::Timeout) = recv_result {
742                std::thread::sleep(Duration::from_millis(10));
743                recv_result = transport.recv();
744            } else {
745                break;
746            }
747        }
748        assert!(recv_result.is_err());
749        assert_eq!(recv_result.unwrap_err(), TransportError::ConnectionClosed);
750        // Now, the transport should report as disconnected.
751        assert!(!transport.is_connected());
752
753        // A subsequent send operation should now reliably fail with ConnectionClosed.
754        let result = transport.send(&test_data);
755        assert!(result.is_err());
756        assert_eq!(result.unwrap_err(), TransportError::ConnectionClosed);
757
758        server_handle.join().unwrap();
759    }
760
761    /// Test case: `connect` successfully establishes a TCP connection to a single, valid address.
762    #[test]
763    fn test_connect_success_single_addr() {
764        let listener = create_test_listener();
765        let addr = listener.local_addr().unwrap();
766        let (tx, rx) = mpsc::channel();
767
768        // Server for the successful connection
769        let server_handle = thread::spawn(move || {
770            tx.send(()).expect("Failed to send server ready signal");
771            let _ = listener.accept().unwrap(); // Just accept and hold
772        });
773
774        rx.recv().expect("Failed to receive server ready signal");
775
776        let mut transport = StdTcpTransport::new();
777        let config = ModbusConfig::Tcp(ModbusTcpConfig::new("127.0.0.1", addr.port()).unwrap());
778
779        let result = transport.connect(&config);
780        assert!(
781            result.is_ok(),
782            "Connection should succeed with a single address"
783        );
784        assert!(transport.is_connected());
785
786        server_handle.join().unwrap();
787    }
788}