miraland_net_utils/
lib.rs

1//! The `net_utils` module assists with networking
2#![allow(clippy::arithmetic_side_effects)]
3use {
4    crossbeam_channel::unbounded,
5    log::*,
6    rand::{thread_rng, Rng},
7    socket2::{Domain, SockAddr, Socket, Type},
8    std::{
9        collections::{BTreeMap, HashSet},
10        io::{self, Read, Write},
11        net::{IpAddr, SocketAddr, TcpListener, TcpStream, ToSocketAddrs, UdpSocket},
12        sync::{Arc, RwLock},
13        time::{Duration, Instant},
14    },
15    url::Url,
16};
17
18mod ip_echo_server;
19pub use ip_echo_server::{ip_echo_server, IpEchoServer, MAX_PORT_COUNT_PER_MESSAGE};
20use ip_echo_server::{IpEchoServerMessage, IpEchoServerResponse};
21
/// A data type representing a public Udp socket
///
/// Groups the public address together with the two locally bound sockets that receive
/// from and send via that address.
pub struct UdpSocketPair {
    /// Public address of the socket
    pub addr: SocketAddr,
    /// Locally bound socket that can receive from the public address
    pub receiver: UdpSocket,
    /// Locally bound socket to send via public address
    pub sender: UdpSocket,
}
28
/// A `(start, end)` pair of port numbers.
///
/// The binding helpers in this file iterate `range.0..range.1`, i.e. they treat the
/// second element as an exclusive upper bound.
pub type PortRange = (u16, u16);

/// Port range named for validators: ports 8000 up to 10_000.
pub const VALIDATOR_PORT_RANGE: PortRange = (8000, 10_000);
/// VALIDATOR_PORT_RANGE must be at least this wide
pub const MINIMUM_VALIDATOR_PORT_RANGE_WIDTH: u16 = 14;

/// Number of leading null bytes that prefix an ip_echo_server request, and that the
/// client expects at the start of a response.
pub(crate) const HEADER_LENGTH: usize = 4;
/// Size in bytes of the buffer used for an ip_echo_server response: the header plus 23
/// payload bytes. `test_response_length` checks that 23 is the bincode size of an
/// `IpEchoServerResponse` holding an IPv6 address and `Some` shred version.
pub(crate) const IP_ECHO_SERVER_RESPONSE_LENGTH: usize = HEADER_LENGTH + 23;
36
37fn ip_echo_server_request(
38    ip_echo_server_addr: &SocketAddr,
39    msg: IpEchoServerMessage,
40) -> Result<IpEchoServerResponse, String> {
41    let timeout = Duration::new(5, 0);
42    TcpStream::connect_timeout(ip_echo_server_addr, timeout)
43        .and_then(|mut stream| {
44            // Start with HEADER_LENGTH null bytes to avoid looking like an HTTP GET/POST request
45            let mut bytes = vec![0; HEADER_LENGTH];
46
47            bytes.append(&mut bincode::serialize(&msg).expect("serialize IpEchoServerMessage"));
48
49            // End with '\n' to make this request look HTTP-ish and tickle an error response back
50            // from an HTTP server
51            bytes.push(b'\n');
52
53            stream.set_read_timeout(Some(Duration::new(10, 0)))?;
54            stream.write_all(&bytes)?;
55            stream.shutdown(std::net::Shutdown::Write)?;
56            let mut data = vec![0u8; IP_ECHO_SERVER_RESPONSE_LENGTH];
57            let _ = stream.read(&mut data[..])?;
58            Ok(data)
59        })
60        .and_then(|data| {
61            // It's common for users to accidentally confuse the validator's gossip port and JSON
62            // RPC port.  Attempt to detect when this occurs by looking for the standard HTTP
63            // response header and provide the user with a helpful error message
64            if data.len() < HEADER_LENGTH {
65                return Err(io::Error::new(
66                    io::ErrorKind::Other,
67                    format!("Response too short, received {} bytes", data.len()),
68                ));
69            }
70
71            let response_header: String =
72                data[0..HEADER_LENGTH].iter().map(|b| *b as char).collect();
73            if response_header != "\0\0\0\0" {
74                if response_header == "HTTP" {
75                    let http_response = data.iter().map(|b| *b as char).collect::<String>();
76                    return Err(io::Error::new(
77                        io::ErrorKind::Other,
78                        format!(
79                            "Invalid gossip entrypoint. {ip_echo_server_addr} looks to be an HTTP port: {http_response}"
80                        ),
81                    ));
82                }
83                return Err(io::Error::new(
84                    io::ErrorKind::Other,
85                    format!(
86                        "Invalid gossip entrypoint. {ip_echo_server_addr} provided an invalid response header: '{response_header}'"
87                    ),
88                ));
89            }
90
91            bincode::deserialize(&data[HEADER_LENGTH..]).map_err(|err| {
92                io::Error::new(
93                    io::ErrorKind::Other,
94                    format!("Failed to deserialize: {err:?}"),
95                )
96            })
97        })
98        .map_err(|err| err.to_string())
99}
100
101/// Determine the public IP address of this machine by asking an ip_echo_server at the given
102/// address
103pub fn get_public_ip_addr(ip_echo_server_addr: &SocketAddr) -> Result<IpAddr, String> {
104    let resp = ip_echo_server_request(ip_echo_server_addr, IpEchoServerMessage::default())?;
105    Ok(resp.address)
106}
107
108pub fn get_cluster_shred_version(ip_echo_server_addr: &SocketAddr) -> Result<u16, String> {
109    let resp = ip_echo_server_request(ip_echo_server_addr, IpEchoServerMessage::default())?;
110    resp.shred_version
111        .ok_or_else(|| String::from("IP echo server does not return a shred-version"))
112}
113
/// Timeout, in seconds, that `verify_reachable_ports` passes to `do_verify_reachable_ports`
const DEFAULT_TIMEOUT_SECS: u64 = 5;
/// UDP retry count that `verify_reachable_ports` passes to `do_verify_reachable_ports`
const DEFAULT_RETRY_COUNT: usize = 5;

/// Checks if any of the provided TCP/UDP ports are not reachable by the machine at
/// `ip_echo_server_addr`
///
/// TCP ports are checked first: the echo server is sent the list of TCP ports, then each
/// listener in turn is given up to `timeout` seconds to accept an incoming connection. If any
/// TCP port fails, the function returns `false` without checking UDP.
///
/// UDP sockets are grouped by local port and processed in batches of at most
/// `MAX_PORT_COUNT_PER_MESSAGE` ports. Each batch gets up to `udp_retry_count` attempts; an
/// attempt sends the batch's ports to the echo server and then waits up to `timeout` seconds
/// for a datagram to arrive on every port of the batch.
///
/// Failures of the echo server request itself are only logged; they surface as the ports
/// not becoming reachable.
///
/// Returns `true` only if every TCP port and every UDP port was observed to be reachable.
/// With a non-empty `udp_sockets` and `udp_retry_count == 0` the result is `false`.
///
/// # Panics
///
/// Panics if a local address cannot be queried, a thread cannot be spawned or joined, a
/// socket cannot be cloned or have its read timeout changed, or the local connection used
/// to unblock a waiting TCP accept thread fails.
fn do_verify_reachable_ports(
    ip_echo_server_addr: &SocketAddr,
    tcp_listeners: Vec<(u16, TcpListener)>,
    udp_sockets: &[&UdpSocket],
    timeout: u64,
    udp_retry_count: usize,
) -> bool {
    info!(
        "Checking that tcp ports {:?} are reachable from {:?}",
        tcp_listeners, ip_echo_server_addr
    );

    // Ask the echo server to connect back to every TCP port; a failed request is only logged
    let tcp_ports: Vec<_> = tcp_listeners.iter().map(|(port, _)| *port).collect();
    let _ = ip_echo_server_request(
        ip_echo_server_addr,
        IpEchoServerMessage::new(&tcp_ports, &[]),
    )
    .map_err(|err| warn!("ip_echo_server request failed: {}", err));

    let mut ok = true;
    let timeout = Duration::from_secs(timeout);

    // Wait for a connection to open on each TCP port
    for (port, tcp_listener) in tcp_listeners {
        let (sender, receiver) = unbounded();
        // Remember the address before the listener is moved into the thread, so that the
        // timeout path below can connect to it
        let listening_addr = tcp_listener.local_addr().unwrap();
        let thread_handle = std::thread::Builder::new()
            .name(format!("mlnVrfyTcp{port:05}"))
            .spawn(move || {
                debug!("Waiting for incoming connection on tcp/{}", port);
                // Blocks until the first incoming connection (or accept error) arrives
                match tcp_listener.incoming().next() {
                    Some(_) => sender
                        .send(())
                        .unwrap_or_else(|err| warn!("send failure: {}", err)),
                    None => warn!("tcp incoming failed"),
                }
            })
            .unwrap();
        match receiver.recv_timeout(timeout) {
            Ok(_) => {
                info!("tcp/{} is reachable", port);
            }
            Err(err) => {
                error!(
                    "Received no response at tcp/{}, check your port configuration: {}",
                    port, err
                );
                // Ugh, std rustc doesn't provide accepting with timeout or restoring original
                // nonblocking-status of sockets because of lack of getter, only the setter...
                // So, to close the thread cleanly, just connect from here.
                // ref: https://github.com/rust-lang/rust/issues/31615
                TcpStream::connect_timeout(&listening_addr, timeout).unwrap();
                ok = false;
            }
        }
        // ensure to reap the thread
        thread_handle.join().unwrap();
    }

    if !ok {
        // No retries for TCP, abort on the first failure
        return ok;
    }

    // Group the UDP sockets by their local port; several sockets may share one port
    let mut udp_ports: BTreeMap<_, _> = BTreeMap::new();
    udp_sockets.iter().for_each(|udp_socket| {
        let port = udp_socket.local_addr().unwrap().port();
        udp_ports
            .entry(port)
            .or_insert_with(Vec::new)
            .push(udp_socket);
    });
    // Flatten into a Vec of (port, sockets) so that it can be split into chunks below
    let udp_ports: Vec<_> = udp_ports.into_iter().collect();

    info!(
        "Checking that udp ports {:?} are reachable from {:?}",
        udp_ports.iter().map(|(port, _)| port).collect::<Vec<_>>(),
        ip_echo_server_addr
    );

    'outer: for checked_ports_and_sockets in udp_ports.chunks(MAX_PORT_COUNT_PER_MESSAGE) {
        // Each batch starts out as failed and must prove itself reachable
        ok = false;

        for udp_remaining_retry in (0_usize..udp_retry_count).rev() {
            let (checked_ports, checked_socket_iter) = (
                checked_ports_and_sockets
                    .iter()
                    .map(|(port, _)| *port)
                    .collect::<Vec<_>>(),
                checked_ports_and_sockets
                    .iter()
                    .flat_map(|(_, sockets)| sockets),
            );

            // Ask the echo server to send a datagram to every port of this batch
            let _ = ip_echo_server_request(
                ip_echo_server_addr,
                IpEchoServerMessage::new(&[], &checked_ports),
            )
            .map_err(|err| warn!("ip_echo_server request failed: {}", err));

            // Spawn threads at once!
            let reachable_ports = Arc::new(RwLock::new(HashSet::new()));
            let thread_handles: Vec<_> = checked_socket_iter
                .map(|udp_socket| {
                    let port = udp_socket.local_addr().unwrap().port();
                    let udp_socket = udp_socket.try_clone().expect("Unable to clone udp socket");
                    let reachable_ports = reachable_ports.clone();

                    std::thread::Builder::new()
                        .name(format!("mlnVrfyUdp{port:05}"))
                        .spawn(move || {
                            let start = Instant::now();

                            // Use a short read timeout while polling and restore the
                            // previous value before the thread exits
                            let original_read_timeout = udp_socket.read_timeout().unwrap();
                            udp_socket
                                .set_read_timeout(Some(Duration::from_millis(250)))
                                .unwrap();
                            loop {
                                // Stop once this port was already marked reachable (e.g. by
                                // another socket bound to the same port) or time is up
                                if reachable_ports.read().unwrap().contains(&port)
                                    || Instant::now().duration_since(start) >= timeout
                                {
                                    break;
                                }

                                let recv_result = udp_socket.recv(&mut [0; 1]);
                                debug!(
                                    "Waited for incoming datagram on udp/{}: {:?}",
                                    port, recv_result
                                );

                                if recv_result.is_ok() {
                                    reachable_ports.write().unwrap().insert(port);
                                    break;
                                }
                            }
                            udp_socket.set_read_timeout(original_read_timeout).unwrap();
                        })
                        .unwrap()
                })
                .collect();

            // Now join threads!
            // Separate from the above by collect()-ing as an intermediately step to make the iterator
            // eager not lazy so that joining happens here at once after creating bunch of threads
            // at once.
            for thread in thread_handles {
                thread.join().unwrap();
            }

            let reachable_ports = reachable_ports.read().unwrap().clone();
            if reachable_ports.len() == checked_ports.len() {
                info!(
                    "checked udp ports: {:?}, reachable udp ports: {:?}",
                    checked_ports, reachable_ports
                );
                ok = true;
                break;
            } else if udp_remaining_retry > 0 {
                // Might have lost a UDP packet, retry a couple times
                error!(
                    "checked udp ports: {:?}, reachable udp ports: {:?}",
                    checked_ports, reachable_ports
                );
                error!("There are some udp ports with no response!! Retrying...");
            } else {
                // Out of retries: give up on this and all remaining batches (`ok` is false)
                error!("Maximum retry count is reached....");
                break 'outer;
            }
        }
    }

    ok
}
292
/// Checks that the given TCP listeners and UDP sockets are reachable from the machine at
/// `ip_echo_server_addr`.
///
/// Delegates to `do_verify_reachable_ports` with `DEFAULT_TIMEOUT_SECS` as the timeout and
/// `DEFAULT_RETRY_COUNT` as the UDP retry count, and returns its result.
pub fn verify_reachable_ports(
    ip_echo_server_addr: &SocketAddr,
    tcp_listeners: Vec<(u16, TcpListener)>,
    udp_sockets: &[&UdpSocket],
) -> bool {
    do_verify_reachable_ports(
        ip_echo_server_addr,
        tcp_listeners,
        udp_sockets,
        DEFAULT_TIMEOUT_SECS,
        DEFAULT_RETRY_COUNT,
    )
}
306
/// Interprets `optstr` as either a bare port or a full socket address.
///
/// * A bare port yields `default_addr` with its port replaced.
/// * A full socket address yields that address.
/// * `None`, or a string that is neither, yields `default_addr` unchanged.
pub fn parse_port_or_addr(optstr: Option<&str>, default_addr: SocketAddr) -> SocketAddr {
    let text = match optstr {
        Some(text) => text,
        None => return default_addr,
    };

    match text.parse::<u16>() {
        Ok(port) => {
            let mut with_port = default_addr;
            with_port.set_port(port);
            with_port
        }
        Err(_) => text.parse::<SocketAddr>().unwrap_or(default_addr),
    }
}
322
323pub fn parse_port_range(port_range: &str) -> Option<PortRange> {
324    let ports: Vec<&str> = port_range.split('-').collect();
325    if ports.len() != 2 {
326        return None;
327    }
328
329    let start_port = ports[0].parse();
330    let end_port = ports[1].parse();
331
332    if start_port.is_err() || end_port.is_err() {
333        return None;
334    }
335    let start_port = start_port.unwrap();
336    let end_port = end_port.unwrap();
337    if end_port < start_port {
338        return None;
339    }
340    Some((start_port, end_port))
341}
342
343pub fn parse_host(host: &str) -> Result<IpAddr, String> {
344    // First, check if the host syntax is valid. This check is needed because addresses
345    // such as `("localhost:1234", 0)` will resolve to IPs on some networks.
346    let parsed_url = Url::parse(&format!("http://{host}")).map_err(|e| e.to_string())?;
347    if parsed_url.port().is_some() {
348        return Err(format!("Expected port in URL: {host}"));
349    }
350
351    // Next, check to see if it resolves to an IP address
352    let ips: Vec<_> = (host, 0)
353        .to_socket_addrs()
354        .map_err(|err| err.to_string())?
355        .map(|socket_address| socket_address.ip())
356        .collect();
357    if ips.is_empty() {
358        Err(format!("Unable to resolve host: {host}"))
359    } else {
360        Ok(ips[0])
361    }
362}
363
/// Validates that `string` is accepted by `parse_host`.
///
/// Returns `Ok(())` when `parse_host` succeeds and forwards its error string otherwise; the
/// resolved address itself is discarded.
pub fn is_host(string: String) -> Result<(), String> {
    parse_host(&string).map(|_| ())
}
367
/// Resolves a `host:port` string to a socket address, returning the first address that
/// resolution produces.
///
/// # Errors
///
/// Returns an error string when resolution fails or produces no addresses.
pub fn parse_host_port(host_port: &str) -> Result<SocketAddr, String> {
    match host_port.to_socket_addrs() {
        Err(err) => Err(format!("Unable to resolve host {host_port}: {err}")),
        Ok(mut resolved) => resolved
            .next()
            .ok_or_else(|| format!("Unable to resolve host: {host_port}")),
    }
}
379
/// Validates that `string` is accepted by `parse_host_port`.
///
/// Returns `Ok(())` when `parse_host_port` succeeds and forwards its error string otherwise;
/// the resolved address itself is discarded.
pub fn is_host_port(string: String) -> Result<(), String> {
    parse_host_port(&string).map(|_| ())
}
383
/// Creates an unbound IPv4 datagram socket (Windows / iOS variant).
///
/// The reuse flag is accepted for signature parity with the other variant but is ignored
/// here.
#[cfg(any(windows, target_os = "ios"))]
fn udp_socket(_reuseaddr: bool) -> io::Result<Socket> {
    Socket::new(Domain::IPV4, Type::DGRAM, None)
}
389
390#[cfg(not(any(windows, target_os = "ios")))]
391fn udp_socket(reuseaddr: bool) -> io::Result<Socket> {
392    use {
393        nix::sys::socket::{
394            setsockopt,
395            sockopt::{ReuseAddr, ReusePort},
396        },
397        std::os::unix::io::AsRawFd,
398    };
399
400    let sock = Socket::new(Domain::IPV4, Type::DGRAM, None)?;
401    let sock_fd = sock.as_raw_fd();
402
403    if reuseaddr {
404        // best effort, i.e. ignore errors here, we'll get the failure in caller
405        setsockopt(sock_fd, ReusePort, &true).ok();
406        setsockopt(sock_fd, ReuseAddr, &true).ok();
407    }
408
409    Ok(sock)
410}
411
412// Find a port in the given range that is available for both TCP and UDP
413pub fn bind_common_in_range(
414    ip_addr: IpAddr,
415    range: PortRange,
416) -> io::Result<(u16, (UdpSocket, TcpListener))> {
417    for port in range.0..range.1 {
418        if let Ok((sock, listener)) = bind_common(ip_addr, port, false) {
419            return Result::Ok((sock.local_addr().unwrap().port(), (sock, listener)));
420        }
421    }
422
423    Err(io::Error::new(
424        io::ErrorKind::Other,
425        format!("No available TCP/UDP ports in {range:?}"),
426    ))
427}
428
429pub fn bind_in_range(ip_addr: IpAddr, range: PortRange) -> io::Result<(u16, UdpSocket)> {
430    let sock = udp_socket(false)?;
431
432    for port in range.0..range.1 {
433        let addr = SocketAddr::new(ip_addr, port);
434
435        if sock.bind(&SockAddr::from(addr)).is_ok() {
436            let sock: UdpSocket = sock.into();
437            return Result::Ok((sock.local_addr().unwrap().port(), sock));
438        }
439    }
440
441    Err(io::Error::new(
442        io::ErrorKind::Other,
443        format!("No available UDP ports in {range:?}"),
444    ))
445}
446
447pub fn bind_with_any_port(ip_addr: IpAddr) -> io::Result<UdpSocket> {
448    let sock = udp_socket(false)?;
449    let addr = SocketAddr::new(ip_addr, 0);
450    match sock.bind(&SockAddr::from(addr)) {
451        Ok(_) => Result::Ok(sock.into()),
452        Err(err) => Err(io::Error::new(
453            io::ErrorKind::Other,
454            format!("No available UDP port: {err}"),
455        )),
456    }
457}
458
459// binds many sockets to the same port in a range
460pub fn multi_bind_in_range(
461    ip_addr: IpAddr,
462    range: PortRange,
463    mut num: usize,
464) -> io::Result<(u16, Vec<UdpSocket>)> {
465    if cfg!(windows) && num != 1 {
466        // See https://github.com/solana-labs/solana/issues/4607
467        warn!(
468            "multi_bind_in_range() only supports 1 socket in windows ({} requested)",
469            num
470        );
471        num = 1;
472    }
473    let mut sockets = Vec::with_capacity(num);
474
475    const NUM_TRIES: usize = 100;
476    let mut port = 0;
477    let mut error = None;
478    for _ in 0..NUM_TRIES {
479        port = {
480            let (port, _) = bind_in_range(ip_addr, range)?;
481            port
482        }; // drop the probe, port should be available... briefly.
483
484        for _ in 0..num {
485            let sock = bind_to(ip_addr, port, true);
486            if let Ok(sock) = sock {
487                sockets.push(sock);
488            } else {
489                error = Some(sock);
490                break;
491            }
492        }
493        if sockets.len() == num {
494            break;
495        } else {
496            sockets.clear();
497        }
498    }
499    if sockets.len() != num {
500        error.unwrap()?;
501    }
502    Ok((port, sockets))
503}
504
505pub fn bind_to(ip_addr: IpAddr, port: u16, reuseaddr: bool) -> io::Result<UdpSocket> {
506    let sock = udp_socket(reuseaddr)?;
507
508    let addr = SocketAddr::new(ip_addr, port);
509
510    sock.bind(&SockAddr::from(addr)).map(|_| sock.into())
511}
512
513// binds both a UdpSocket and a TcpListener
514pub fn bind_common(
515    ip_addr: IpAddr,
516    port: u16,
517    reuseaddr: bool,
518) -> io::Result<(UdpSocket, TcpListener)> {
519    let sock = udp_socket(reuseaddr)?;
520
521    let addr = SocketAddr::new(ip_addr, port);
522    let sock_addr = SockAddr::from(addr);
523    sock.bind(&sock_addr)
524        .and_then(|_| TcpListener::bind(addr).map(|listener| (sock.into(), listener)))
525}
526
527pub fn bind_two_in_range_with_offset(
528    ip_addr: IpAddr,
529    range: PortRange,
530    offset: u16,
531) -> io::Result<((u16, UdpSocket), (u16, UdpSocket))> {
532    if range.1.saturating_sub(range.0) < offset {
533        return Err(io::Error::new(
534            io::ErrorKind::Other,
535            "range too small to find two ports with the correct offset".to_string(),
536        ));
537    }
538    for port in range.0..range.1 {
539        if let Ok(first_bind) = bind_to(ip_addr, port, false) {
540            if range.1.saturating_sub(port) >= offset {
541                if let Ok(second_bind) = bind_to(ip_addr, port + offset, false) {
542                    return Ok((
543                        (first_bind.local_addr().unwrap().port(), first_bind),
544                        (second_bind.local_addr().unwrap().port(), second_bind),
545                    ));
546                }
547            } else {
548                break;
549            }
550        }
551    }
552    Err(io::Error::new(
553        io::ErrorKind::Other,
554        "couldn't find two ports with the correct offset in range".to_string(),
555    ))
556}
557
558pub fn find_available_port_in_range(ip_addr: IpAddr, range: PortRange) -> io::Result<u16> {
559    let (start, end) = range;
560    let mut tries_left = end - start;
561    let mut rand_port = thread_rng().gen_range(start..end);
562    loop {
563        match bind_common(ip_addr, rand_port, false) {
564            Ok(_) => {
565                break Ok(rand_port);
566            }
567            Err(err) => {
568                if tries_left == 0 {
569                    return Err(err);
570                }
571            }
572        }
573        rand_port += 1;
574        if rand_port == end {
575            rand_port = start;
576        }
577        tries_left -= 1;
578    }
579}
580
// Unit tests for this file. Several of them bind real sockets on fixed local port ranges
// and talk to a locally started ip_echo_server.
#[cfg(test)]
mod tests {
    use {super::*, std::net::Ipv4Addr};

    // IP_ECHO_SERVER_RESPONSE_LENGTH must equal the header plus the serialized size of the
    // largest response (IPv6 address and a `Some` shred version).
    #[test]
    fn test_response_length() {
        let resp = IpEchoServerResponse {
            address: IpAddr::from([u16::MAX; 8]), // IPv6 variant
            shred_version: Some(u16::MAX),
        };
        let resp_size = bincode::serialized_size(&resp).unwrap();
        assert_eq!(
            IP_ECHO_SERVER_RESPONSE_LENGTH,
            HEADER_LENGTH + resp_size as usize
        );
    }

    // Asserts that an old client can parse the response from a new server.
    #[test]
    fn test_backward_compat() {
        let address = IpAddr::from([
            525u16, 524u16, 523u16, 522u16, 521u16, 520u16, 519u16, 518u16,
        ]);
        let response = IpEchoServerResponse {
            address,
            shred_version: Some(42),
        };
        let mut data = vec![0u8; IP_ECHO_SERVER_RESPONSE_LENGTH];
        bincode::serialize_into(&mut data[HEADER_LENGTH..], &response).unwrap();
        // Keep only the header plus the first 20 payload bytes, then decode them as a bare
        // `IpAddr`
        data.truncate(HEADER_LENGTH + 20);
        assert_eq!(
            bincode::deserialize::<IpAddr>(&data[HEADER_LENGTH..]).unwrap(),
            address
        );
    }

    // Asserts that a new client can parse the response from an old server.
    #[test]
    fn test_forward_compat() {
        let address = IpAddr::from([
            525u16, 524u16, 523u16, 522u16, 521u16, 520u16, 519u16, 518u16,
        ]);
        // Only the address is serialized; the rest of the buffer stays zeroed and decodes
        // as `shred_version: None`
        let mut data = [0u8; IP_ECHO_SERVER_RESPONSE_LENGTH];
        bincode::serialize_into(&mut data[HEADER_LENGTH..], &address).unwrap();
        let response: Result<IpEchoServerResponse, _> =
            bincode::deserialize(&data[HEADER_LENGTH..]);
        assert_eq!(
            response.unwrap(),
            IpEchoServerResponse {
                address,
                shred_version: None,
            }
        );
    }

    // A bare port, a full address, garbage and `None` are each handled as documented.
    #[test]
    fn test_parse_port_or_addr() {
        let p1 = parse_port_or_addr(Some("9000"), SocketAddr::from(([1, 2, 3, 4], 1)));
        assert_eq!(p1.port(), 9000);
        let p2 = parse_port_or_addr(Some("127.0.0.1:7000"), SocketAddr::from(([1, 2, 3, 4], 1)));
        assert_eq!(p2.port(), 7000);
        let p2 = parse_port_or_addr(Some("hi there"), SocketAddr::from(([1, 2, 3, 4], 1)));
        assert_eq!(p2.port(), 1);
        let p3 = parse_port_or_addr(None, SocketAddr::from(([1, 2, 3, 4], 1)));
        assert_eq!(p3.port(), 1);
    }

    // Only well-formed, non-descending "start-end" strings are accepted.
    #[test]
    fn test_parse_port_range() {
        assert_eq!(parse_port_range("garbage"), None);
        assert_eq!(parse_port_range("1-"), None);
        assert_eq!(parse_port_range("1-2"), Some((1, 2)));
        assert_eq!(parse_port_range("1-2-3"), None);
        assert_eq!(parse_port_range("2-1"), None);
    }

    // `parse_host` rejects inputs that carry a port and accepts bare hosts.
    #[test]
    fn test_parse_host() {
        parse_host("localhost:1234").unwrap_err();
        parse_host("localhost").unwrap();
        parse_host("127.0.0.0:1234").unwrap_err();
        parse_host("127.0.0.0").unwrap();
    }

    // `parse_host_port` requires a port and rejects bare hosts.
    #[test]
    fn test_parse_host_port() {
        parse_host_port("localhost:1234").unwrap();
        parse_host_port("localhost").unwrap_err();
        parse_host_port("127.0.0.0:1234").unwrap();
        parse_host_port("127.0.0.0").unwrap_err();
    }

    // `is_host_port` mirrors `parse_host_port` without returning the address.
    #[test]
    fn test_is_host_port() {
        assert!(is_host_port("localhost:1234".to_string()).is_ok());
        assert!(is_host_port("localhost".to_string()).is_err());
    }

    // Exercises `bind_in_range`, `bind_to` with and without reuse, and `multi_bind_in_range`.
    #[test]
    fn test_bind() {
        let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
        assert_eq!(bind_in_range(ip_addr, (2000, 2001)).unwrap().0, 2000);
        let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
        // Two sockets requesting reuse can share the port...
        let x = bind_to(ip_addr, 2002, true).unwrap();
        let y = bind_to(ip_addr, 2002, true).unwrap();
        assert_eq!(
            x.local_addr().unwrap().port(),
            y.local_addr().unwrap().port()
        );
        // ...but binds that do not request reuse are refused while the port is taken
        bind_to(ip_addr, 2002, false).unwrap_err();
        bind_in_range(ip_addr, (2002, 2003)).unwrap_err();

        let (port, v) = multi_bind_in_range(ip_addr, (2010, 2110), 10).unwrap();
        for sock in &v {
            assert_eq!(port, sock.local_addr().unwrap().port());
        }
    }

    // Two sockets bound to port 0 end up on different ports.
    #[test]
    fn test_bind_with_any_port() {
        let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
        let x = bind_with_any_port(ip_addr).unwrap();
        let y = bind_with_any_port(ip_addr).unwrap();
        assert_ne!(
            x.local_addr().unwrap().port(),
            y.local_addr().unwrap().port()
        );
    }

    // Empty and inverted ranges yield an error rather than a socket.
    #[test]
    fn test_bind_in_range_nil() {
        let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
        bind_in_range(ip_addr, (2000, 2000)).unwrap_err();
        bind_in_range(ip_addr, (2000, 1999)).unwrap_err();
    }

    // A single-port range returns that port; a port that is already bound is not available.
    #[test]
    fn test_find_available_port_in_range() {
        let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
        assert_eq!(
            find_available_port_in_range(ip_addr, (3000, 3001)).unwrap(),
            3000
        );
        let port = find_available_port_in_range(ip_addr, (3000, 3050)).unwrap();
        assert!((3000..3050).contains(&port));

        let _socket = bind_to(ip_addr, port, false).unwrap();
        find_available_port_in_range(ip_addr, (port, port + 1)).unwrap_err();
    }

    // The returned port lies in the range and cannot be bound a second time while held.
    #[test]
    fn test_bind_common_in_range() {
        let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
        let (port, _sockets) = bind_common_in_range(ip_addr, (3100, 3150)).unwrap();
        assert!((3100..3150).contains(&port));

        bind_common_in_range(ip_addr, (port, port + 1)).unwrap_err();
    }

    // With a running echo server and no ports to check, the address and shred version are
    // returned and verification trivially succeeds.
    #[test]
    fn test_get_public_ip_addr_none() {
        miraland_logger::setup();
        let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
        let (_server_port, (server_udp_socket, server_tcp_listener)) =
            bind_common_in_range(ip_addr, (3200, 3250)).unwrap();

        let _runtime = ip_echo_server(server_tcp_listener, /*shred_version=*/ Some(42));

        let server_ip_echo_addr = server_udp_socket.local_addr().unwrap();
        assert_eq!(
            get_public_ip_addr(&server_ip_echo_addr),
            parse_host("127.0.0.1"),
        );
        assert_eq!(get_cluster_shred_version(&server_ip_echo_addr), Ok(42));
        assert!(verify_reachable_ports(&server_ip_echo_addr, vec![], &[],));
    }

    // With a running echo server, a client TCP listener and UDP socket are found reachable.
    #[test]
    fn test_get_public_ip_addr_reachable() {
        miraland_logger::setup();
        let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
        let (_server_port, (server_udp_socket, server_tcp_listener)) =
            bind_common_in_range(ip_addr, (3200, 3250)).unwrap();
        let (client_port, (client_udp_socket, client_tcp_listener)) =
            bind_common_in_range(ip_addr, (3200, 3250)).unwrap();

        let _runtime = ip_echo_server(server_tcp_listener, /*shred_version=*/ Some(65535));

        let ip_echo_server_addr = server_udp_socket.local_addr().unwrap();
        assert_eq!(
            get_public_ip_addr(&ip_echo_server_addr),
            parse_host("127.0.0.1"),
        );
        assert_eq!(get_cluster_shred_version(&ip_echo_server_addr), Ok(65535));
        assert!(verify_reachable_ports(
            &ip_echo_server_addr,
            vec![(client_port, client_tcp_listener)],
            &[&client_udp_socket],
        ));
    }

    // Without an echo server nobody connects back, so the TCP check must fail (uses a
    // 2 second timeout and 3 UDP retries to keep the test short).
    #[test]
    fn test_get_public_ip_addr_tcp_unreachable() {
        miraland_logger::setup();
        let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
        let (_server_port, (server_udp_socket, _server_tcp_listener)) =
            bind_common_in_range(ip_addr, (3200, 3250)).unwrap();

        // make the socket unreachable by not running the ip echo server!

        let server_ip_echo_addr = server_udp_socket.local_addr().unwrap();

        let (correct_client_port, (_client_udp_socket, client_tcp_listener)) =
            bind_common_in_range(ip_addr, (3200, 3250)).unwrap();

        assert!(!do_verify_reachable_ports(
            &server_ip_echo_addr,
            vec![(correct_client_port, client_tcp_listener)],
            &[],
            2,
            3,
        ));
    }

    // Without an echo server no datagram arrives, so the UDP check must fail after its
    // retries are exhausted.
    #[test]
    fn test_get_public_ip_addr_udp_unreachable() {
        miraland_logger::setup();
        let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
        let (_server_port, (server_udp_socket, _server_tcp_listener)) =
            bind_common_in_range(ip_addr, (3200, 3250)).unwrap();

        // make the socket unreachable by not running the ip echo server!

        let server_ip_echo_addr = server_udp_socket.local_addr().unwrap();

        let (_correct_client_port, (client_udp_socket, _client_tcp_listener)) =
            bind_common_in_range(ip_addr, (3200, 3250)).unwrap();

        assert!(!do_verify_reachable_ports(
            &server_ip_echo_addr,
            vec![],
            &[&client_udp_socket],
            2,
            3,
        ));
    }

    // When a pair is found its ports differ by exactly `offset`; a range narrower than the
    // offset is rejected.
    #[test]
    fn test_bind_two_in_range_with_offset() {
        miraland_logger::setup();
        let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
        let offset = 6;
        if let Ok(((port1, _), (port2, _))) =
            bind_two_in_range_with_offset(ip_addr, (1024, 65535), offset)
        {
            assert!(port2 == port1 + offset);
        }
        let offset = 42;
        if let Ok(((port1, _), (port2, _))) =
            bind_two_in_range_with_offset(ip_addr, (1024, 65535), offset)
        {
            assert!(port2 == port1 + offset);
        }
        assert!(bind_two_in_range_with_offset(ip_addr, (1024, 1044), offset).is_err());
    }
}
844        assert!(bind_two_in_range_with_offset(ip_addr, (1024, 1044), offset).is_err());
845    }
846}