1use std::io::{self, Read, Write};
6use std::net::{TcpStream, ToSocketAddrs};
7use std::os::unix::io::AsRawFd;
8use std::sync::{Arc, Mutex};
9use std::thread;
10use std::time::Duration;
11
12use rns_core::transport::types::InterfaceId;
13
14use crate::event::{Event, EventSender};
15use crate::hdlc;
16use crate::interface::{lock_or_recover, Writer};
17
/// Configuration for one outbound TCP client interface.
#[derive(Debug, Clone)]
pub struct TcpClientConfig {
    /// Interface name; used as the `[name]` prefix in this module's log lines.
    pub name: String,
    /// Host (name or IP literal) used to build the `host:port` connect address.
    pub target_host: String,
    /// TCP port used to build the `host:port` connect address.
    pub target_port: u16,
    /// Identifier attached to every event this interface emits.
    pub interface_id: InterfaceId,
    /// Startup value for the delay between reconnect attempts (copied into `runtime`
    /// by `TcpClientRuntime::from_config`).
    pub reconnect_wait: Duration,
    /// Startup value for the reconnect attempt limit (copied into `runtime`
    /// by `TcpClientRuntime::from_config`).
    pub max_reconnect_tries: Option<u32>,
    /// Startup value for the per-connection timeout (copied into `runtime`
    /// by `TcpClientRuntime::from_config`).
    pub connect_timeout: Duration,
    /// Optional network device name; on Linux the socket is bound to it before connecting.
    pub device: Option<String>,
    /// Shared, lockable settings that are re-read on every connect/reconnect attempt.
    pub runtime: Arc<Mutex<TcpClientRuntime>>,
}
32
/// Snapshot of the TCP client settings that is kept behind a shared mutex
/// (see `TcpClientConfig::runtime`).
#[derive(Debug, Clone)]
pub struct TcpClientRuntime {
    /// Target host mirrored from the configuration.
    pub target_host: String,
    /// Target port mirrored from the configuration.
    pub target_port: u16,
    /// Time slept before each reconnect attempt.
    pub reconnect_wait: Duration,
    /// Maximum number of reconnect attempts; `None` places no limit on them.
    pub max_reconnect_tries: Option<u32>,
    /// Timeout applied to each outgoing connection attempt.
    pub connect_timeout: Duration,
}
41
42impl TcpClientRuntime {
43 pub fn from_config(config: &TcpClientConfig) -> Self {
44 Self {
45 target_host: config.target_host.clone(),
46 target_port: config.target_port,
47 reconnect_wait: config.reconnect_wait,
48 max_reconnect_tries: config.max_reconnect_tries,
49 connect_timeout: config.connect_timeout,
50 }
51 }
52}
53
/// Handle onto the shared runtime settings of one TCP client interface.
#[derive(Debug, Clone)]
pub struct TcpClientRuntimeConfigHandle {
    /// Name of the interface the handle belongs to.
    pub interface_name: String,
    /// The same shared runtime settings object the interface itself reads.
    pub runtime: Arc<Mutex<TcpClientRuntime>>,
    /// Settings derived from the configuration when the handle was created.
    pub startup: TcpClientRuntime,
}
60
61impl Default for TcpClientConfig {
62 fn default() -> Self {
63 let mut config = TcpClientConfig {
64 name: String::new(),
65 target_host: "127.0.0.1".into(),
66 target_port: 4242,
67 interface_id: InterfaceId(0),
68 reconnect_wait: Duration::from_secs(5),
69 max_reconnect_tries: None,
70 connect_timeout: Duration::from_secs(5),
71 device: None,
72 runtime: Arc::new(Mutex::new(TcpClientRuntime {
73 target_host: "127.0.0.1".into(),
74 target_port: 4242,
75 reconnect_wait: Duration::from_secs(5),
76 max_reconnect_tries: None,
77 connect_timeout: Duration::from_secs(5),
78 })),
79 };
80 let startup = TcpClientRuntime::from_config(&config);
81 config.runtime = Arc::new(Mutex::new(startup));
82 config
83 }
84}
85
/// Outbound half of a TCP client connection; owns the stream that frames are written to.
struct TcpWriter {
    /// Stream handle used for writing.
    stream: TcpStream,
}
90
91impl Writer for TcpWriter {
92 fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
93 self.stream.write_all(&hdlc::frame(data))
94 }
95}
96
97fn set_socket_options(stream: &TcpStream) -> io::Result<()> {
99 let fd = stream.as_raw_fd();
100 unsafe {
101 let val: libc::c_int = 1;
103 if libc::setsockopt(
104 fd,
105 libc::IPPROTO_TCP,
106 libc::TCP_NODELAY,
107 &val as *const _ as *const libc::c_void,
108 std::mem::size_of::<libc::c_int>() as libc::socklen_t,
109 ) != 0
110 {
111 return Err(io::Error::last_os_error());
112 }
113
114 if libc::setsockopt(
116 fd,
117 libc::SOL_SOCKET,
118 libc::SO_KEEPALIVE,
119 &val as *const _ as *const libc::c_void,
120 std::mem::size_of::<libc::c_int>() as libc::socklen_t,
121 ) != 0
122 {
123 return Err(io::Error::last_os_error());
124 }
125
126 #[cfg(target_os = "linux")]
128 {
129 let idle: libc::c_int = 5;
131 if libc::setsockopt(
132 fd,
133 libc::IPPROTO_TCP,
134 libc::TCP_KEEPIDLE,
135 &idle as *const _ as *const libc::c_void,
136 std::mem::size_of::<libc::c_int>() as libc::socklen_t,
137 ) != 0
138 {
139 return Err(io::Error::last_os_error());
140 }
141
142 let intvl: libc::c_int = 2;
144 if libc::setsockopt(
145 fd,
146 libc::IPPROTO_TCP,
147 libc::TCP_KEEPINTVL,
148 &intvl as *const _ as *const libc::c_void,
149 std::mem::size_of::<libc::c_int>() as libc::socklen_t,
150 ) != 0
151 {
152 return Err(io::Error::last_os_error());
153 }
154
155 let cnt: libc::c_int = 12;
157 if libc::setsockopt(
158 fd,
159 libc::IPPROTO_TCP,
160 libc::TCP_KEEPCNT,
161 &cnt as *const _ as *const libc::c_void,
162 std::mem::size_of::<libc::c_int>() as libc::socklen_t,
163 ) != 0
164 {
165 return Err(io::Error::last_os_error());
166 }
167
168 let timeout: libc::c_int = 24_000;
170 if libc::setsockopt(
171 fd,
172 libc::IPPROTO_TCP,
173 libc::TCP_USER_TIMEOUT,
174 &timeout as *const _ as *const libc::c_void,
175 std::mem::size_of::<libc::c_int>() as libc::socklen_t,
176 ) != 0
177 {
178 return Err(io::Error::last_os_error());
179 }
180 }
181 }
182 Ok(())
183}
184
185fn try_connect(config: &TcpClientConfig) -> io::Result<TcpStream> {
187 let runtime = lock_or_recover(&config.runtime, "tcp client runtime").clone();
188 let addr_str = format!("{}:{}", config.target_host, config.target_port);
189 let addr = addr_str
190 .to_socket_addrs()?
191 .next()
192 .ok_or_else(|| io::Error::new(io::ErrorKind::AddrNotAvailable, "no addresses resolved"))?;
193
194 #[cfg(target_os = "linux")]
195 let stream = if let Some(ref device) = config.device {
196 connect_with_device(&addr, device, runtime.connect_timeout)?
197 } else {
198 TcpStream::connect_timeout(&addr, runtime.connect_timeout)?
199 };
200 #[cfg(not(target_os = "linux"))]
201 let stream = TcpStream::connect_timeout(&addr, runtime.connect_timeout)?;
202 set_socket_options(&stream)?;
203 Ok(stream)
204}
205
206#[cfg(target_os = "linux")]
208fn connect_with_device(
209 addr: &std::net::SocketAddr,
210 device: &str,
211 timeout: Duration,
212) -> io::Result<TcpStream> {
213 use std::os::unix::io::{FromRawFd, RawFd};
214
215 let domain = if addr.is_ipv4() {
216 libc::AF_INET
217 } else {
218 libc::AF_INET6
219 };
220 let fd: RawFd = unsafe { libc::socket(domain, libc::SOCK_STREAM, 0) };
221 if fd < 0 {
222 return Err(io::Error::last_os_error());
223 }
224
225 let stream = unsafe { TcpStream::from_raw_fd(fd) };
227
228 super::bind_to_device(stream.as_raw_fd(), device)?;
229
230 stream.set_nonblocking(true)?;
232
233 let (sockaddr, socklen) = socket_addr_to_raw(addr);
234 let ret = unsafe {
235 libc::connect(
236 stream.as_raw_fd(),
237 &sockaddr as *const libc::sockaddr_storage as *const libc::sockaddr,
238 socklen,
239 )
240 };
241
242 if ret != 0 {
243 let err = io::Error::last_os_error();
244 if err.raw_os_error() != Some(libc::EINPROGRESS) {
245 return Err(err);
246 }
247 }
248
249 let mut pollfd = libc::pollfd {
251 fd: stream.as_raw_fd(),
252 events: libc::POLLOUT,
253 revents: 0,
254 };
255 let timeout_ms = timeout.as_millis() as libc::c_int;
256 let poll_ret = unsafe { libc::poll(&mut pollfd, 1, timeout_ms) };
257
258 if poll_ret == 0 {
259 return Err(io::Error::new(io::ErrorKind::TimedOut, "connect timed out"));
260 }
261 if poll_ret < 0 {
262 return Err(io::Error::last_os_error());
263 }
264
265 let mut err_val: libc::c_int = 0;
267 let mut err_len: libc::socklen_t = std::mem::size_of::<libc::c_int>() as libc::socklen_t;
268 let ret = unsafe {
269 libc::getsockopt(
270 stream.as_raw_fd(),
271 libc::SOL_SOCKET,
272 libc::SO_ERROR,
273 &mut err_val as *mut _ as *mut libc::c_void,
274 &mut err_len,
275 )
276 };
277 if ret != 0 {
278 return Err(io::Error::last_os_error());
279 }
280 if err_val != 0 {
281 return Err(io::Error::from_raw_os_error(err_val));
282 }
283
284 stream.set_nonblocking(false)?;
286
287 Ok(stream)
288}
289
290#[cfg(target_os = "linux")]
292fn socket_addr_to_raw(addr: &std::net::SocketAddr) -> (libc::sockaddr_storage, libc::socklen_t) {
293 let mut storage: libc::sockaddr_storage = unsafe { std::mem::zeroed() };
294 match addr {
295 std::net::SocketAddr::V4(v4) => {
296 let sin: &mut libc::sockaddr_in = unsafe {
297 &mut *(&mut storage as *mut libc::sockaddr_storage as *mut libc::sockaddr_in)
298 };
299 sin.sin_family = libc::AF_INET as libc::sa_family_t;
300 sin.sin_port = v4.port().to_be();
301 sin.sin_addr = libc::in_addr {
302 s_addr: u32::from_ne_bytes(v4.ip().octets()),
303 };
304 (
305 storage,
306 std::mem::size_of::<libc::sockaddr_in>() as libc::socklen_t,
307 )
308 }
309 std::net::SocketAddr::V6(v6) => {
310 let sin6: &mut libc::sockaddr_in6 = unsafe {
311 &mut *(&mut storage as *mut libc::sockaddr_storage as *mut libc::sockaddr_in6)
312 };
313 sin6.sin6_family = libc::AF_INET6 as libc::sa_family_t;
314 sin6.sin6_port = v6.port().to_be();
315 sin6.sin6_addr = libc::in6_addr {
316 s6_addr: v6.ip().octets(),
317 };
318 sin6.sin6_flowinfo = v6.flowinfo();
319 sin6.sin6_scope_id = v6.scope_id();
320 (
321 storage,
322 std::mem::size_of::<libc::sockaddr_in6>() as libc::socklen_t,
323 )
324 }
325 }
326}
327
328pub fn start(config: TcpClientConfig, tx: EventSender) -> io::Result<Box<dyn Writer>> {
330 let stream = try_connect(&config)?;
331 let reader_stream = stream.try_clone()?;
332 let writer_stream = stream.try_clone()?;
333
334 let id = config.interface_id;
335 let _ = tx.send(Event::InterfaceUp(id, None, None));
337
338 let reader_config = config;
340 let reader_tx = tx;
341 thread::Builder::new()
342 .name(format!("tcp-reader-{}", id.0))
343 .spawn(move || {
344 reader_loop(reader_stream, reader_config, reader_tx);
345 })?;
346
347 Ok(Box::new(TcpWriter {
348 stream: writer_stream,
349 }))
350}
351
352fn reader_loop(mut stream: TcpStream, config: TcpClientConfig, tx: EventSender) {
355 let id = config.interface_id;
356 let mut decoder = hdlc::Decoder::new();
357 let mut buf = [0u8; 4096];
358
359 loop {
360 match stream.read(&mut buf) {
361 Ok(0) => {
362 log::warn!("[{}] connection closed", config.name);
364 let _ = tx.send(Event::InterfaceDown(id));
365 match reconnect(&config, &tx) {
366 Some(new_stream) => {
367 stream = new_stream;
368 decoder = hdlc::Decoder::new();
369 continue;
370 }
371 None => {
372 log::error!("[{}] reconnection failed, giving up", config.name);
373 return;
374 }
375 }
376 }
377 Ok(n) => {
378 for frame in decoder.feed(&buf[..n]) {
379 if tx
380 .send(Event::Frame {
381 interface_id: id,
382 data: frame,
383 rssi: None,
384 snr: None,
385 })
386 .is_err()
387 {
388 return;
390 }
391 }
392 }
393 Err(e) => {
394 log::warn!("[{}] read error: {}", config.name, e);
395 let _ = tx.send(Event::InterfaceDown(id));
396 match reconnect(&config, &tx) {
397 Some(new_stream) => {
398 stream = new_stream;
399 decoder = hdlc::Decoder::new();
400 continue;
401 }
402 None => {
403 log::error!("[{}] reconnection failed, giving up", config.name);
404 return;
405 }
406 }
407 }
408 }
409 }
410}
411
412fn reconnect(config: &TcpClientConfig, tx: &EventSender) -> Option<TcpStream> {
415 let mut attempts = 0u32;
416 loop {
417 let runtime = lock_or_recover(&config.runtime, "tcp client runtime").clone();
418 thread::sleep(runtime.reconnect_wait);
419 attempts += 1;
420
421 if let Some(max) = runtime.max_reconnect_tries {
422 if attempts > max {
423 let _ = tx.send(Event::InterfaceDown(config.interface_id));
424 return None;
425 }
426 }
427
428 log::info!("[{}] reconnect attempt {} ...", config.name, attempts);
429
430 match try_connect(config) {
431 Ok(new_stream) => {
432 let writer_stream = match new_stream.try_clone() {
434 Ok(s) => s,
435 Err(e) => {
436 log::warn!("[{}] failed to clone stream: {}", config.name, e);
437 continue;
438 }
439 };
440 log::info!("[{}] reconnected", config.name);
441 let new_writer: Box<dyn Writer> = Box::new(TcpWriter {
443 stream: writer_stream,
444 });
445 let _ = tx.send(Event::InterfaceUp(
446 config.interface_id,
447 Some(new_writer),
448 None,
449 ));
450 return Some(new_stream);
451 }
452 Err(e) => {
453 log::warn!("[{}] reconnect failed: {}", config.name, e);
454 }
455 }
456 }
457}
458
459use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult};
462use rns_core::transport::types::InterfaceInfo;
463use std::collections::HashMap;
464
/// Factory that parses and starts interfaces of type `"TCPClientInterface"`.
pub struct TcpClientFactory;
467
468impl InterfaceFactory for TcpClientFactory {
469 fn type_name(&self) -> &str {
470 "TCPClientInterface"
471 }
472
473 fn parse_config(
474 &self,
475 name: &str,
476 id: InterfaceId,
477 params: &HashMap<String, String>,
478 ) -> Result<Box<dyn InterfaceConfigData>, String> {
479 let target_host = params
480 .get("target_host")
481 .cloned()
482 .unwrap_or_else(|| "127.0.0.1".into());
483 let target_port = params
484 .get("target_port")
485 .and_then(|v| v.parse().ok())
486 .unwrap_or(4242);
487
488 Ok(Box::new(TcpClientConfig {
489 name: name.to_string(),
490 target_host,
491 target_port,
492 interface_id: id,
493 device: params.get("device").cloned(),
494 ..TcpClientConfig::default()
495 }))
496 }
497
498 fn start(
499 &self,
500 config: Box<dyn InterfaceConfigData>,
501 ctx: StartContext,
502 ) -> io::Result<StartResult> {
503 let tcp_config = *config
504 .into_any()
505 .downcast::<TcpClientConfig>()
506 .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "wrong config type"))?;
507
508 let id = tcp_config.interface_id;
509 let name = tcp_config.name.clone();
510 let info = InterfaceInfo {
511 id,
512 name,
513 mode: ctx.mode,
514 out_capable: true,
515 in_capable: true,
516 bitrate: None,
517 airtime_profile: None,
518 announce_rate_target: None,
519 announce_rate_grace: 0,
520 announce_rate_penalty: 0.0,
521 announce_cap: rns_core::constants::ANNOUNCE_CAP,
522 is_local_client: false,
523 wants_tunnel: false,
524 tunnel_id: None,
525 mtu: 65535,
526 ingress_control: rns_core::transport::types::IngressControlConfig::enabled(),
527 ia_freq: 0.0,
528 ip_freq: 0.0,
529 op_freq: 0.0,
530 op_samples: 0,
531 started: crate::time::now(),
532 };
533
534 let writer = start(tcp_config, ctx.tx)?;
535
536 Ok(StartResult::Simple {
537 id,
538 info,
539 writer,
540 interface_type_name: "TCPClientInterface".to_string(),
541 })
542 }
543}
544
545pub(crate) fn tcp_client_runtime_handle_from_config(
546 config: &TcpClientConfig,
547) -> TcpClientRuntimeConfigHandle {
548 TcpClientRuntimeConfigHandle {
549 interface_name: config.name.clone(),
550 runtime: Arc::clone(&config.runtime),
551 startup: TcpClientRuntime::from_config(config),
552 }
553}
554
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::TcpListener;
    use std::time::Duration;

    /// Binds a listener to an OS-assigned loopback port and returns it with that port.
    ///
    /// Keeping the listener alive (instead of probing for a free port, dropping the
    /// probe and binding again) avoids another test grabbing the port in between.
    fn bind_listener() -> (TcpListener, u16) {
        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
        let port = listener.local_addr().unwrap().port();
        (listener, port)
    }

    /// Test configuration targeting `127.0.0.1:port` with short reconnect settings.
    fn make_config(port: u16) -> TcpClientConfig {
        let mut config = TcpClientConfig {
            name: format!("test-tcp-{}", port),
            target_host: "127.0.0.1".into(),
            target_port: port,
            interface_id: InterfaceId(1),
            reconnect_wait: Duration::from_millis(100),
            max_reconnect_tries: Some(2),
            connect_timeout: Duration::from_secs(2),
            device: None,
            ..TcpClientConfig::default()
        };
        // Keep the runtime snapshot in step with the fields set above.
        config.runtime = Arc::new(Mutex::new(TcpClientRuntime::from_config(&config)));
        config
    }

    #[test]
    fn connect_to_listener() {
        let (listener, port) = bind_listener();
        let (tx, rx) = crate::event::channel();

        let config = make_config(port);
        let _writer = start(config, tx).unwrap();

        let _server_stream = listener.accept().unwrap();

        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(matches!(event, Event::InterfaceUp(InterfaceId(1), _, _)));
    }

    #[test]
    fn receive_frame() {
        let (listener, port) = bind_listener();
        let (tx, rx) = crate::event::channel();

        let config = make_config(port);
        let _writer = start(config, tx).unwrap();

        let (mut server_stream, _) = listener.accept().unwrap();

        // Discard the initial InterfaceUp event.
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        let payload: Vec<u8> = (0..32).collect();
        let framed = hdlc::frame(&payload);
        server_stream.write_all(&framed).unwrap();

        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        match event {
            Event::Frame {
                interface_id,
                data,
                rssi: _,
                snr: _,
            } => {
                assert_eq!(interface_id, InterfaceId(1));
                assert_eq!(data, payload);
            }
            other => panic!("expected Frame, got {:?}", other),
        }
    }

    #[test]
    fn send_frame() {
        let (listener, port) = bind_listener();
        let (tx, _rx) = crate::event::channel();

        let config = make_config(port);
        let mut writer = start(config, tx).unwrap();

        let (mut server_stream, _) = listener.accept().unwrap();
        server_stream
            .set_read_timeout(Some(Duration::from_secs(2)))
            .unwrap();

        let payload: Vec<u8> = (0..24).collect();
        writer.send_frame(&payload).unwrap();

        let mut buf = [0u8; 256];
        let n = server_stream.read(&mut buf).unwrap();
        let expected = hdlc::frame(&payload);
        assert_eq!(&buf[..n], &expected[..]);
    }

    #[test]
    fn multiple_frames() {
        let (listener, port) = bind_listener();
        let (tx, rx) = crate::event::channel();

        let config = make_config(port);
        let _writer = start(config, tx).unwrap();

        let (mut server_stream, _) = listener.accept().unwrap();

        // Discard the initial InterfaceUp event.
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        let payloads: Vec<Vec<u8>> = (0..3)
            .map(|i| (0..24).map(|j| j + i * 50).collect())
            .collect();
        for p in &payloads {
            server_stream.write_all(&hdlc::frame(p)).unwrap();
        }

        for expected in &payloads {
            let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
            match event {
                Event::Frame { data, .. } => assert_eq!(&data, expected),
                other => panic!("expected Frame, got {:?}", other),
            }
        }
    }

    #[test]
    fn split_across_reads() {
        let (listener, port) = bind_listener();
        let (tx, rx) = crate::event::channel();

        let config = make_config(port);
        let _writer = start(config, tx).unwrap();

        let (mut server_stream, _) = listener.accept().unwrap();

        // Discard the initial InterfaceUp event.
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        let payload: Vec<u8> = (0..32).collect();
        let framed = hdlc::frame(&payload);
        let mid = framed.len() / 2;

        // Send the frame in two halves with a pause so the reader sees two reads.
        server_stream.write_all(&framed[..mid]).unwrap();
        server_stream.flush().unwrap();
        thread::sleep(Duration::from_millis(50));
        server_stream.write_all(&framed[mid..]).unwrap();
        server_stream.flush().unwrap();

        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        match event {
            Event::Frame { data, .. } => assert_eq!(data, payload),
            other => panic!("expected Frame, got {:?}", other),
        }
    }

    #[test]
    fn reconnect_on_close() {
        let (listener, port) = bind_listener();
        listener.set_nonblocking(false).unwrap();
        let (tx, rx) = crate::event::channel();

        let config = make_config(port);
        let _writer = start(config, tx).unwrap();

        let (server_stream, _) = listener.accept().unwrap();

        // Discard the initial InterfaceUp event.
        let _ = rx.recv_timeout(Duration::from_secs(1)).unwrap();

        // Closing the server side must be reported as the interface going down.
        drop(server_stream);

        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(matches!(event, Event::InterfaceDown(InterfaceId(1))));

        let _server_stream2 = listener.accept().unwrap();

        let event = rx.recv_timeout(Duration::from_secs(2)).unwrap();
        assert!(matches!(event, Event::InterfaceUp(InterfaceId(1), _, _)));
    }

    #[test]
    fn socket_options() {
        let (listener, port) = bind_listener();

        let stream = try_connect(&make_config(port)).unwrap();
        let _server = listener.accept().unwrap();

        let fd = stream.as_raw_fd();
        let mut val: libc::c_int = 0;
        let mut len: libc::socklen_t = std::mem::size_of::<libc::c_int>() as libc::socklen_t;
        // SAFETY: both out-pointers refer to live locals and `len` holds the size of `val`.
        let ret = unsafe {
            libc::getsockopt(
                fd,
                libc::IPPROTO_TCP,
                libc::TCP_NODELAY,
                &mut val as *mut _ as *mut libc::c_void,
                &mut len,
            )
        };
        assert_eq!(ret, 0, "getsockopt(TCP_NODELAY) should succeed");
        assert_eq!(val, 1, "TCP_NODELAY should be 1");
    }

    #[test]
    fn connect_timeout() {
        let mut config = TcpClientConfig {
            name: "timeout-test".into(),
            target_host: "192.0.2.1".into(),
            target_port: 12345,
            interface_id: InterfaceId(99),
            reconnect_wait: Duration::from_millis(100),
            max_reconnect_tries: Some(0),
            connect_timeout: Duration::from_millis(500),
            device: None,
            ..TcpClientConfig::default()
        };
        // Keep the runtime snapshot in step with the fields set above.
        config.runtime = Arc::new(Mutex::new(TcpClientRuntime::from_config(&config)));

        let start_time = std::time::Instant::now();
        let result = try_connect(&config);
        let elapsed = start_time.elapsed();

        assert!(result.is_err());
        assert!(elapsed < Duration::from_secs(5));
    }
}