Skip to main content

mbus_network/management/
std_transport.rs

1use std::io::{self, Read, Write};
2use std::net::{TcpStream, ToSocketAddrs};
3use std::time::Duration;
4
5use heapless::Vec;
6use mbus_core::data_unit::common::MAX_ADU_FRAME_LEN;
7use mbus_core::transport::{ModbusConfig, Transport, TransportError, TransportType};
8
9#[cfg(feature = "logging")]
10macro_rules! transport_log_error {
11    ($($arg:tt)*) => {
12        log::error!($($arg)*)
13    };
14}
15
16#[cfg(not(feature = "logging"))]
17macro_rules! transport_log_error {
18    ($($arg:tt)*) => {{
19        let _ = core::format_args!($($arg)*);
20    }};
21}
22
23#[cfg(feature = "logging")]
24macro_rules! transport_log_warn {
25    ($($arg:tt)*) => {
26        log::warn!($($arg)*)
27    };
28}
29
30#[cfg(not(feature = "logging"))]
31macro_rules! transport_log_warn {
32    ($($arg:tt)*) => {{
33        let _ = core::format_args!($($arg)*);
34    }};
35}
36
37#[cfg(feature = "logging")]
38macro_rules! transport_log_debug {
39    ($($arg:tt)*) => {
40        log::debug!($($arg)*)
41    };
42}
43
44#[cfg(not(feature = "logging"))]
45macro_rules! transport_log_debug {
46    ($($arg:tt)*) => {{
47        let _ = core::format_args!($($arg)*);
48    }};
49}
50
51/// A concrete implementation of `ModbusTcpTransport` using `std::net::TcpStream`.
52///
53/// This struct manages a standard TCP connection for Modbus TCP communication.
54#[derive(Debug, Default)]
55pub struct StdTcpTransport {
56    stream: Option<TcpStream>,
57}
58
59impl StdTcpTransport {
60    /// Creates a new `StdTcpTransport` instance.
61    ///
62    /// Initially, there is no active connection.
63    ///
64    /// # Arguments
65    /// * `config` - The `ModbusConfig` to use for this transport.
66    ///
67    /// # Returns
68    /// A new `StdTcpTransport` instance with the provided configuration and no active connection.
69    pub fn new() -> Self {
70        Self { stream: None }
71    }
72
73    /// Helper function to convert `std::io::Error` to `TransportError`.
74    ///
75    /// This maps common I/O error kinds to specific Modbus transport errors.
76    fn map_io_error(err: io::Error) -> TransportError {
77        match err.kind() {
78            io::ErrorKind::ConnectionRefused | io::ErrorKind::NotFound => {
79                TransportError::ConnectionFailed
80            }
81            io::ErrorKind::BrokenPipe
82            | io::ErrorKind::ConnectionReset
83            | io::ErrorKind::UnexpectedEof => TransportError::ConnectionClosed,
84            io::ErrorKind::WouldBlock | io::ErrorKind::TimedOut => TransportError::Timeout,
85            _ => TransportError::IoError,
86        }
87    }
88}
89
90impl Transport for StdTcpTransport {
91    type Error = TransportError;
92    const TRANSPORT_TYPE: TransportType = TransportType::StdTcp;
93
94    /// Establishes a TCP connection to the specified remote address.
95    ///
96    /// # Arguments
97    /// * `addr` - The address of the Modbus TCP server (e.g., "192.168.1.1:502").
98    /// * `config` - The `ModbusTcpConfig` containing the host and port of the Modbus TCP server.
99    ///
100    /// # Returns
101    /// `Ok(())` if the connection is successfully established, or an error otherwise.
102    fn connect(&mut self, config: &ModbusConfig) -> Result<(), Self::Error> {
103        let config = match config {
104            ModbusConfig::Tcp(c) => c,
105            _ => return Err(TransportError::Unexpected),
106        };
107
108        let connection_timeout = Duration::from_millis(config.connection_timeout_ms as u64);
109        let response_timeout = Duration::from_millis(config.response_timeout_ms as u64);
110
111        // Resolve the host and port to socket addresses.
112        // For a single connection, we will only attempt to connect to the first resolved address.
113        let mut addrs_iter = (config.host.as_str(), config.port)
114            .to_socket_addrs()
115            .map_err(|e| {
116                transport_log_error!("DNS resolution failed: {:?}", e);
117                TransportError::ConnectionFailed
118            })?;
119
120        // Take only the first address, as per the requirement for a single connection.
121        let addr = addrs_iter.next().ok_or_else(|| {
122            transport_log_error!("No valid address found for host:port combination.");
123            TransportError::ConnectionFailed
124        })?;
125
126        transport_log_debug!("Trying address: {:?}", addr);
127
128        match TcpStream::connect_timeout(&addr, connection_timeout) {
129            Ok(stream) => {
130                // These operations are best-effort and their failure is not critical for the connection itself.
131                // Errors are logged but not propagated to avoid disrupting the connection flow.
132                stream
133                    .set_read_timeout(Some(response_timeout))
134                    .unwrap_or_else(|e| transport_log_warn!("Failed to set read timeout: {:?}", e));
135                stream
136                    .set_write_timeout(Some(response_timeout))
137                    .unwrap_or_else(|e| {
138                        transport_log_warn!("Failed to set write timeout: {:?}", e)
139                    });
140                stream
141                    .set_nodelay(true)
142                    .unwrap_or_else(|e| transport_log_warn!("Failed to set no-delay: {:?}", e));
143
144                self.stream = Some(stream); // Store the connected stream
145                Ok(()) // Connection successful
146            }
147            Err(e) => {
148                transport_log_error!("Connect failed: {:?}", e);
149                Err(TransportError::ConnectionFailed) // Connection failed for this single address
150            }
151        }
152    }
153
154    /// Closes the active TCP connection.
155    ///
156    /// If no connection is active, this operation does nothing and returns `Ok(())`.
157    fn disconnect(&mut self) -> Result<(), Self::Error> {
158        // Taking the stream out of the Option will drop it,
159        // which in turn closes the underlying TCP connection.
160        if let Some(stream) = self.stream.take() {
161            drop(stream);
162        }
163        Ok(())
164    }
165
166    /// Sends a Modbus Application Data Unit (ADU) over the TCP connection.
167    ///
168    /// # Arguments
169    /// * `adu` - The byte slice representing the ADU to send.
170    ///
171    /// # Returns
172    /// `Ok(())` if the ADU is successfully sent, or an error otherwise.
173    fn send(&mut self, adu: &[u8]) -> Result<(), Self::Error> {
174        let stream = self
175            .stream
176            .as_mut()
177            .ok_or(TransportError::ConnectionClosed)?;
178
179        let result = stream.write_all(adu).and_then(|()| stream.flush());
180
181        if let Err(err) = result {
182            let transport_error = Self::map_io_error(err);
183            if transport_error == TransportError::ConnectionClosed {
184                self.stream = None;
185            }
186            return Err(transport_error);
187        }
188
189        Ok(())
190    }
191
192    /// Receives available Modbus bytes from the TCP connection.
193    ///
194    /// This implementation performs a single, non-blocking read operation. It retrieves
195    /// whatever bytes are currently available in the socket buffer up to the maximum ADU size.
196    /// The higher-level Modbus Client Services (`ClientServices::ingest_frame`) is responsible
197    /// for buffering and isolating the complete frames.
198    ///
199    /// # Returns
200    /// `Ok(Vec<u8, MAX_ADU_FRAME_LEN>)` containing the received bytes, or an error otherwise.
201    fn recv(&mut self) -> Result<Vec<u8, MAX_ADU_FRAME_LEN>, Self::Error> {
202        let stream = self
203            .stream
204            .as_mut()
205            .ok_or(TransportError::ConnectionClosed)?;
206
207        // Temporarily set the stream to non-blocking to immediately return available bytes
208        // without blocking the caller's polling loop.
209        let _ = stream.set_nonblocking(true);
210
211        let mut temp_buf = [0u8; MAX_ADU_FRAME_LEN];
212        let read_result = stream.read(&mut temp_buf);
213
214        // Restore the stream to blocking mode so that `send` (which uses `write_all`)
215        // continues to respect the configured timeouts without failing prematurely on a full buffer.
216        let _ = stream.set_nonblocking(false);
217
218        match read_result {
219            Ok(0) => {
220                // A read of 0 bytes indicates the peer gracefully closed the TCP connection.
221                self.stream = None;
222                Err(TransportError::ConnectionClosed)
223            }
224            Ok(n) => {
225                let mut buffer = Vec::new();
226                // Copy the read bytes into our heapless vector.
227                if buffer.extend_from_slice(&temp_buf[..n]).is_err() {
228                    return Err(TransportError::BufferTooSmall);
229                }
230                Ok(buffer)
231            }
232            Err(e) => {
233                let err = Self::map_io_error(e);
234                if err == TransportError::ConnectionClosed {
235                    self.stream = None;
236                }
237                // WouldBlock gets mapped to TransportError::Timeout, signaling no data is currently ready.
238                Err(err)
239            }
240        }
241    }
242
243    /// Checks if the transport is currently connected to a remote host.
244    ///
245    /// This is a best-effort check and indicates if a `TcpStream` is currently held.
246    fn is_connected(&self) -> bool {
247        self.stream.is_some()
248    }
249}
250
251#[cfg(test)]
252impl StdTcpTransport {
253    pub fn stream_mut(&mut self) -> Option<&mut TcpStream> {
254        self.stream.as_mut()
255    }
256}
257
258#[cfg(test)]
259mod tests {
260    use super::super::std_transport::StdTcpTransport;
261    use mbus_core::transport::{ModbusConfig, ModbusTcpConfig, Transport, TransportError};
262    use std::io::{self, Read, Write};
263    use std::net::TcpListener;
264    use std::sync::mpsc;
265    use std::thread;
266    use std::time::Duration;
267
268    /// Helper function to create a TcpListener on an available port.
269    /// This listener is then passed to the server thread.
270    fn create_test_listener() -> TcpListener {
271        TcpListener::bind("127.0.0.1:0").expect("Failed to bind to an available port")
272    }
273
274    /// Helper function to extract host and port from a SocketAddr.
275    fn get_host_port(addr: std::net::SocketAddr) -> u16 {
276        addr.port()
277    }
278
279    /// Test case: `StdTcpTransport::new` creates an instance with no active connection.
280    #[test]
281    fn test_new_std_tcp_transport() {
282        let transport = StdTcpTransport::new();
283        assert!(!transport.is_connected());
284    }
285
286    /// Test case: `connect` successfully establishes a TCP connection.
287    ///
288    /// A mock server is set up to accept a single connection.
289    #[test]
290    fn test_connect_success() {
291        let listener = create_test_listener();
292        let addr = listener.local_addr().unwrap();
293        let (tx, rx) = mpsc::channel();
294
295        let server_handle = thread::spawn(move || {
296            tx.send(()).expect("Failed to send server ready signal"); // Signal that the listener is ready
297            // Accept one connection and then close
298            let _ = listener.accept().unwrap();
299        });
300
301        rx.recv().expect("Failed to receive server ready signal"); // Wait for the server to be ready
302
303        let mut transport = StdTcpTransport::new();
304        let port = get_host_port(addr);
305        let config = ModbusConfig::Tcp(ModbusTcpConfig::new("127.0.0.1", port).unwrap());
306        let result = transport.connect(&config);
307        assert!(result.is_ok());
308        assert!(transport.is_connected());
309
310        server_handle.join().unwrap();
311    }
312
313    /// Test case: `connect` fails with an invalid address string.
314    #[test]
315    fn test_connect_failure_invalid_addr() {
316        let mut transport = StdTcpTransport::new();
317        let config = ModbusConfig::Tcp(ModbusTcpConfig::new("invalid-address", 502).unwrap()); // Invalid host, but short enough
318        let result = transport.connect(&config);
319        assert!(result.is_err());
320        assert_eq!(result.unwrap_err(), TransportError::ConnectionFailed);
321        assert!(!transport.is_connected());
322    }
323
324    /// Test case: `connect` fails when the server actively refuses the connection.
325    ///
326    /// This is simulated by trying to connect to a port where no server is listening.
327    #[test]
328    fn test_connect_failure_connection_refused() {
329        // We don't start a server, so the port will be refused
330        let listener = create_test_listener(); // Just to get an unused port
331        let port = listener.local_addr().unwrap().port();
332        drop(listener); // Explicitly drop the listener to ensure the port is free
333        let mut transport = StdTcpTransport::new();
334        let config = ModbusConfig::Tcp(ModbusTcpConfig::new("127.0.0.1", port).unwrap());
335        let result = transport.connect(&config);
336        assert!(result.is_err());
337        assert_eq!(result.unwrap_err(), TransportError::ConnectionFailed);
338        assert!(!transport.is_connected());
339    }
340
341    /// Test case: `disconnect` closes an active connection.
342    #[test]
343    fn test_disconnect() {
344        let listener = create_test_listener();
345        let addr = listener.local_addr().unwrap();
346        let (tx, rx) = mpsc::channel();
347
348        let server_handle = thread::spawn(move || {
349            tx.send(()).expect("Failed to send server ready signal");
350            let _ = listener.accept().unwrap(); // Just accept and hold
351        });
352
353        rx.recv().expect("Failed to receive server ready signal");
354
355        let mut transport = StdTcpTransport::new();
356        let port = get_host_port(addr);
357        let config = ModbusConfig::Tcp(ModbusTcpConfig::new("127.0.0.1", port).unwrap());
358        transport.connect(&config).unwrap();
359        assert!(transport.is_connected());
360
361        let result = transport.disconnect();
362        assert!(result.is_ok());
363        assert!(!transport.is_connected());
364
365        server_handle.join().unwrap();
366    }
367
368    /// Test case: `send` successfully transmits data over an active connection.
369    ///
370    /// A mock server receives the data and verifies it.
371    #[test]
372    fn test_send_success() {
373        let listener = create_test_listener();
374        let addr = listener.local_addr().unwrap();
375        let (tx, rx) = mpsc::channel();
376        let test_data = [0x01, 0x02, 0x03, 0x04];
377
378        let server_handle = thread::spawn(move || {
379            tx.send(()).expect("Failed to send server ready signal");
380            let (mut stream, _) = listener.accept().unwrap();
381            let mut buf = [0; 4];
382            stream.read_exact(&mut buf).unwrap();
383            assert_eq!(buf, test_data);
384        });
385
386        rx.recv().expect("Failed to receive server ready signal");
387
388        let mut transport = StdTcpTransport::new();
389        let port = get_host_port(addr);
390        let config = ModbusConfig::Tcp(ModbusTcpConfig::new("127.0.0.1", port).unwrap());
391        transport.connect(&config).unwrap();
392
393        let result = transport.send(&test_data);
394        assert!(result.is_ok());
395
396        server_handle.join().unwrap();
397    }
398
399    /// Test case: `send` fails when the transport is not connected.
400    #[test]
401    fn test_send_failure_not_connected() {
402        let mut transport = StdTcpTransport::new();
403        let test_data = [0x01, 0x02];
404        let result = transport.send(&test_data);
405        assert!(result.is_err());
406        assert_eq!(result.unwrap_err(), TransportError::ConnectionClosed);
407    }
408
409    /// Test case: `recv` successfully receives a complete Modbus ADU.
410    ///
411    /// A mock server sends a predefined valid ADU.
412    #[test]
413    fn test_recv_success_full_adu() {
414        let listener = create_test_listener();
415        let addr = listener.local_addr().unwrap();
416        let (tx, rx) = mpsc::channel();
417        // Example ADU: TID=0x0001, PID=0x0000, Length=0x0003 (Unit ID + FC + 1 data byte), UnitID=0x01, FC=0x03, Data=0x00
418        let adu_to_send = [0x00, 0x01, 0x00, 0x00, 0x00, 0x03, 0x01, 0x03, 0x00];
419
420        let server_handle = thread::spawn(move || {
421            tx.send(()).expect("Failed to send server ready signal");
422            let (mut stream, _) = listener.accept().unwrap();
423            stream.write_all(&adu_to_send).unwrap();
424        });
425
426        rx.recv().expect("Failed to receive server ready signal");
427
428        let mut transport = StdTcpTransport::new();
429        let port = get_host_port(addr);
430        let config = ModbusConfig::Tcp(ModbusTcpConfig::new("127.0.0.1", port).unwrap());
431
432        transport.connect(&config).unwrap();
433
434        // The non-blocking receiver might fetch fragments if testing fast enough, so
435        // we buffer and await until the full transmission is verified in the test.
436        let mut combined_adu = std::vec::Vec::new();
437        for _ in 0..50 {
438            match transport.recv() {
439                Ok(bytes) => {
440                    combined_adu.extend_from_slice(&bytes);
441                    if combined_adu.len() == adu_to_send.len() {
442                        break;
443                    }
444                }
445                Err(TransportError::Timeout) => {
446                    std::thread::sleep(Duration::from_millis(10));
447                }
448                Err(e) => panic!("Unexpected error: {:?}", e),
449            }
450        }
451        assert_eq!(combined_adu.as_slice(), adu_to_send);
452
453        server_handle.join().unwrap();
454    }
455
456    /// Test case: `recv` fails when the transport is not connected.
457    #[test]
458    fn test_recv_failure_not_connected() {
459        let mut transport = StdTcpTransport::new();
460        let result = transport.recv();
461        assert!(result.is_err());
462        assert_eq!(result.unwrap_err(), TransportError::ConnectionClosed);
463    }
464
465    /// Test case: `recv` fails when the peer closes the connection prematurely during header read.
466    #[test]
467    fn test_recv_failure_connection_closed_prematurely_header() {
468        let listener = create_test_listener();
469        let addr = listener.local_addr().unwrap();
470        let (tx, rx) = mpsc::channel();
471        // Send only part of the MBAP header (e.g., 3 bytes instead of 7)
472        let partial_adu = [0x00, 0x01, 0x00];
473
474        let server_handle = thread::spawn(move || {
475            tx.send(()).expect("Failed to send server ready signal");
476            let (mut stream, _) = listener.accept().unwrap();
477            stream.write_all(&partial_adu).unwrap();
478            // Server closes connection after sending partial data
479        });
480
481        rx.recv().expect("Failed to receive server ready signal");
482
483        let mut transport = StdTcpTransport::new();
484        let port = get_host_port(addr);
485        let config = ModbusConfig::Tcp(ModbusTcpConfig::new("127.0.0.1", port).unwrap());
486        transport.connect(&config).unwrap();
487
488        let mut result = transport.recv();
489        for _ in 0..50 {
490            if let Err(TransportError::Timeout) = result {
491                std::thread::sleep(Duration::from_millis(10));
492                result = transport.recv();
493            } else if let Ok(_) = result {
494                result = transport.recv();
495            } else {
496                break;
497            }
498        }
499        assert!(result.is_err());
500        assert_eq!(result.unwrap_err(), TransportError::ConnectionClosed);
501
502        server_handle.join().unwrap();
503    }
504
505    /// Test case: `recv` fails when the peer closes the connection prematurely after header but before full PDU.
506    #[test]
507    fn test_recv_failure_connection_closed_prematurely_pdu() {
508        let listener = create_test_listener();
509        let addr = listener.local_addr().unwrap();
510        let (tx, rx) = mpsc::channel();
511        // Valid MBAP header indicating a PDU length, but then send less than expected
512        // TID=0x0001, PID=0x0000, Length=0x0005 (Unit ID + FC + 3 data bytes), UnitID=0x01, FC=0x03
513        let partial_adu = [0x00, 0x01, 0x00, 0x00, 0x00, 0x05, 0x01, 0x03]; // 8 bytes sent, but 11 expected
514
515        let server_handle = thread::spawn(move || {
516            tx.send(()).expect("Failed to send server ready signal");
517            let (mut stream, _) = listener.accept().unwrap();
518            stream.write_all(&partial_adu).unwrap();
519            // Server closes connection after sending partial PDU data
520        });
521
522        rx.recv().expect("Failed to receive server ready signal");
523
524        let mut transport = StdTcpTransport::new();
525        let port = get_host_port(addr);
526        let config = ModbusConfig::Tcp(ModbusTcpConfig::new("127.0.0.1", port).unwrap());
527        transport.connect(&config).unwrap();
528
529        let mut result = transport.recv();
530        for _ in 0..50 {
531            if let Err(TransportError::Timeout) = result {
532                std::thread::sleep(Duration::from_millis(10));
533                result = transport.recv();
534            } else if let Ok(_) = result {
535                result = transport.recv();
536            } else {
537                break;
538            }
539        }
540        assert!(result.is_err());
541        assert_eq!(result.unwrap_err(), TransportError::ConnectionClosed);
542
543        server_handle.join().unwrap();
544    }
545
546    /// Test case: `recv` times out if no data is received within the specified duration.
547    #[test]
548    fn test_recv_timeout() {
549        let listener = create_test_listener();
550        let addr = listener.local_addr().unwrap();
551        let (tx, rx) = mpsc::channel();
552
553        let server_handle = thread::spawn(move || {
554            tx.send(()).expect("Failed to send server ready signal");
555            let (_stream, _) = listener.accept().unwrap();
556            // Server accepts connection but sends no data, causing client to timeout
557            thread::sleep(Duration::from_secs(5)); // Ensure client times out first
558        });
559
560        rx.recv().expect("Failed to receive server ready signal");
561
562        let mut transport = StdTcpTransport::new(); // Very short timeout for test
563        let port = get_host_port(addr);
564        let mut tcp_config = ModbusTcpConfig::new("127.0.0.1", port).unwrap();
565        tcp_config.response_timeout_ms = 100; // Set short response timeout for test
566        let config = ModbusConfig::Tcp(tcp_config);
567        transport.connect(&config).unwrap();
568
569        let result = transport.recv();
570        assert!(result.is_err());
571        assert_eq!(result.unwrap_err(), TransportError::Timeout);
572
573        server_handle.join().unwrap();
574    }
575
576    /// Test case: `is_connected` returns true when connected and false when disconnected.
577    #[test]
578    fn test_is_connected() {
579        let listener = create_test_listener();
580        let addr = listener.local_addr().unwrap();
581        let (tx, rx) = mpsc::channel();
582
583        let server_handle = thread::spawn(move || {
584            tx.send(()).expect("Failed to send server ready signal");
585            let (_stream, _) = listener.accept().unwrap();
586            thread::sleep(Duration::from_millis(500)); // Keep connection open briefly
587        });
588
589        rx.recv().expect("Failed to receive server ready signal");
590
591        let mut transport = StdTcpTransport::new();
592        let port = get_host_port(addr);
593        assert!(!transport.is_connected());
594
595        let config = ModbusConfig::Tcp(ModbusTcpConfig::new("127.0.0.1", port).unwrap());
596        transport.connect(&config).unwrap();
597
598        assert!(transport.is_connected());
599
600        transport.disconnect().unwrap();
601        assert!(!transport.is_connected());
602
603        server_handle.join().unwrap();
604    }
605
606    /// Test case: `map_io_error` correctly maps various `io::Error` kinds to `TransportError`.
607    #[test]
608    fn test_map_io_error() {
609        // ConnectionRefused
610        let err = io::Error::new(io::ErrorKind::ConnectionRefused, "test");
611        assert_eq!(
612            StdTcpTransport::map_io_error(err),
613            TransportError::ConnectionFailed
614        );
615
616        // NotFound (often used for address resolution issues)
617        let err = io::Error::new(io::ErrorKind::NotFound, "test");
618        assert_eq!(
619            StdTcpTransport::map_io_error(err),
620            TransportError::ConnectionFailed
621        );
622
623        // BrokenPipe
624        let err = io::Error::new(io::ErrorKind::BrokenPipe, "test");
625        assert_eq!(
626            StdTcpTransport::map_io_error(err),
627            TransportError::ConnectionClosed
628        );
629
630        // ConnectionReset
631        let err = io::Error::new(io::ErrorKind::ConnectionReset, "test");
632        assert_eq!(
633            StdTcpTransport::map_io_error(err),
634            TransportError::ConnectionClosed
635        );
636
637        // UnexpectedEof
638        let err = io::Error::new(io::ErrorKind::UnexpectedEof, "test");
639        assert_eq!(
640            StdTcpTransport::map_io_error(err),
641            TransportError::ConnectionClosed
642        );
643
644        // WouldBlock
645        let err = io::Error::new(io::ErrorKind::WouldBlock, "test");
646        assert_eq!(StdTcpTransport::map_io_error(err), TransportError::Timeout);
647
648        // TimedOut
649        let err = io::Error::new(io::ErrorKind::TimedOut, "test");
650        assert_eq!(StdTcpTransport::map_io_error(err), TransportError::Timeout);
651
652        // Other I/O errors
653        let err = io::Error::new(io::ErrorKind::PermissionDenied, "test");
654        assert_eq!(StdTcpTransport::map_io_error(err), TransportError::IoError);
655    }
656
657    /// Test case: `connect` with a custom timeout.
658    #[test]
659    fn test_connect_with_custom_timeout() {
660        let listener = create_test_listener();
661        let addr = listener.local_addr().unwrap();
662        let (tx, rx) = mpsc::channel();
663
664        let server_handle = thread::spawn(move || {
665            tx.send(()).expect("Failed to send server ready signal");
666            let _ = listener.accept().unwrap();
667        });
668
669        rx.recv().expect("Failed to receive server ready signal");
670
671        let mut transport = StdTcpTransport::new(); // Custom timeout
672        let port = get_host_port(addr);
673        let mut tcp_config = ModbusTcpConfig::new("127.0.0.1", port).unwrap();
674        tcp_config.connection_timeout_ms = 500; // Set custom connection timeout for test
675        let config = ModbusConfig::Tcp(tcp_config);
676        let result = transport.connect(&config);
677        assert!(result.is_ok());
678        assert!(transport.is_connected());
679
680        server_handle.join().unwrap();
681    }
682
683    /// Test case: `connect` with no timeout specified (uses default).
684    #[test]
685    fn test_connect_with_no_timeout() {
686        let listener = create_test_listener();
687        let addr = listener.local_addr().unwrap();
688        let (tx, rx) = mpsc::channel();
689
690        let server_handle = thread::spawn(move || {
691            tx.send(()).expect("Failed to send server ready signal");
692            let _ = listener.accept().unwrap();
693        });
694
695        rx.recv().expect("Failed to receive server ready signal");
696
697        let mut transport = StdTcpTransport::new(); // No timeout
698        let port = get_host_port(addr);
699        let mut tcp_config = ModbusTcpConfig::new("127.0.0.1", port).unwrap();
700        tcp_config.connection_timeout_ms = 500; // No timeout
701        let config = ModbusConfig::Tcp(tcp_config);
702        let result = transport.connect(&config);
703        assert!(result.is_ok());
704        assert!(transport.is_connected());
705
706        server_handle.join().unwrap();
707    }
708
709    /// Test case: `send` fails if the connection is reset by the peer.
710    #[test]
711    fn test_send_failure_connection_reset() {
712        let listener = create_test_listener();
713        let addr = listener.local_addr().unwrap();
714        let (tx, rx) = mpsc::channel();
715        let test_data = [0x01, 0x02, 0x03, 0x04];
716
717        let server_handle = thread::spawn(move || {
718            tx.send(()).expect("Failed to send server ready signal");
719            let (stream, _) = listener.accept().unwrap();
720            drop(stream); // Immediately close the stream after accepting
721        });
722
723        rx.recv().expect("Failed to receive server ready signal");
724
725        let mut transport = StdTcpTransport::new();
726        let port = get_host_port(addr);
727        let config = ModbusConfig::Tcp(ModbusTcpConfig::new("127.0.0.1", port).unwrap());
728
729        transport.connect(&config).unwrap();
730
731        assert!(transport.is_connected());
732
733        // Attempt a receive operation to force the client's TcpStream to detect the peer's closure.
734        // This should result in TransportError::ConnectionClosed and update the transport's state.
735        let mut recv_result = transport.recv();
736        for _ in 0..50 {
737            if let Err(TransportError::Timeout) = recv_result {
738                std::thread::sleep(Duration::from_millis(10));
739                recv_result = transport.recv();
740            } else {
741                break;
742            }
743        }
744        assert!(recv_result.is_err());
745        assert_eq!(recv_result.unwrap_err(), TransportError::ConnectionClosed);
746        // Now, the transport should report as disconnected.
747        assert!(!transport.is_connected());
748
749        // A subsequent send operation should now reliably fail with ConnectionClosed.
750        let result = transport.send(&test_data);
751        assert!(result.is_err());
752        assert_eq!(result.unwrap_err(), TransportError::ConnectionClosed);
753
754        server_handle.join().unwrap();
755    }
756
757    /// Test case: `connect` successfully establishes a TCP connection to a single, valid address.
758    #[test]
759    fn test_connect_success_single_addr() {
760        let listener = create_test_listener();
761        let addr = listener.local_addr().unwrap();
762        let (tx, rx) = mpsc::channel();
763
764        // Server for the successful connection
765        let server_handle = thread::spawn(move || {
766            tx.send(()).expect("Failed to send server ready signal");
767            let _ = listener.accept().unwrap(); // Just accept and hold
768        });
769
770        rx.recv().expect("Failed to receive server ready signal");
771
772        let mut transport = StdTcpTransport::new();
773        let config = ModbusConfig::Tcp(ModbusTcpConfig::new("127.0.0.1", addr.port()).unwrap());
774
775        let result = transport.connect(&config);
776        assert!(
777            result.is_ok(),
778            "Connection should succeed with a single address"
779        );
780        assert!(transport.is_connected());
781
782        server_handle.join().unwrap();
783    }
784}