1mod ip_echo_client;
3mod ip_echo_server;
4pub mod sockets;
5
6#[cfg(feature = "dev-context-only-utils")]
7pub mod tooling_for_tests;
8
9pub use ip_echo_server::{
10 ip_echo_server, IpEchoServer, DEFAULT_IP_ECHO_SERVER_THREADS, MAX_PORT_COUNT_PER_MESSAGE,
11 MINIMUM_IP_ECHO_SERVER_THREADS,
12};
13#[cfg(feature = "dev-context-only-utils")]
14use tokio::net::UdpSocket as TokioUdpSocket;
15use {
16 ip_echo_client::{ip_echo_server_request, ip_echo_server_request_with_binding},
17 ip_echo_server::IpEchoServerMessage,
18 log::*,
19 rand::{thread_rng, Rng},
20 socket2::{Domain, SockAddr, Socket, Type},
21 std::{
22 io::{self},
23 net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, ToSocketAddrs, UdpSocket},
24 },
25 url::Url,
26};
27
28pub struct UdpSocketPair {
30 pub addr: SocketAddr, pub receiver: UdpSocket, pub sender: UdpSocket, }
34
35pub type PortRange = (u16, u16);
36
37pub const VALIDATOR_PORT_RANGE: PortRange = (8000, 10_000);
38pub const MINIMUM_VALIDATOR_PORT_RANGE_WIDTH: u16 = 17; pub(crate) const HEADER_LENGTH: usize = 4;
41pub(crate) const IP_ECHO_SERVER_RESPONSE_LENGTH: usize = HEADER_LENGTH + 23;
42
43pub fn get_public_ip_addr(ip_echo_server_addr: &SocketAddr) -> Result<IpAddr, String> {
46 let fut = ip_echo_server_request(*ip_echo_server_addr, IpEchoServerMessage::default());
47 let rt = tokio::runtime::Builder::new_current_thread()
48 .enable_all()
49 .build()
50 .map_err(|e| e.to_string())?;
51 let resp = rt.block_on(fut).map_err(|e| e.to_string())?;
52 Ok(resp.address)
53}
54
55pub fn get_public_ip_addr_with_binding(
58 ip_echo_server_addr: &SocketAddr,
59 bind_address: IpAddr,
60) -> anyhow::Result<IpAddr> {
61 let fut = ip_echo_server_request_with_binding(
62 *ip_echo_server_addr,
63 IpEchoServerMessage::default(),
64 bind_address,
65 );
66 let rt = tokio::runtime::Builder::new_current_thread()
67 .enable_all()
68 .build()?;
69 let resp = rt.block_on(fut)?;
70 Ok(resp.address)
71}
72
73pub fn get_cluster_shred_version(ip_echo_server_addr: &SocketAddr) -> Result<u16, String> {
75 let fut = ip_echo_server_request(*ip_echo_server_addr, IpEchoServerMessage::default());
76 let rt = tokio::runtime::Builder::new_current_thread()
77 .enable_all()
78 .build()
79 .map_err(|e| e.to_string())?;
80 let resp = rt.block_on(fut).map_err(|e| e.to_string())?;
81 resp.shred_version
82 .ok_or_else(|| "IP echo server does not return a shred-version".to_owned())
83}
84
85pub fn get_cluster_shred_version_with_binding(
88 ip_echo_server_addr: &SocketAddr,
89 bind_address: IpAddr,
90) -> anyhow::Result<u16> {
91 let fut = ip_echo_server_request_with_binding(
92 *ip_echo_server_addr,
93 IpEchoServerMessage::default(),
94 bind_address,
95 );
96 let rt = tokio::runtime::Builder::new_current_thread()
97 .enable_all()
98 .build()?;
99 let resp = rt.block_on(fut)?;
100 resp.shred_version
101 .ok_or_else(|| anyhow::anyhow!("IP echo server does not return a shred-version"))
102}
103
104const MAX_PORT_VERIFY_THREADS: usize = 64;
107
108#[deprecated(
113 since = "2.2.0",
114 note = "use `verify_all_reachable_udp` and `verify_all_reachable_tcp` instead"
115)]
116pub fn verify_reachable_ports(
117 ip_echo_server_addr: &SocketAddr,
118 tcp_listeners: Vec<(u16, TcpListener)>,
119 udp_sockets: &[&UdpSocket],
120) -> bool {
121 verify_all_reachable_tcp(
122 ip_echo_server_addr,
123 tcp_listeners.into_iter().map(|(_, l)| l).collect(),
124 ) && verify_all_reachable_udp(ip_echo_server_addr, udp_sockets)
125}
126
127pub fn verify_all_reachable_udp(
132 ip_echo_server_addr: &SocketAddr,
133 udp_sockets: &[&UdpSocket],
134) -> bool {
135 let rt = tokio::runtime::Builder::new_current_thread()
136 .enable_all()
137 .max_blocking_threads(MAX_PORT_VERIFY_THREADS)
138 .build()
139 .expect("Tokio builder should be able to reliably create a current thread runtime");
140 let fut = ip_echo_client::verify_all_reachable_udp(
141 *ip_echo_server_addr,
142 udp_sockets,
143 ip_echo_client::TIMEOUT,
144 ip_echo_client::DEFAULT_RETRY_COUNT,
145 );
146 rt.block_on(fut)
147}
148
149pub fn verify_all_reachable_tcp(
154 ip_echo_server_addr: &SocketAddr,
155 tcp_listeners: Vec<TcpListener>,
156) -> bool {
157 let rt = tokio::runtime::Builder::new_current_thread()
158 .enable_all()
159 .max_blocking_threads(MAX_PORT_VERIFY_THREADS)
160 .build()
161 .expect("Tokio builder should be able to reliably create a current thread runtime");
162 let fut = ip_echo_client::verify_all_reachable_tcp(
163 *ip_echo_server_addr,
164 tcp_listeners,
165 ip_echo_client::TIMEOUT,
166 );
167 rt.block_on(fut)
168}
169
170pub fn parse_port_or_addr(optstr: Option<&str>, default_addr: SocketAddr) -> SocketAddr {
171 if let Some(addrstr) = optstr {
172 if let Ok(port) = addrstr.parse() {
173 let mut addr = default_addr;
174 addr.set_port(port);
175 addr
176 } else if let Ok(addr) = addrstr.parse() {
177 addr
178 } else {
179 default_addr
180 }
181 } else {
182 default_addr
183 }
184}
185
186pub fn parse_port_range(port_range: &str) -> Option<PortRange> {
187 let ports: Vec<&str> = port_range.split('-').collect();
188 if ports.len() != 2 {
189 return None;
190 }
191
192 let start_port = ports[0].parse();
193 let end_port = ports[1].parse();
194
195 if start_port.is_err() || end_port.is_err() {
196 return None;
197 }
198 let start_port = start_port.unwrap();
199 let end_port = end_port.unwrap();
200 if end_port < start_port {
201 return None;
202 }
203 Some((start_port, end_port))
204}
205
206pub fn parse_host(host: &str) -> Result<IpAddr, String> {
207 let parsed_url = Url::parse(&format!("http://{host}")).map_err(|e| e.to_string())?;
210 if parsed_url.port().is_some() {
211 return Err(format!("Expected port in URL: {host}"));
212 }
213
214 let ips: Vec<_> = (host, 0)
216 .to_socket_addrs()
217 .map_err(|err| err.to_string())?
218 .map(|socket_address| socket_address.ip())
219 .collect();
220 if ips.is_empty() {
221 Err(format!("Unable to resolve host: {host}"))
222 } else {
223 Ok(ips[0])
224 }
225}
226
227pub fn is_host(string: String) -> Result<(), String> {
228 parse_host(&string).map(|_| ())
229}
230
231pub fn parse_host_port(host_port: &str) -> Result<SocketAddr, String> {
232 let addrs: Vec<_> = host_port
233 .to_socket_addrs()
234 .map_err(|err| format!("Unable to resolve host {host_port}: {err}"))?
235 .collect();
236 if addrs.is_empty() {
237 Err(format!("Unable to resolve host: {host_port}"))
238 } else {
239 Ok(addrs[0])
240 }
241}
242
243pub fn is_host_port(string: String) -> Result<(), String> {
244 parse_host_port(&string).map(|_| ())
245}
246
247#[derive(Clone, Copy, Debug, Default)]
248pub struct SocketConfig {
249 reuseport: bool,
250 recv_buffer_size: Option<usize>,
251 send_buffer_size: Option<usize>,
252}
253
254impl SocketConfig {
255 pub fn reuseport(mut self, reuseport: bool) -> Self {
256 self.reuseport = reuseport;
257 self
258 }
259
260 pub fn recv_buffer_size(mut self, size: usize) -> Self {
267 self.recv_buffer_size = Some(size);
268 self
269 }
270
271 pub fn send_buffer_size(mut self, size: usize) -> Self {
278 self.send_buffer_size = Some(size);
279 self
280 }
281}
282
283#[cfg(any(windows, target_os = "ios"))]
284fn udp_socket_with_config(_config: SocketConfig) -> io::Result<Socket> {
285 let sock = Socket::new(Domain::IPV4, Type::DGRAM, None)?;
286 Ok(sock)
287}
288
289#[cfg(not(any(windows, target_os = "ios")))]
290fn udp_socket_with_config(config: SocketConfig) -> io::Result<Socket> {
291 use nix::sys::socket::{setsockopt, sockopt::ReusePort};
292 let SocketConfig {
293 reuseport,
294 recv_buffer_size,
295 send_buffer_size,
296 } = config;
297
298 let sock = Socket::new(Domain::IPV4, Type::DGRAM, None)?;
299
300 if let Some(recv_buffer_size) = recv_buffer_size {
302 sock.set_recv_buffer_size(recv_buffer_size)?;
303 }
304
305 if let Some(send_buffer_size) = send_buffer_size {
306 sock.set_send_buffer_size(send_buffer_size)?;
307 }
308
309 if reuseport {
310 setsockopt(&sock, ReusePort, &true).ok();
311 }
312
313 Ok(sock)
314}
315
316pub fn bind_common_in_range_with_config(
318 ip_addr: IpAddr,
319 range: PortRange,
320 config: SocketConfig,
321) -> io::Result<(u16, (UdpSocket, TcpListener))> {
322 for port in range.0..range.1 {
323 if let Ok((sock, listener)) = bind_common_with_config(ip_addr, port, config) {
324 return Result::Ok((sock.local_addr().unwrap().port(), (sock, listener)));
325 }
326 }
327
328 Err(io::Error::other(format!(
329 "No available TCP/UDP ports in {range:?}"
330 )))
331}
332
333#[deprecated(
335 since = "2.2.0",
336 note = "use `bind_common_in_range_with_config` instead"
337)]
338pub fn bind_common_in_range(
339 ip_addr: IpAddr,
340 range: PortRange,
341) -> io::Result<(u16, (UdpSocket, TcpListener))> {
342 bind_common_in_range_with_config(ip_addr, range, SocketConfig::default())
343}
344
345pub fn bind_in_range(ip_addr: IpAddr, range: PortRange) -> io::Result<(u16, UdpSocket)> {
346 let config = SocketConfig::default();
347 bind_in_range_with_config(ip_addr, range, config)
348}
349
350pub fn bind_in_range_with_config(
351 ip_addr: IpAddr,
352 range: PortRange,
353 config: SocketConfig,
354) -> io::Result<(u16, UdpSocket)> {
355 let sock = udp_socket_with_config(config)?;
356
357 for port in range.0..range.1 {
358 let addr = SocketAddr::new(ip_addr, port);
359
360 if sock.bind(&SockAddr::from(addr)).is_ok() {
361 let sock: UdpSocket = sock.into();
362 return Result::Ok((sock.local_addr().unwrap().port(), sock));
363 }
364 }
365
366 Err(io::Error::other(format!(
367 "No available UDP ports in {range:?}"
368 )))
369}
370
371pub fn bind_with_any_port_with_config(
372 ip_addr: IpAddr,
373 config: SocketConfig,
374) -> io::Result<UdpSocket> {
375 let sock = udp_socket_with_config(config)?;
376 let addr = SocketAddr::new(ip_addr, 0);
377 match sock.bind(&SockAddr::from(addr)) {
378 Ok(_) => Result::Ok(sock.into()),
379 Err(err) => Err(io::Error::other(format!("No available UDP port: {err}"))),
380 }
381}
382
383#[deprecated(since = "2.2.0", note = "use `bind_with_any_port_with_config` instead")]
384pub fn bind_with_any_port(ip_addr: IpAddr) -> io::Result<UdpSocket> {
385 bind_with_any_port_with_config(ip_addr, SocketConfig::default())
386}
387
388pub fn multi_bind_in_range_with_config(
390 ip_addr: IpAddr,
391 range: PortRange,
392 config: SocketConfig,
393 mut num: usize,
394) -> io::Result<(u16, Vec<UdpSocket>)> {
395 if !config.reuseport {
396 return Err(io::Error::new(
397 io::ErrorKind::InvalidInput,
398 "SocketConfig.reuseport must be true for multi_bind_in_range_with_config",
399 ));
400 }
401 if cfg!(windows) && num != 1 {
402 warn!(
404 "multi_bind_in_range_with_config() only supports 1 socket in windows ({} requested)",
405 num
406 );
407 num = 1;
408 }
409 let mut sockets = Vec::with_capacity(num);
410
411 const NUM_TRIES: usize = 100;
412 let mut port = 0;
413 let mut error = None;
414 for _ in 0..NUM_TRIES {
415 port = {
416 let (port, _) = bind_in_range(ip_addr, range)?;
417 port
418 }; for _ in 0..num {
421 let sock = bind_to_with_config(ip_addr, port, config);
422 if let Ok(sock) = sock {
423 sockets.push(sock);
424 } else {
425 error = Some(sock);
426 break;
427 }
428 }
429 if sockets.len() == num {
430 break;
431 } else {
432 sockets.clear();
433 }
434 }
435 if sockets.len() != num {
436 error.unwrap()?;
437 }
438 Ok((port, sockets))
439}
440
441#[deprecated(
444 since = "2.2.0",
445 note = "use `multi_bind_in_range_with_config` instead"
446)]
447#[allow(unused_mut)]
448pub fn multi_bind_in_range(
449 ip_addr: IpAddr,
450 range: PortRange,
451 mut num: usize,
452) -> io::Result<(u16, Vec<UdpSocket>)> {
453 let config = SocketConfig::default().reuseport(true);
454 multi_bind_in_range_with_config(ip_addr, range, config, num)
455}
456
457pub fn bind_to(ip_addr: IpAddr, port: u16, reuseport: bool) -> io::Result<UdpSocket> {
458 let config = SocketConfig::default().reuseport(reuseport);
459 bind_to_with_config(ip_addr, port, config)
460}
461
462#[cfg(feature = "dev-context-only-utils")]
463pub async fn bind_to_async(
464 ip_addr: IpAddr,
465 port: u16,
466 reuseport: bool,
467) -> io::Result<TokioUdpSocket> {
468 let config = SocketConfig::default().reuseport(reuseport);
469 let socket = bind_to_with_config_non_blocking(ip_addr, port, config)?;
470 TokioUdpSocket::from_std(socket)
471}
472
473pub fn bind_to_localhost() -> io::Result<UdpSocket> {
474 bind_to(
475 IpAddr::V4(Ipv4Addr::LOCALHOST),
476 0,
477 false,
478 )
479}
480
481#[cfg(feature = "dev-context-only-utils")]
482pub async fn bind_to_localhost_async() -> io::Result<TokioUdpSocket> {
483 bind_to_async(
484 IpAddr::V4(Ipv4Addr::LOCALHOST),
485 0,
486 false,
487 )
488 .await
489}
490
491pub fn bind_to_unspecified() -> io::Result<UdpSocket> {
492 bind_to(
493 IpAddr::V4(Ipv4Addr::UNSPECIFIED),
494 0,
495 false,
496 )
497}
498
499#[cfg(feature = "dev-context-only-utils")]
500pub async fn bind_to_unspecified_async() -> io::Result<TokioUdpSocket> {
501 bind_to_async(
502 IpAddr::V4(Ipv4Addr::UNSPECIFIED),
503 0,
504 false,
505 )
506 .await
507}
508
509pub fn bind_to_with_config(
510 ip_addr: IpAddr,
511 port: u16,
512 config: SocketConfig,
513) -> io::Result<UdpSocket> {
514 let sock = udp_socket_with_config(config)?;
515
516 let addr = SocketAddr::new(ip_addr, port);
517
518 sock.bind(&SockAddr::from(addr)).map(|_| sock.into())
519}
520
521pub fn bind_to_with_config_non_blocking(
522 ip_addr: IpAddr,
523 port: u16,
524 config: SocketConfig,
525) -> io::Result<UdpSocket> {
526 let sock = udp_socket_with_config(config)?;
527
528 let addr = SocketAddr::new(ip_addr, port);
529
530 sock.bind(&SockAddr::from(addr))?;
531 sock.set_nonblocking(true)?;
532 Ok(sock.into())
533}
534
535pub fn bind_common(ip_addr: IpAddr, port: u16) -> io::Result<(UdpSocket, TcpListener)> {
537 let config = SocketConfig::default();
538 bind_common_with_config(ip_addr, port, config)
539}
540
541pub fn bind_common_with_config(
543 ip_addr: IpAddr,
544 port: u16,
545 config: SocketConfig,
546) -> io::Result<(UdpSocket, TcpListener)> {
547 let sock = udp_socket_with_config(config)?;
548
549 let addr = SocketAddr::new(ip_addr, port);
550 let sock_addr = SockAddr::from(addr);
551 sock.bind(&sock_addr)
552 .and_then(|_| TcpListener::bind(addr).map(|listener| (sock.into(), listener)))
553}
554
555pub fn bind_two_in_range_with_offset(
556 ip_addr: IpAddr,
557 range: PortRange,
558 offset: u16,
559) -> io::Result<((u16, UdpSocket), (u16, UdpSocket))> {
560 let sock1_config = SocketConfig::default();
561 let sock2_config = SocketConfig::default();
562 bind_two_in_range_with_offset_and_config(ip_addr, range, offset, sock1_config, sock2_config)
563}
564
565pub fn bind_two_in_range_with_offset_and_config(
566 ip_addr: IpAddr,
567 range: PortRange,
568 offset: u16,
569 sock1_config: SocketConfig,
570 sock2_config: SocketConfig,
571) -> io::Result<((u16, UdpSocket), (u16, UdpSocket))> {
572 if range.1.saturating_sub(range.0) < offset {
573 return Err(io::Error::other(
574 "range too small to find two ports with the correct offset".to_string(),
575 ));
576 }
577 for port in range.0..range.1 {
578 if let Ok(first_bind) = bind_to_with_config(ip_addr, port, sock1_config) {
579 if range.1.saturating_sub(port) >= offset {
580 if let Ok(second_bind) =
581 bind_to_with_config(ip_addr, port.saturating_add(offset), sock2_config)
582 {
583 return Ok((
584 (first_bind.local_addr().unwrap().port(), first_bind),
585 (second_bind.local_addr().unwrap().port(), second_bind),
586 ));
587 }
588 } else {
589 break;
590 }
591 }
592 }
593 Err(io::Error::other(
594 "couldn't find two ports with the correct offset in range".to_string(),
595 ))
596}
597
598pub fn find_available_port_in_range(ip_addr: IpAddr, range: PortRange) -> io::Result<u16> {
605 let [port] = find_available_ports_in_range(ip_addr, range)?;
606 Ok(port)
607}
608
609pub fn find_available_ports_in_range<const N: usize>(
614 ip_addr: IpAddr,
615 range: PortRange,
616) -> io::Result<[u16; N]> {
617 let mut result = [0u16; N];
618 let range = range.0..range.1;
619 let mut next_port_to_try = range
620 .clone()
621 .cycle() .skip(thread_rng().gen_range(range.clone()) as usize) .take(range.len()) .peekable();
625 let mut num = 0;
626 while num < N {
627 let port_to_try = next_port_to_try.next().unwrap(); match bind_common(ip_addr, port_to_try) {
629 Ok(_) => {
630 result[num] = port_to_try;
631 num = num.saturating_add(1);
632 }
633 Err(err) => {
634 if next_port_to_try.peek().is_none() {
635 return Err(err);
636 }
637 }
638 }
639 }
640 Ok(result)
641}
642
643pub fn bind_more_with_config(
644 socket: UdpSocket,
645 num: usize,
646 config: SocketConfig,
647) -> io::Result<Vec<UdpSocket>> {
648 let addr = socket.local_addr().unwrap();
649 let ip = addr.ip();
650 let port = addr.port();
651 std::iter::once(Ok(socket))
652 .chain((1..num).map(|_| bind_to_with_config(ip, port, config)))
653 .collect()
654}
655
656#[cfg(test)]
657mod tests {
658 use {
659 super::*,
660 ip_echo_server::IpEchoServerResponse,
661 itertools::Itertools,
662 std::{net::Ipv4Addr, time::Duration},
663 tokio::runtime::Runtime,
664 };
665
666 fn runtime() -> Runtime {
667 tokio::runtime::Builder::new_current_thread()
668 .enable_all()
669 .build()
670 .expect("Can not create a runtime")
671 }
672 #[test]
673 fn test_response_length() {
674 let resp = IpEchoServerResponse {
675 address: IpAddr::from([u16::MAX; 8]), shred_version: Some(u16::MAX),
677 };
678 let resp_size = bincode::serialized_size(&resp).unwrap();
679 assert_eq!(
680 IP_ECHO_SERVER_RESPONSE_LENGTH,
681 HEADER_LENGTH + resp_size as usize
682 );
683 }
684
685 #[test]
687 fn test_backward_compat() {
688 let address = IpAddr::from([
689 525u16, 524u16, 523u16, 522u16, 521u16, 520u16, 519u16, 518u16,
690 ]);
691 let response = IpEchoServerResponse {
692 address,
693 shred_version: Some(42),
694 };
695 let mut data = vec![0u8; IP_ECHO_SERVER_RESPONSE_LENGTH];
696 bincode::serialize_into(&mut data[HEADER_LENGTH..], &response).unwrap();
697 data.truncate(HEADER_LENGTH + 20);
698 assert_eq!(
699 bincode::deserialize::<IpAddr>(&data[HEADER_LENGTH..]).unwrap(),
700 address
701 );
702 }
703
704 #[test]
706 fn test_forward_compat() {
707 let address = IpAddr::from([
708 525u16, 524u16, 523u16, 522u16, 521u16, 520u16, 519u16, 518u16,
709 ]);
710 let mut data = [0u8; IP_ECHO_SERVER_RESPONSE_LENGTH];
711 bincode::serialize_into(&mut data[HEADER_LENGTH..], &address).unwrap();
712 let response: Result<IpEchoServerResponse, _> =
713 bincode::deserialize(&data[HEADER_LENGTH..]);
714 assert_eq!(
715 response.unwrap(),
716 IpEchoServerResponse {
717 address,
718 shred_version: None,
719 }
720 );
721 }
722
723 #[test]
724 fn test_parse_port_or_addr() {
725 let p1 = parse_port_or_addr(Some("9000"), SocketAddr::from(([1, 2, 3, 4], 1)));
726 assert_eq!(p1.port(), 9000);
727 let p2 = parse_port_or_addr(Some("127.0.0.1:7000"), SocketAddr::from(([1, 2, 3, 4], 1)));
728 assert_eq!(p2.port(), 7000);
729 let p2 = parse_port_or_addr(Some("hi there"), SocketAddr::from(([1, 2, 3, 4], 1)));
730 assert_eq!(p2.port(), 1);
731 let p3 = parse_port_or_addr(None, SocketAddr::from(([1, 2, 3, 4], 1)));
732 assert_eq!(p3.port(), 1);
733 }
734
735 #[test]
736 fn test_parse_port_range() {
737 assert_eq!(parse_port_range("garbage"), None);
738 assert_eq!(parse_port_range("1-"), None);
739 assert_eq!(parse_port_range("1-2"), Some((1, 2)));
740 assert_eq!(parse_port_range("1-2-3"), None);
741 assert_eq!(parse_port_range("2-1"), None);
742 }
743
744 #[test]
745 fn test_parse_host() {
746 parse_host("localhost:1234").unwrap_err();
747 parse_host("localhost").unwrap();
748 parse_host("127.0.0.0:1234").unwrap_err();
749 parse_host("127.0.0.0").unwrap();
750 }
751
752 #[test]
753 fn test_parse_host_port() {
754 parse_host_port("localhost:1234").unwrap();
755 parse_host_port("localhost").unwrap_err();
756 parse_host_port("127.0.0.0:1234").unwrap();
757 parse_host_port("127.0.0.0").unwrap_err();
758 }
759
760 #[test]
761 fn test_is_host_port() {
762 assert!(is_host_port("localhost:1234".to_string()).is_ok());
763 assert!(is_host_port("localhost".to_string()).is_err());
764 }
765
766 #[test]
767 fn test_bind() {
768 let (pr_s, pr_e) = sockets::localhost_port_range_for_tests();
769 let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
770 let s = bind_in_range(ip_addr, (pr_s, pr_e)).unwrap();
771 assert_eq!(s.0, pr_s, "bind_in_range should use first available port");
772 let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
773 let config = SocketConfig::default().reuseport(true);
774 let x = bind_to_with_config(ip_addr, pr_s + 1, config).unwrap();
775 let y = bind_to_with_config(ip_addr, pr_s + 1, config).unwrap();
776 assert_eq!(
777 x.local_addr().unwrap().port(),
778 y.local_addr().unwrap().port()
779 );
780 bind_to(ip_addr, pr_s, false).unwrap_err();
781 bind_in_range(ip_addr, (pr_s, pr_s + 2)).unwrap_err();
782
783 let (port, v) =
784 multi_bind_in_range_with_config(ip_addr, (pr_s + 5, pr_e), config, 10).unwrap();
785 for sock in &v {
786 assert_eq!(port, sock.local_addr().unwrap().port());
787 }
788 }
789
790 #[test]
791 fn test_bind_with_any_port() {
792 let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
793 let config = SocketConfig::default();
794 let x = bind_with_any_port_with_config(ip_addr, config).unwrap();
795 let y = bind_with_any_port_with_config(ip_addr, config).unwrap();
796 assert_ne!(
797 x.local_addr().unwrap().port(),
798 y.local_addr().unwrap().port()
799 );
800 }
801
802 #[test]
803 fn test_bind_in_range_nil() {
804 let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
805 bind_in_range(ip_addr, (2000, 2000)).unwrap_err();
806 bind_in_range(ip_addr, (2000, 1999)).unwrap_err();
807 }
808
809 #[test]
810 fn test_find_available_port_in_range() {
811 let ip_addr = IpAddr::V4(Ipv4Addr::LOCALHOST);
812 let (pr_s, pr_e) = sockets::localhost_port_range_for_tests();
813 assert_eq!(
814 find_available_port_in_range(ip_addr, (pr_s, pr_s + 1)).unwrap(),
815 pr_s
816 );
817 let port = find_available_port_in_range(ip_addr, (pr_s, pr_e)).unwrap();
818 assert!((pr_s..pr_e).contains(&port));
819
820 let _socket = bind_to(ip_addr, port, false).unwrap();
821 find_available_port_in_range(ip_addr, (port, port + 1)).unwrap_err();
822 }
823
824 #[test]
825 fn test_find_available_ports_in_range() {
826 let ip_addr = IpAddr::V4(Ipv4Addr::LOCALHOST);
827 let port_range = sockets::localhost_port_range_for_tests();
828 assert!(port_range.1 - port_range.0 > 16);
829 let sock = bind_to_with_config(ip_addr, port_range.0 + 2, SocketConfig::default()).unwrap();
831 let ports: [u16; 15] = find_available_ports_in_range(ip_addr, port_range).unwrap();
832 let mut ports_vec = Vec::from(ports);
833 ports_vec.push(sock.local_addr().unwrap().port());
834 let res: Vec<_> = ports_vec.into_iter().unique().collect();
835 assert_eq!(res.len(), 16, "Should reserve 16 unique ports");
836 }
837
838 #[test]
839 fn test_bind_common_in_range() {
840 let ip_addr = IpAddr::V4(Ipv4Addr::LOCALHOST);
841 let (pr_s, pr_e) = sockets::localhost_port_range_for_tests();
842 let config = SocketConfig::default();
843 let (port, _sockets) =
844 bind_common_in_range_with_config(ip_addr, (pr_s, pr_e), config).unwrap();
845 assert!((pr_s..pr_e).contains(&port));
846
847 bind_common_in_range_with_config(ip_addr, (port, port + 1), config).unwrap_err();
848 }
849
850 #[test]
851 fn test_get_public_ip_addr_none() {
852 solana_logger::setup();
853 let ip_addr = IpAddr::V4(Ipv4Addr::LOCALHOST);
854 let (pr_s, pr_e) = sockets::localhost_port_range_for_tests();
855 let config = SocketConfig::default();
856 let (_server_port, (server_udp_socket, server_tcp_listener)) =
857 bind_common_in_range_with_config(ip_addr, (pr_s, pr_e), config).unwrap();
858
859 let _runtime = ip_echo_server(
860 server_tcp_listener,
861 DEFAULT_IP_ECHO_SERVER_THREADS,
862 Some(42),
863 );
864
865 let server_ip_echo_addr = server_udp_socket.local_addr().unwrap();
866 assert_eq!(
867 get_public_ip_addr(&server_ip_echo_addr).unwrap(),
868 parse_host("127.0.0.1").unwrap(),
869 );
870 assert_eq!(get_cluster_shred_version(&server_ip_echo_addr).unwrap(), 42);
871 assert!(verify_all_reachable_tcp(&server_ip_echo_addr, vec![],));
872 assert!(verify_all_reachable_udp(&server_ip_echo_addr, &[],));
873 }
874
875 #[test]
876 fn test_get_public_ip_addr_reachable() {
877 solana_logger::setup();
878 let ip_addr = IpAddr::V4(Ipv4Addr::LOCALHOST);
879 let port_range = sockets::localhost_port_range_for_tests();
880 let config = SocketConfig::default();
881 let (_server_port, (server_udp_socket, server_tcp_listener)) =
882 bind_common_in_range_with_config(ip_addr, port_range, config).unwrap();
883 let (_client_port, (client_udp_socket, client_tcp_listener)) =
884 bind_common_in_range_with_config(ip_addr, port_range, config).unwrap();
885
886 let _runtime = ip_echo_server(
887 server_tcp_listener,
888 DEFAULT_IP_ECHO_SERVER_THREADS,
889 Some(65535),
890 );
891
892 let ip_echo_server_addr = server_udp_socket.local_addr().unwrap();
893 assert_eq!(
894 get_public_ip_addr(&ip_echo_server_addr).unwrap(),
895 parse_host("127.0.0.1").unwrap(),
896 );
897 assert_eq!(
898 get_cluster_shred_version(&ip_echo_server_addr).unwrap(),
899 65535
900 );
901 assert!(verify_all_reachable_tcp(
902 &ip_echo_server_addr,
903 vec![client_tcp_listener],
904 ));
905 assert!(verify_all_reachable_udp(
906 &ip_echo_server_addr,
907 &[&client_udp_socket],
908 ));
909 }
910
911 #[test]
912 fn test_verify_ports_tcp_unreachable() {
913 solana_logger::setup();
914 let ip_addr = IpAddr::V4(Ipv4Addr::LOCALHOST);
915 let port_range = sockets::localhost_port_range_for_tests();
916 let config = SocketConfig::default();
917 let (_server_port, (server_udp_socket, _server_tcp_listener)) =
918 bind_common_in_range_with_config(ip_addr, port_range, config).unwrap();
919
920 let server_ip_echo_addr = server_udp_socket.local_addr().unwrap();
922
923 let (_, (_client_udp_socket, client_tcp_listener)) =
924 bind_common_in_range_with_config(ip_addr, port_range, config).unwrap();
925
926 let rt = runtime();
927 assert!(!rt.block_on(ip_echo_client::verify_all_reachable_tcp(
928 server_ip_echo_addr,
929 vec![client_tcp_listener],
930 Duration::from_secs(2),
931 )));
932 }
933
934 #[test]
935 fn test_verify_ports_udp_unreachable() {
936 solana_logger::setup();
937 let ip_addr = IpAddr::V4(Ipv4Addr::LOCALHOST);
938 let port_range = sockets::localhost_port_range_for_tests();
939 let config = SocketConfig::default();
940 let (_server_port, (server_udp_socket, _server_tcp_listener)) =
941 bind_common_in_range_with_config(ip_addr, port_range, config).unwrap();
942
943 let server_ip_echo_addr = server_udp_socket.local_addr().unwrap();
945
946 let (_correct_client_port, (client_udp_socket, _client_tcp_listener)) =
947 bind_common_in_range_with_config(ip_addr, port_range, config).unwrap();
948
949 let rt = runtime();
950 assert!(!rt.block_on(ip_echo_client::verify_all_reachable_udp(
951 server_ip_echo_addr,
952 &[&client_udp_socket],
953 Duration::from_secs(2),
954 3,
955 )));
956 }
957
958 #[test]
959 fn test_verify_many_ports_reachable() {
960 solana_logger::setup();
961 let ip_addr = IpAddr::V4(Ipv4Addr::LOCALHOST);
962 let config = SocketConfig::default();
963 let mut tcp_listeners = vec![];
964 let mut udp_sockets = vec![];
965
966 let (_server_port, (_, server_tcp_listener)) =
967 bind_common_in_range_with_config(ip_addr, (2200, 2300), config).unwrap();
968 for _ in 0..MAX_PORT_VERIFY_THREADS * 2 {
969 let (_client_port, (client_udp_socket, client_tcp_listener)) =
970 bind_common_in_range_with_config(
971 ip_addr,
972 (2300, 2300 + (MAX_PORT_VERIFY_THREADS * 3) as u16),
973 config,
974 )
975 .unwrap();
976 tcp_listeners.push(client_tcp_listener);
977 udp_sockets.push(client_udp_socket);
978 }
979
980 let ip_echo_server_addr = server_tcp_listener.local_addr().unwrap();
981
982 let _runtime = ip_echo_server(
983 server_tcp_listener,
984 DEFAULT_IP_ECHO_SERVER_THREADS,
985 Some(65535),
986 );
987
988 assert_eq!(
989 get_public_ip_addr(&ip_echo_server_addr).unwrap(),
990 parse_host("127.0.0.1").unwrap(),
991 );
992
993 let socket_refs = udp_sockets.iter().collect_vec();
994 assert!(verify_all_reachable_tcp(
995 &ip_echo_server_addr,
996 tcp_listeners,
997 ));
998 assert!(verify_all_reachable_udp(&ip_echo_server_addr, &socket_refs));
999 }
1000
1001 #[test]
1002 fn test_bind_two_in_range_with_offset() {
1003 solana_logger::setup();
1004 let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
1005 let offset = 6;
1006 if let Ok(((port1, _), (port2, _))) =
1007 bind_two_in_range_with_offset(ip_addr, (1024, 65535), offset)
1008 {
1009 assert!(port2 == port1 + offset);
1010 }
1011 let offset = 42;
1012 if let Ok(((port1, _), (port2, _))) =
1013 bind_two_in_range_with_offset(ip_addr, (1024, 65535), offset)
1014 {
1015 assert!(port2 == port1 + offset);
1016 }
1017 assert!(bind_two_in_range_with_offset(ip_addr, (1024, 1044), offset).is_err());
1018 }
1019
1020 #[test]
1021 fn test_multi_bind_in_range_with_config_reuseport_disabled() {
1022 let ip_addr: IpAddr = IpAddr::V4(Ipv4Addr::LOCALHOST);
1023 let config = SocketConfig::default(); let result = multi_bind_in_range_with_config(ip_addr, (2010, 2110), config, 2);
1026
1027 assert!(
1028 result.is_err(),
1029 "Expected an error when reuseport is not set to true"
1030 );
1031 }
1032}