1#![allow(clippy::arithmetic_side_effects)]
3use {
4 crossbeam_channel::unbounded,
5 log::*,
6 rand::{thread_rng, Rng},
7 socket2::{Domain, SockAddr, Socket, Type},
8 std::{
9 collections::{BTreeMap, HashSet},
10 io::{self, Read, Write},
11 net::{IpAddr, SocketAddr, TcpListener, TcpStream, ToSocketAddrs, UdpSocket},
12 sync::{Arc, RwLock},
13 time::{Duration, Instant},
14 },
15 url::Url,
16};
17
18mod ip_echo_server;
19pub use ip_echo_server::{ip_echo_server, IpEchoServer, MAX_PORT_COUNT_PER_MESSAGE};
20use ip_echo_server::{IpEchoServerMessage, IpEchoServerResponse};
21
/// Bundles a socket address with two UDP sockets, one named for receiving and
/// one named for sending.
///
/// NOTE(review): nothing in the visible part of this file constructs the type,
/// so how `addr` relates to the two sockets should be confirmed against callers.
pub struct UdpSocketPair {
    /// Socket address associated with the pair.
    pub addr: SocketAddr,
    /// Socket intended for receiving, going by its name.
    pub receiver: UdpSocket,
    /// Socket intended for sending, going by its name.
    pub sender: UdpSocket,
}
28
/// A `(start, end)` pair of port numbers.
///
/// The binding helpers in this file loop over `range.0..range.1` when picking
/// the (first) port to try, so `end` itself is not part of that loop.
pub type PortRange = (u16, u16);

/// Port range named as the validator default.
pub const VALIDATOR_PORT_RANGE: PortRange = (8000, 10_000);
/// NOTE(review): by its name, the minimum width a validator port range must
/// have; not referenced in the visible part of this file — confirm with callers.
pub const MINIMUM_VALIDATOR_PORT_RANGE_WIDTH: u16 = 14;
/// Number of header bytes placed in front of the bincode payload of IP echo
/// server requests and expected in front of the payload of responses.
pub(crate) const HEADER_LENGTH: usize = 4;
/// Size in bytes of the buffer used to read an IP echo server response: the
/// header plus 23 payload bytes. `test_response_length` checks that this equals
/// the serialized size of a response carrying an IPv6 address and a shred version.
pub(crate) const IP_ECHO_SERVER_RESPONSE_LENGTH: usize = HEADER_LENGTH + 23;
36
37fn ip_echo_server_request(
38 ip_echo_server_addr: &SocketAddr,
39 msg: IpEchoServerMessage,
40) -> Result<IpEchoServerResponse, String> {
41 let timeout = Duration::new(5, 0);
42 TcpStream::connect_timeout(ip_echo_server_addr, timeout)
43 .and_then(|mut stream| {
44 let mut bytes = vec![0; HEADER_LENGTH];
46
47 bytes.append(&mut bincode::serialize(&msg).expect("serialize IpEchoServerMessage"));
48
49 bytes.push(b'\n');
52
53 stream.set_read_timeout(Some(Duration::new(10, 0)))?;
54 stream.write_all(&bytes)?;
55 stream.shutdown(std::net::Shutdown::Write)?;
56 let mut data = vec![0u8; IP_ECHO_SERVER_RESPONSE_LENGTH];
57 let _ = stream.read(&mut data[..])?;
58 Ok(data)
59 })
60 .and_then(|data| {
61 if data.len() < HEADER_LENGTH {
65 return Err(io::Error::new(
66 io::ErrorKind::Other,
67 format!("Response too short, received {} bytes", data.len()),
68 ));
69 }
70
71 let response_header: String =
72 data[0..HEADER_LENGTH].iter().map(|b| *b as char).collect();
73 if response_header != "\0\0\0\0" {
74 if response_header == "HTTP" {
75 let http_response = data.iter().map(|b| *b as char).collect::<String>();
76 return Err(io::Error::new(
77 io::ErrorKind::Other,
78 format!(
79 "Invalid gossip entrypoint. {ip_echo_server_addr} looks to be an HTTP port: {http_response}"
80 ),
81 ));
82 }
83 return Err(io::Error::new(
84 io::ErrorKind::Other,
85 format!(
86 "Invalid gossip entrypoint. {ip_echo_server_addr} provided an invalid response header: '{response_header}'"
87 ),
88 ));
89 }
90
91 bincode::deserialize(&data[HEADER_LENGTH..]).map_err(|err| {
92 io::Error::new(
93 io::ErrorKind::Other,
94 format!("Failed to deserialize: {err:?}"),
95 )
96 })
97 })
98 .map_err(|err| err.to_string())
99}
100
101pub fn get_public_ip_addr(ip_echo_server_addr: &SocketAddr) -> Result<IpAddr, String> {
104 let resp = ip_echo_server_request(ip_echo_server_addr, IpEchoServerMessage::default())?;
105 Ok(resp.address)
106}
107
108pub fn get_cluster_shred_version(ip_echo_server_addr: &SocketAddr) -> Result<u16, String> {
109 let resp = ip_echo_server_request(ip_echo_server_addr, IpEchoServerMessage::default())?;
110 resp.shred_version
111 .ok_or_else(|| String::from("IP echo server does not return a shred-version"))
112}
113
/// Timeout, in seconds, that `verify_reachable_ports` hands to
/// `do_verify_reachable_ports`.
const DEFAULT_TIMEOUT_SECS: u64 = 5;
/// UDP retry count that `verify_reachable_ports` hands to
/// `do_verify_reachable_ports`.
const DEFAULT_RETRY_COUNT: usize = 5;
118
/// Checks whether the given TCP listeners and UDP sockets receive traffic after
/// the IP echo server at `ip_echo_server_addr` has been sent their port numbers.
///
/// * `tcp_listeners` - `(port, listener)` pairs; each listener is moved into a
///   helper thread that waits for one incoming connection.
/// * `udp_sockets` - sockets to check; sockets sharing a local port are grouped
///   and the port counts as reachable once any of them receives a datagram.
/// * `timeout` - seconds to wait per TCP listener and per UDP attempt.
/// * `udp_retry_count` - number of attempts made for each chunk of UDP ports.
///
/// Returns `true` only if every TCP port and every UDP port was reached. UDP
/// ports are not checked at all when a TCP port failed.
///
/// Failures of the echo server request itself are only logged; reachability is
/// judged solely by what arrives on the local sockets.
///
/// # Panics
///
/// Panics if a local address cannot be read, a thread cannot be spawned or
/// joined, a UDP socket cannot be cloned, a read timeout cannot be read or set,
/// or the local unblocking connection after a TCP timeout fails.
fn do_verify_reachable_ports(
    ip_echo_server_addr: &SocketAddr,
    tcp_listeners: Vec<(u16, TcpListener)>,
    udp_sockets: &[&UdpSocket],
    timeout: u64,
    udp_retry_count: usize,
) -> bool {
    info!(
        "Checking that tcp ports {:?} are reachable from {:?}",
        tcp_listeners, ip_echo_server_addr
    );

    // Send all TCP ports in one request; presumably the server then connects
    // back to each of them (the message semantics live in `ip_echo_server`).
    let tcp_ports: Vec<_> = tcp_listeners.iter().map(|(port, _)| *port).collect();
    let _ = ip_echo_server_request(
        ip_echo_server_addr,
        IpEchoServerMessage::new(&tcp_ports, &[]),
    )
    .map_err(|err| warn!("ip_echo_server request failed: {}", err));

    let mut ok = true;
    let timeout = Duration::from_secs(timeout);

    // Listeners are checked one after another, each with its own timeout.
    for (port, tcp_listener) in tcp_listeners {
        let (sender, receiver) = unbounded();
        let listening_addr = tcp_listener.local_addr().unwrap();
        // The blocking accept runs in a helper thread so that this thread can
        // bound the wait with `recv_timeout`.
        let thread_handle = std::thread::Builder::new()
            .name(format!("mlnVrfyTcp{port:05}"))
            .spawn(move || {
                debug!("Waiting for incoming connection on tcp/{}", port);
                match tcp_listener.incoming().next() {
                    Some(_) => sender
                        .send(())
                        .unwrap_or_else(|err| warn!("send failure: {}", err)),
                    None => warn!("tcp incoming failed"),
                }
            })
            .unwrap();
        match receiver.recv_timeout(timeout) {
            Ok(_) => {
                info!("tcp/{} is reachable", port);
            }
            Err(err) => {
                error!(
                    "Received no response at tcp/{}, check your port configuration: {}",
                    port, err
                );
                // Connect to our own listener so the helper thread's blocked
                // `incoming()` call returns and the `join` below cannot hang.
                TcpStream::connect_timeout(&listening_addr, timeout).unwrap();
                ok = false;
            }
        }
        thread_handle.join().unwrap();
    }

    // Skip the UDP checks entirely if any TCP port was unreachable.
    if !ok {
        return ok;
    }

    // Group the sockets by local port (sorted by port via the BTreeMap).
    let mut udp_ports: BTreeMap<_, _> = BTreeMap::new();
    udp_sockets.iter().for_each(|udp_socket| {
        let port = udp_socket.local_addr().unwrap().port();
        udp_ports
            .entry(port)
            .or_insert_with(Vec::new)
            .push(udp_socket);
    });
    let udp_ports: Vec<_> = udp_ports.into_iter().collect();

    info!(
        "Checking that udp ports {:?} are reachable from {:?}",
        udp_ports.iter().map(|(port, _)| port).collect::<Vec<_>>(),
        ip_echo_server_addr
    );

    // Each request carries at most MAX_PORT_COUNT_PER_MESSAGE ports.
    'outer: for checked_ports_and_sockets in udp_ports.chunks(MAX_PORT_COUNT_PER_MESSAGE) {
        // Every chunk has to prove itself reachable again.
        ok = false;

        for udp_remaining_retry in (0_usize..udp_retry_count).rev() {
            let (checked_ports, checked_socket_iter) = (
                checked_ports_and_sockets
                    .iter()
                    .map(|(port, _)| *port)
                    .collect::<Vec<_>>(),
                checked_ports_and_sockets
                    .iter()
                    .flat_map(|(_, sockets)| sockets),
            );

            let _ = ip_echo_server_request(
                ip_echo_server_addr,
                IpEchoServerMessage::new(&[], &checked_ports),
            )
            .map_err(|err| warn!("ip_echo_server request failed: {}", err));

            // Set of ports that received a datagram, shared by all helper threads.
            let reachable_ports = Arc::new(RwLock::new(HashSet::new()));
            // One helper thread per socket, each working on a clone of the socket.
            let thread_handles: Vec<_> = checked_socket_iter
                .map(|udp_socket| {
                    let port = udp_socket.local_addr().unwrap().port();
                    let udp_socket = udp_socket.try_clone().expect("Unable to clone udp socket");
                    let reachable_ports = reachable_ports.clone();

                    std::thread::Builder::new()
                        .name(format!("mlnVrfyUdp{port:05}"))
                        .spawn(move || {
                            let start = Instant::now();

                            // Poll in 250 ms slices so the exit conditions below
                            // are re-evaluated regularly.
                            let original_read_timeout = udp_socket.read_timeout().unwrap();
                            udp_socket
                                .set_read_timeout(Some(Duration::from_millis(250)))
                                .unwrap();
                            loop {
                                // Stop once another socket on the same port has
                                // succeeded or the overall timeout has elapsed.
                                if reachable_ports.read().unwrap().contains(&port)
                                    || Instant::now().duration_since(start) >= timeout
                                {
                                    break;
                                }

                                let recv_result = udp_socket.recv(&mut [0; 1]);
                                debug!(
                                    "Waited for incoming datagram on udp/{}: {:?}",
                                    port, recv_result
                                );

                                if recv_result.is_ok() {
                                    reachable_ports.write().unwrap().insert(port);
                                    break;
                                }
                            }
                            // Put back the read timeout that was in effect before the check.
                            udp_socket.set_read_timeout(original_read_timeout).unwrap();
                        })
                        .unwrap()
                })
                .collect();

            // Wait for every helper thread before evaluating the result.
            for thread in thread_handles {
                thread.join().unwrap();
            }

            let reachable_ports = reachable_ports.read().unwrap().clone();
            if reachable_ports.len() == checked_ports.len() {
                info!(
                    "checked udp ports: {:?}, reachable udp ports: {:?}",
                    checked_ports, reachable_ports
                );
                ok = true;
                break;
            } else if udp_remaining_retry > 0 {
                // Some ports are still silent; try this chunk again.
                error!(
                    "checked udp ports: {:?}, reachable udp ports: {:?}",
                    checked_ports, reachable_ports
                );
                error!("There are some udp ports with no response!! Retrying...");
            } else {
                // Out of retries: give up on this and all remaining chunks.
                error!("Maximum retry count is reached....");
                break 'outer;
            }
        }
    }

    ok
}
292
293pub fn verify_reachable_ports(
294 ip_echo_server_addr: &SocketAddr,
295 tcp_listeners: Vec<(u16, TcpListener)>,
296 udp_sockets: &[&UdpSocket],
297) -> bool {
298 do_verify_reachable_ports(
299 ip_echo_server_addr,
300 tcp_listeners,
301 udp_sockets,
302 DEFAULT_TIMEOUT_SECS,
303 DEFAULT_RETRY_COUNT,
304 )
305}
306
/// Interprets `optstr` as either a bare port number or a full socket address.
///
/// * A string that parses as a port yields `default_addr` with its port replaced.
/// * A string that parses as a socket address yields that address.
/// * Anything else, including `None`, yields `default_addr` unchanged.
pub fn parse_port_or_addr(optstr: Option<&str>, default_addr: SocketAddr) -> SocketAddr {
    let text = match optstr {
        Some(text) => text,
        None => return default_addr,
    };

    match text.parse::<u16>() {
        Ok(port) => {
            let mut with_port = default_addr;
            with_port.set_port(port);
            with_port
        }
        Err(_) => text.parse::<SocketAddr>().unwrap_or(default_addr),
    }
}
322
323pub fn parse_port_range(port_range: &str) -> Option<PortRange> {
324 let ports: Vec<&str> = port_range.split('-').collect();
325 if ports.len() != 2 {
326 return None;
327 }
328
329 let start_port = ports[0].parse();
330 let end_port = ports[1].parse();
331
332 if start_port.is_err() || end_port.is_err() {
333 return None;
334 }
335 let start_port = start_port.unwrap();
336 let end_port = end_port.unwrap();
337 if end_port < start_port {
338 return None;
339 }
340 Some((start_port, end_port))
341}
342
343pub fn parse_host(host: &str) -> Result<IpAddr, String> {
344 let parsed_url = Url::parse(&format!("http://{host}")).map_err(|e| e.to_string())?;
347 if parsed_url.port().is_some() {
348 return Err(format!("Expected port in URL: {host}"));
349 }
350
351 let ips: Vec<_> = (host, 0)
353 .to_socket_addrs()
354 .map_err(|err| err.to_string())?
355 .map(|socket_address| socket_address.ip())
356 .collect();
357 if ips.is_empty() {
358 Err(format!("Unable to resolve host: {host}"))
359 } else {
360 Ok(ips[0])
361 }
362}
363
364pub fn is_host(string: String) -> Result<(), String> {
365 parse_host(&string).map(|_| ())
366}
367
/// Resolves a `host:port` string to a socket address, returning the first
/// address the resolver yields.
///
/// # Errors
///
/// Returns a message when resolution fails or produces no address.
pub fn parse_host_port(host_port: &str) -> Result<SocketAddr, String> {
    let mut resolved = host_port
        .to_socket_addrs()
        .map_err(|err| format!("Unable to resolve host {host_port}: {err}"))?;
    match resolved.next() {
        Some(first) => Ok(first),
        None => Err(format!("Unable to resolve host: {host_port}")),
    }
}
379
380pub fn is_host_port(string: String) -> Result<(), String> {
381 parse_host_port(&string).map(|_| ())
382}
383
/// Creates an unbound IPv4 datagram socket.
///
/// On these targets the `_reuseaddr` flag is accepted but ignored.
#[cfg(any(windows, target_os = "ios"))]
fn udp_socket(_reuseaddr: bool) -> io::Result<Socket> {
    Socket::new(Domain::IPV4, Type::DGRAM, None)
}
389
390#[cfg(not(any(windows, target_os = "ios")))]
391fn udp_socket(reuseaddr: bool) -> io::Result<Socket> {
392 use {
393 nix::sys::socket::{
394 setsockopt,
395 sockopt::{ReuseAddr, ReusePort},
396 },
397 std::os::unix::io::AsRawFd,
398 };
399
400 let sock = Socket::new(Domain::IPV4, Type::DGRAM, None)?;
401 let sock_fd = sock.as_raw_fd();
402
403 if reuseaddr {
404 setsockopt(sock_fd, ReusePort, &true).ok();
406 setsockopt(sock_fd, ReuseAddr, &true).ok();
407 }
408
409 Ok(sock)
410}
411
412pub fn bind_common_in_range(
414 ip_addr: IpAddr,
415 range: PortRange,
416) -> io::Result<(u16, (UdpSocket, TcpListener))> {
417 for port in range.0..range.1 {
418 if let Ok((sock, listener)) = bind_common(ip_addr, port, false) {
419 return Result::Ok((sock.local_addr().unwrap().port(), (sock, listener)));
420 }
421 }
422
423 Err(io::Error::new(
424 io::ErrorKind::Other,
425 format!("No available TCP/UDP ports in {range:?}"),
426 ))
427}
428
429pub fn bind_in_range(ip_addr: IpAddr, range: PortRange) -> io::Result<(u16, UdpSocket)> {
430 let sock = udp_socket(false)?;
431
432 for port in range.0..range.1 {
433 let addr = SocketAddr::new(ip_addr, port);
434
435 if sock.bind(&SockAddr::from(addr)).is_ok() {
436 let sock: UdpSocket = sock.into();
437 return Result::Ok((sock.local_addr().unwrap().port(), sock));
438 }
439 }
440
441 Err(io::Error::new(
442 io::ErrorKind::Other,
443 format!("No available UDP ports in {range:?}"),
444 ))
445}
446
447pub fn bind_with_any_port(ip_addr: IpAddr) -> io::Result<UdpSocket> {
448 let sock = udp_socket(false)?;
449 let addr = SocketAddr::new(ip_addr, 0);
450 match sock.bind(&SockAddr::from(addr)) {
451 Ok(_) => Result::Ok(sock.into()),
452 Err(err) => Err(io::Error::new(
453 io::ErrorKind::Other,
454 format!("No available UDP port: {err}"),
455 )),
456 }
457}
458
459pub fn multi_bind_in_range(
461 ip_addr: IpAddr,
462 range: PortRange,
463 mut num: usize,
464) -> io::Result<(u16, Vec<UdpSocket>)> {
465 if cfg!(windows) && num != 1 {
466 warn!(
468 "multi_bind_in_range() only supports 1 socket in windows ({} requested)",
469 num
470 );
471 num = 1;
472 }
473 let mut sockets = Vec::with_capacity(num);
474
475 const NUM_TRIES: usize = 100;
476 let mut port = 0;
477 let mut error = None;
478 for _ in 0..NUM_TRIES {
479 port = {
480 let (port, _) = bind_in_range(ip_addr, range)?;
481 port
482 }; for _ in 0..num {
485 let sock = bind_to(ip_addr, port, true);
486 if let Ok(sock) = sock {
487 sockets.push(sock);
488 } else {
489 error = Some(sock);
490 break;
491 }
492 }
493 if sockets.len() == num {
494 break;
495 } else {
496 sockets.clear();
497 }
498 }
499 if sockets.len() != num {
500 error.unwrap()?;
501 }
502 Ok((port, sockets))
503}
504
505pub fn bind_to(ip_addr: IpAddr, port: u16, reuseaddr: bool) -> io::Result<UdpSocket> {
506 let sock = udp_socket(reuseaddr)?;
507
508 let addr = SocketAddr::new(ip_addr, port);
509
510 sock.bind(&SockAddr::from(addr)).map(|_| sock.into())
511}
512
513pub fn bind_common(
515 ip_addr: IpAddr,
516 port: u16,
517 reuseaddr: bool,
518) -> io::Result<(UdpSocket, TcpListener)> {
519 let sock = udp_socket(reuseaddr)?;
520
521 let addr = SocketAddr::new(ip_addr, port);
522 let sock_addr = SockAddr::from(addr);
523 sock.bind(&sock_addr)
524 .and_then(|_| TcpListener::bind(addr).map(|listener| (sock.into(), listener)))
525}
526
527pub fn bind_two_in_range_with_offset(
528 ip_addr: IpAddr,
529 range: PortRange,
530 offset: u16,
531) -> io::Result<((u16, UdpSocket), (u16, UdpSocket))> {
532 if range.1.saturating_sub(range.0) < offset {
533 return Err(io::Error::new(
534 io::ErrorKind::Other,
535 "range too small to find two ports with the correct offset".to_string(),
536 ));
537 }
538 for port in range.0..range.1 {
539 if let Ok(first_bind) = bind_to(ip_addr, port, false) {
540 if range.1.saturating_sub(port) >= offset {
541 if let Ok(second_bind) = bind_to(ip_addr, port + offset, false) {
542 return Ok((
543 (first_bind.local_addr().unwrap().port(), first_bind),
544 (second_bind.local_addr().unwrap().port(), second_bind),
545 ));
546 }
547 } else {
548 break;
549 }
550 }
551 }
552 Err(io::Error::new(
553 io::ErrorKind::Other,
554 "couldn't find two ports with the correct offset in range".to_string(),
555 ))
556}
557
558pub fn find_available_port_in_range(ip_addr: IpAddr, range: PortRange) -> io::Result<u16> {
559 let (start, end) = range;
560 let mut tries_left = end - start;
561 let mut rand_port = thread_rng().gen_range(start..end);
562 loop {
563 match bind_common(ip_addr, rand_port, false) {
564 Ok(_) => {
565 break Ok(rand_port);
566 }
567 Err(err) => {
568 if tries_left == 0 {
569 return Err(err);
570 }
571 }
572 }
573 rand_port += 1;
574 if rand_port == end {
575 rand_port = start;
576 }
577 tries_left -= 1;
578 }
579}
580
// Unit tests for the helpers in this module. Several of them bind real sockets
// on hard-coded port ranges of the local machine.
#[cfg(test)]
mod tests {
    use {super::*, std::net::Ipv4Addr};

    // The response length constant must match the serialized size of the
    // largest response (IPv6 address plus a shred version) plus the header.
    #[test]
    fn test_response_length() {
        let resp = IpEchoServerResponse {
            // IPv6 variant
            address: IpAddr::from([u16::MAX; 8]),
            shred_version: Some(u16::MAX),
        };
        let resp_size = bincode::serialized_size(&resp).unwrap();
        assert_eq!(
            IP_ECHO_SERVER_RESPONSE_LENGTH,
            HEADER_LENGTH + resp_size as usize
        );
    }

    // A full response truncated to header + 20 bytes must still deserialize as
    // a bare `IpAddr`.
    #[test]
    fn test_backward_compat() {
        let address = IpAddr::from([
            525u16, 524u16, 523u16, 522u16, 521u16, 520u16, 519u16, 518u16,
        ]);
        let response = IpEchoServerResponse {
            address,
            shred_version: Some(42),
        };
        let mut data = vec![0u8; IP_ECHO_SERVER_RESPONSE_LENGTH];
        bincode::serialize_into(&mut data[HEADER_LENGTH..], &response).unwrap();
        data.truncate(HEADER_LENGTH + 20);
        assert_eq!(
            bincode::deserialize::<IpAddr>(&data[HEADER_LENGTH..]).unwrap(),
            address
        );
    }

    // A bare `IpAddr` serialized into a zeroed full-length buffer must
    // deserialize as a response without a shred version.
    #[test]
    fn test_forward_compat() {
        let address = IpAddr::from([
            525u16, 524u16, 523u16, 522u16, 521u16, 520u16, 519u16, 518u16,
        ]);
        let mut data = [0u8; IP_ECHO_SERVER_RESPONSE_LENGTH];
        bincode::serialize_into(&mut data[HEADER_LENGTH..], &address).unwrap();
        let response: Result<IpEchoServerResponse, _> =
            bincode::deserialize(&data[HEADER_LENGTH..]);
        assert_eq!(
            response.unwrap(),
            IpEchoServerResponse {
                address,
                shred_version: None,
            }
        );
    }

    // Covers a bare port, a full address, garbage input and `None`.
    #[test]
    fn test_parse_port_or_addr() {
        let p1 = parse_port_or_addr(Some("9000"), SocketAddr::from(([1, 2, 3, 4], 1)));
        assert_eq!(p1.port(), 9000);
        let p2 = parse_port_or_addr(Some("127.0.0.1:7000"), SocketAddr::from(([1, 2, 3, 4], 1)));
        assert_eq!(p2.port(), 7000);
        let p2 = parse_port_or_addr(Some("hi there"), SocketAddr::from(([1, 2, 3, 4], 1)));
        assert_eq!(p2.port(), 1);
        let p3 = parse_port_or_addr(None, SocketAddr::from(([1, 2, 3, 4], 1)));
        assert_eq!(p3.port(), 1);
    }

    // Only `START-END` with END >= START is accepted.
    #[test]
    fn test_parse_port_range() {
        assert_eq!(parse_port_range("garbage"), None);
        assert_eq!(parse_port_range("1-"), None);
        assert_eq!(parse_port_range("1-2"), Some((1, 2)));
        assert_eq!(parse_port_range("1-2-3"), None);
        assert_eq!(parse_port_range("2-1"), None);
    }

    // Hosts with a port are rejected, hosts without one are accepted.
    #[test]
    fn test_parse_host() {
        parse_host("localhost:1234").unwrap_err();
        parse_host("localhost").unwrap();
        parse_host("127.0.0.0:1234").unwrap_err();
        parse_host("127.0.0.0").unwrap();
    }

    // The mirror image of `test_parse_host`: here the port is mandatory.
    #[test]
    fn test_parse_host_port() {
        parse_host_port("localhost:1234").unwrap();
        parse_host_port("localhost").unwrap_err();
        parse_host_port("127.0.0.0:1234").unwrap();
        parse_host_port("127.0.0.0").unwrap_err();
    }

    #[test]
    fn test_is_host_port() {
        assert!(is_host_port("localhost:1234".to_string()).is_ok());
        assert!(is_host_port("localhost".to_string()).is_err());
    }

    // Exercises `bind_in_range`, `bind_to` with and without address reuse, and
    // `multi_bind_in_range` on fixed ports.
    #[test]
    fn test_bind() {
        let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
        assert_eq!(bind_in_range(ip_addr, (2000, 2001)).unwrap().0, 2000);
        let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
        // Two sockets may share port 2002 when both request address reuse...
        let x = bind_to(ip_addr, 2002, true).unwrap();
        let y = bind_to(ip_addr, 2002, true).unwrap();
        assert_eq!(
            x.local_addr().unwrap().port(),
            y.local_addr().unwrap().port()
        );
        // ...but binding without reuse to the occupied port must fail.
        bind_to(ip_addr, 2002, false).unwrap_err();
        bind_in_range(ip_addr, (2002, 2003)).unwrap_err();

        let (port, v) = multi_bind_in_range(ip_addr, (2010, 2110), 10).unwrap();
        for sock in &v {
            assert_eq!(port, sock.local_addr().unwrap().port());
        }
    }

    // Two sockets bound to port 0 must end up on different ports.
    #[test]
    fn test_bind_with_any_port() {
        let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
        let x = bind_with_any_port(ip_addr).unwrap();
        let y = bind_with_any_port(ip_addr).unwrap();
        assert_ne!(
            x.local_addr().unwrap().port(),
            y.local_addr().unwrap().port()
        );
    }

    // Empty and inverted ranges yield an error.
    #[test]
    fn test_bind_in_range_nil() {
        let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
        bind_in_range(ip_addr, (2000, 2000)).unwrap_err();
        bind_in_range(ip_addr, (2000, 1999)).unwrap_err();
    }

    #[test]
    fn test_find_available_port_in_range() {
        let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
        assert_eq!(
            find_available_port_in_range(ip_addr, (3000, 3001)).unwrap(),
            3000
        );
        let port = find_available_port_in_range(ip_addr, (3000, 3050)).unwrap();
        assert!((3000..3050).contains(&port));

        // With the port occupied, a one-port range has nothing left to offer.
        let _socket = bind_to(ip_addr, port, false).unwrap();
        find_available_port_in_range(ip_addr, (port, port + 1)).unwrap_err();
    }

    #[test]
    fn test_bind_common_in_range() {
        let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
        let (port, _sockets) = bind_common_in_range(ip_addr, (3100, 3150)).unwrap();
        assert!((3100..3150).contains(&port));

        // `_sockets` still holds the port, so it cannot be bound a second time.
        bind_common_in_range(ip_addr, (port, port + 1)).unwrap_err();
    }

    // Runs an echo server on a local listener and queries it; no ports are
    // passed to the reachability check.
    #[test]
    fn test_get_public_ip_addr_none() {
        miraland_logger::setup();
        let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
        let (_server_port, (server_udp_socket, server_tcp_listener)) =
            bind_common_in_range(ip_addr, (3200, 3250)).unwrap();

        let _runtime = ip_echo_server(server_tcp_listener, Some(42));

        // The UDP socket and the TCP listener were bound to the same port.
        let server_ip_echo_addr = server_udp_socket.local_addr().unwrap();
        assert_eq!(
            get_public_ip_addr(&server_ip_echo_addr),
            parse_host("127.0.0.1"),
        );
        assert_eq!(get_cluster_shred_version(&server_ip_echo_addr), Ok(42));
        assert!(verify_reachable_ports(&server_ip_echo_addr, vec![], &[],));
    }

    // Same as above, but with one client TCP listener and one client UDP socket
    // that the running echo server is expected to reach.
    #[test]
    fn test_get_public_ip_addr_reachable() {
        miraland_logger::setup();
        let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
        let (_server_port, (server_udp_socket, server_tcp_listener)) =
            bind_common_in_range(ip_addr, (3200, 3250)).unwrap();
        let (client_port, (client_udp_socket, client_tcp_listener)) =
            bind_common_in_range(ip_addr, (3200, 3250)).unwrap();

        let _runtime = ip_echo_server(server_tcp_listener, Some(65535));

        let ip_echo_server_addr = server_udp_socket.local_addr().unwrap();
        assert_eq!(
            get_public_ip_addr(&ip_echo_server_addr),
            parse_host("127.0.0.1"),
        );
        assert_eq!(get_cluster_shred_version(&ip_echo_server_addr), Ok(65535));
        assert!(verify_reachable_ports(
            &ip_echo_server_addr,
            vec![(client_port, client_tcp_listener)],
            &[&client_udp_socket],
        ));
    }

    // The server port is bound but no echo server is started on it, so the
    // client TCP port must be reported as unreachable.
    #[test]
    fn test_get_public_ip_addr_tcp_unreachable() {
        miraland_logger::setup();
        let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
        let (_server_port, (server_udp_socket, _server_tcp_listener)) =
            bind_common_in_range(ip_addr, (3200, 3250)).unwrap();

        let server_ip_echo_addr = server_udp_socket.local_addr().unwrap();

        let (correct_client_port, (_client_udp_socket, client_tcp_listener)) =
            bind_common_in_range(ip_addr, (3200, 3250)).unwrap();

        // Shorter timeout (2 s) and fewer retries (3) than the defaults.
        assert!(!do_verify_reachable_ports(
            &server_ip_echo_addr,
            vec![(correct_client_port, client_tcp_listener)],
            &[],
            2,
            3,
        ));
    }

    // As above, but for a client UDP socket: without a running echo server the
    // UDP port must be reported as unreachable.
    #[test]
    fn test_get_public_ip_addr_udp_unreachable() {
        miraland_logger::setup();
        let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
        let (_server_port, (server_udp_socket, _server_tcp_listener)) =
            bind_common_in_range(ip_addr, (3200, 3250)).unwrap();

        let server_ip_echo_addr = server_udp_socket.local_addr().unwrap();

        let (_correct_client_port, (client_udp_socket, _client_tcp_listener)) =
            bind_common_in_range(ip_addr, (3200, 3250)).unwrap();

        // Shorter timeout (2 s) and fewer retries (3) than the defaults.
        assert!(!do_verify_reachable_ports(
            &server_ip_echo_addr,
            vec![],
            &[&client_udp_socket],
            2,
            3,
        ));
    }

    // When a pair can be bound, the ports must differ by exactly `offset`; a
    // range narrower than the offset must be rejected.
    #[test]
    fn test_bind_two_in_range_with_offset() {
        miraland_logger::setup();
        let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
        let offset = 6;
        if let Ok(((port1, _), (port2, _))) =
            bind_two_in_range_with_offset(ip_addr, (1024, 65535), offset)
        {
            assert!(port2 == port1 + offset);
        }
        let offset = 42;
        if let Ok(((port1, _), (port2, _))) =
            bind_two_in_range_with_offset(ip_addr, (1024, 65535), offset)
        {
            assert!(port2 == port1 + offset);
        }
        assert!(bind_two_in_range_with_offset(ip_addr, (1024, 1044), offset).is_err());
    }
}