solana_net_utils/
lib.rs

1//! The `net_utils` module assists with networking
2mod ip_echo_client;
3mod ip_echo_server;
4pub mod sockets;
5
6#[cfg(feature = "dev-context-only-utils")]
7pub mod tooling_for_tests;
8
9pub use ip_echo_server::{
10    ip_echo_server, IpEchoServer, DEFAULT_IP_ECHO_SERVER_THREADS, MAX_PORT_COUNT_PER_MESSAGE,
11    MINIMUM_IP_ECHO_SERVER_THREADS,
12};
13#[cfg(feature = "dev-context-only-utils")]
14use tokio::net::UdpSocket as TokioUdpSocket;
15use {
16    ip_echo_client::{ip_echo_server_request, ip_echo_server_request_with_binding},
17    ip_echo_server::IpEchoServerMessage,
18    log::*,
19    rand::{thread_rng, Rng},
20    socket2::{Domain, SockAddr, Socket, Type},
21    std::{
22        io::{self},
23        net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, ToSocketAddrs, UdpSocket},
24    },
25    url::Url,
26};
27
/// A public UDP endpoint together with the locally bound sockets that serve it.
pub struct UdpSocketPair {
    /// Public address of the socket.
    pub addr: SocketAddr,
    /// Locally bound socket that can receive from the public address.
    pub receiver: UdpSocket,
    /// Locally bound socket to send via the public address.
    pub sender: UdpSocket,
}
34
/// A `(start, end)` pair of port numbers describing a port range.
pub type PortRange = (u16, u16);

/// Port range for validators, as the name indicates (its consumers live outside this file).
pub const VALIDATOR_PORT_RANGE: PortRange = (8000, 10_000);
/// `VALIDATOR_PORT_RANGE` must be at least this wide.
pub const MINIMUM_VALIDATOR_PORT_RANGE_WIDTH: u16 = 17;

/// Number of bytes that precede the serialized payload in an IP echo server response buffer.
pub(crate) const HEADER_LENGTH: usize = 4;
/// Total response buffer length: the header plus the 23 bytes of the largest serialized
/// response (IPv6 address with a shred version present).
pub(crate) const IP_ECHO_SERVER_RESPONSE_LENGTH: usize = HEADER_LENGTH + 23;
42
43/// Determine the public IP address of this machine by asking an ip_echo_server at the given
44/// address.
45pub fn get_public_ip_addr(ip_echo_server_addr: &SocketAddr) -> Result<IpAddr, String> {
46    let fut = ip_echo_server_request(*ip_echo_server_addr, IpEchoServerMessage::default());
47    let rt = tokio::runtime::Builder::new_current_thread()
48        .enable_all()
49        .build()
50        .map_err(|e| e.to_string())?;
51    let resp = rt.block_on(fut).map_err(|e| e.to_string())?;
52    Ok(resp.address)
53}
54
55/// Determine the public IP address of this machine by asking an ip_echo_server at the given
56/// address. This function will bind to the provided bind_addreess.
57pub fn get_public_ip_addr_with_binding(
58    ip_echo_server_addr: &SocketAddr,
59    bind_address: IpAddr,
60) -> anyhow::Result<IpAddr> {
61    let fut = ip_echo_server_request_with_binding(
62        *ip_echo_server_addr,
63        IpEchoServerMessage::default(),
64        bind_address,
65    );
66    let rt = tokio::runtime::Builder::new_current_thread()
67        .enable_all()
68        .build()?;
69    let resp = rt.block_on(fut)?;
70    Ok(resp.address)
71}
72
73/// Retrieves cluster shred version from Entrypoint address provided.
74pub fn get_cluster_shred_version(ip_echo_server_addr: &SocketAddr) -> Result<u16, String> {
75    let fut = ip_echo_server_request(*ip_echo_server_addr, IpEchoServerMessage::default());
76    let rt = tokio::runtime::Builder::new_current_thread()
77        .enable_all()
78        .build()
79        .map_err(|e| e.to_string())?;
80    let resp = rt.block_on(fut).map_err(|e| e.to_string())?;
81    resp.shred_version
82        .ok_or_else(|| "IP echo server does not return a shred-version".to_owned())
83}
84
85/// Retrieves cluster shred version from Entrypoint address provided,
86/// binds client-side socket to the IP provided.
87pub fn get_cluster_shred_version_with_binding(
88    ip_echo_server_addr: &SocketAddr,
89    bind_address: IpAddr,
90) -> anyhow::Result<u16> {
91    let fut = ip_echo_server_request_with_binding(
92        *ip_echo_server_addr,
93        IpEchoServerMessage::default(),
94        bind_address,
95    );
96    let rt = tokio::runtime::Builder::new_current_thread()
97        .enable_all()
98        .build()?;
99    let resp = rt.block_on(fut)?;
100    resp.shred_version
101        .ok_or_else(|| anyhow::anyhow!("IP echo server does not return a shred-version"))
102}
103
/// Cap on the blocking threads given to the runtimes that run port reachability
/// checks, so that very large port ranges cannot request an unreasonable number
/// of threads.
const MAX_PORT_VERIFY_THREADS: usize = 64;
107
108/// Checks if all of the provided TCP/UDP ports are reachable by the machine at
109/// `ip_echo_server_addr`. Tests must complete within timeout provided.
110/// Tests will run concurrently when possible, using up to 64 threads for IO.
111/// This function assumes that all sockets are bound to the same IP, and will panic otherwise
112#[deprecated(
113    since = "2.2.0",
114    note = "use `verify_all_reachable_udp` and `verify_all_reachable_tcp` instead"
115)]
116pub fn verify_reachable_ports(
117    ip_echo_server_addr: &SocketAddr,
118    tcp_listeners: Vec<(u16, TcpListener)>,
119    udp_sockets: &[&UdpSocket],
120) -> bool {
121    verify_all_reachable_tcp(
122        ip_echo_server_addr,
123        tcp_listeners.into_iter().map(|(_, l)| l).collect(),
124    ) && verify_all_reachable_udp(ip_echo_server_addr, udp_sockets)
125}
126
127/// Checks if all of the provided UDP ports are reachable by the machine at
128/// `ip_echo_server_addr`. Tests must complete within timeout provided.
129/// Tests will run concurrently when possible, using up to 64 threads for IO.
130/// This function assumes that all sockets are bound to the same IP, and will panic otherwise
131pub fn verify_all_reachable_udp(
132    ip_echo_server_addr: &SocketAddr,
133    udp_sockets: &[&UdpSocket],
134) -> bool {
135    let rt = tokio::runtime::Builder::new_current_thread()
136        .enable_all()
137        .max_blocking_threads(MAX_PORT_VERIFY_THREADS)
138        .build()
139        .expect("Tokio builder should be able to reliably create a current thread runtime");
140    let fut = ip_echo_client::verify_all_reachable_udp(
141        *ip_echo_server_addr,
142        udp_sockets,
143        ip_echo_client::TIMEOUT,
144        ip_echo_client::DEFAULT_RETRY_COUNT,
145    );
146    rt.block_on(fut)
147}
148
149/// Checks if all of the provided TCP ports are reachable by the machine at
150/// `ip_echo_server_addr`. Tests must complete within timeout provided.
151/// Tests will run concurrently when possible, using up to 64 threads for IO.
152/// This function assumes that all sockets are bound to the same IP, and will panic otherwise.
153pub fn verify_all_reachable_tcp(
154    ip_echo_server_addr: &SocketAddr,
155    tcp_listeners: Vec<TcpListener>,
156) -> bool {
157    let rt = tokio::runtime::Builder::new_current_thread()
158        .enable_all()
159        .max_blocking_threads(MAX_PORT_VERIFY_THREADS)
160        .build()
161        .expect("Tokio builder should be able to reliably create a current thread runtime");
162    let fut = ip_echo_client::verify_all_reachable_tcp(
163        *ip_echo_server_addr,
164        tcp_listeners,
165        ip_echo_client::TIMEOUT,
166    );
167    rt.block_on(fut)
168}
169
/// Interprets `optstr` as either a bare port or a full socket address.
///
/// * A bare port yields `default_addr` with its port replaced.
/// * A full socket address is returned as parsed.
/// * `None` or an unparseable string yields `default_addr` unchanged.
pub fn parse_port_or_addr(optstr: Option<&str>, default_addr: SocketAddr) -> SocketAddr {
    match optstr {
        None => default_addr,
        Some(text) => match text.parse::<u16>() {
            Ok(port) => {
                // Only the port is overridden; the rest of the default is kept.
                let mut with_port = default_addr;
                with_port.set_port(port);
                with_port
            }
            Err(_) => text.parse::<SocketAddr>().unwrap_or(default_addr),
        },
    }
}
185
186pub fn parse_port_range(port_range: &str) -> Option<PortRange> {
187    let ports: Vec<&str> = port_range.split('-').collect();
188    if ports.len() != 2 {
189        return None;
190    }
191
192    let start_port = ports[0].parse();
193    let end_port = ports[1].parse();
194
195    if start_port.is_err() || end_port.is_err() {
196        return None;
197    }
198    let start_port = start_port.unwrap();
199    let end_port = end_port.unwrap();
200    if end_port < start_port {
201        return None;
202    }
203    Some((start_port, end_port))
204}
205
206pub fn parse_host(host: &str) -> Result<IpAddr, String> {
207    // First, check if the host syntax is valid. This check is needed because addresses
208    // such as `("localhost:1234", 0)` will resolve to IPs on some networks.
209    let parsed_url = Url::parse(&format!("http://{host}")).map_err(|e| e.to_string())?;
210    if parsed_url.port().is_some() {
211        return Err(format!("Expected port in URL: {host}"));
212    }
213
214    // Next, check to see if it resolves to an IP address
215    let ips: Vec<_> = (host, 0)
216        .to_socket_addrs()
217        .map_err(|err| err.to_string())?
218        .map(|socket_address| socket_address.ip())
219        .collect();
220    if ips.is_empty() {
221        Err(format!("Unable to resolve host: {host}"))
222    } else {
223        Ok(ips[0])
224    }
225}
226
227pub fn is_host(string: String) -> Result<(), String> {
228    parse_host(&string).map(|_| ())
229}
230
/// Resolves a `"host:port"` string to a socket address, returning the first
/// address produced by the resolver.
///
/// # Errors
///
/// Returns a message if resolution fails or produces no addresses.
pub fn parse_host_port(host_port: &str) -> Result<SocketAddr, String> {
    let mut resolved = host_port
        .to_socket_addrs()
        .map_err(|err| format!("Unable to resolve host {host_port}: {err}"))?;
    resolved
        .next()
        .ok_or_else(|| format!("Unable to resolve host: {host_port}"))
}
242
243pub fn is_host_port(string: String) -> Result<(), String> {
244    parse_host_port(&string).map(|_| ())
245}
246
/// Options applied when this module creates a UDP socket.
///
/// Built by chaining the setter methods onto `SocketConfig::default()`.
#[derive(Clone, Copy, Debug, Default)]
pub struct SocketConfig {
    /// Whether the `ReusePort` socket option is requested.
    reuseport: bool,
    /// Requested receive buffer size, if any.
    recv_buffer_size: Option<usize>,
    /// Requested send buffer size, if any.
    send_buffer_size: Option<usize>,
}

impl SocketConfig {
    /// Sets whether the socket should be created with port reuse enabled.
    pub fn reuseport(self, reuseport: bool) -> Self {
        Self { reuseport, ..self }
    }

    /// Sets the receive buffer size for the socket (no effect on windows/ios).
    ///
    /// **Note:** On Linux the kernel will double the value you specify.
    /// For example, if you specify `16MB`, the kernel will configure the
    /// socket to use `32MB`.
    /// See: https://man7.org/linux/man-pages/man7/socket.7.html: SO_RCVBUF
    pub fn recv_buffer_size(self, size: usize) -> Self {
        Self {
            recv_buffer_size: Some(size),
            ..self
        }
    }

    /// Sets the send buffer size for the socket (no effect on windows/ios).
    ///
    /// **Note:** On Linux the kernel will double the value you specify.
    /// For example, if you specify `16MB`, the kernel will configure the
    /// socket to use `32MB`.
    /// See: https://man7.org/linux/man-pages/man7/socket.7.html: SO_SNDBUF
    pub fn send_buffer_size(self, size: usize) -> Self {
        Self {
            send_buffer_size: Some(size),
            ..self
        }
    }
}
282
/// Creates an unbound IPv4 UDP socket. On windows/ios the configuration is ignored.
#[cfg(any(windows, target_os = "ios"))]
fn udp_socket_with_config(_config: SocketConfig) -> io::Result<Socket> {
    Socket::new(Domain::IPV4, Type::DGRAM, None)
}
288
289#[cfg(not(any(windows, target_os = "ios")))]
290fn udp_socket_with_config(config: SocketConfig) -> io::Result<Socket> {
291    use nix::sys::socket::{setsockopt, sockopt::ReusePort};
292    let SocketConfig {
293        reuseport,
294        recv_buffer_size,
295        send_buffer_size,
296    } = config;
297
298    let sock = Socket::new(Domain::IPV4, Type::DGRAM, None)?;
299
300    // Set buffer sizes
301    if let Some(recv_buffer_size) = recv_buffer_size {
302        sock.set_recv_buffer_size(recv_buffer_size)?;
303    }
304
305    if let Some(send_buffer_size) = send_buffer_size {
306        sock.set_send_buffer_size(send_buffer_size)?;
307    }
308
309    if reuseport {
310        setsockopt(&sock, ReusePort, &true).ok();
311    }
312
313    Ok(sock)
314}
315
316// Find a port in the given range with a socket config that is available for both TCP and UDP
317pub fn bind_common_in_range_with_config(
318    ip_addr: IpAddr,
319    range: PortRange,
320    config: SocketConfig,
321) -> io::Result<(u16, (UdpSocket, TcpListener))> {
322    for port in range.0..range.1 {
323        if let Ok((sock, listener)) = bind_common_with_config(ip_addr, port, config) {
324            return Result::Ok((sock.local_addr().unwrap().port(), (sock, listener)));
325        }
326    }
327
328    Err(io::Error::other(format!(
329        "No available TCP/UDP ports in {range:?}"
330    )))
331}
332
333// Find a port in the given range that is available for both TCP and UDP
334#[deprecated(
335    since = "2.2.0",
336    note = "use `bind_common_in_range_with_config` instead"
337)]
338pub fn bind_common_in_range(
339    ip_addr: IpAddr,
340    range: PortRange,
341) -> io::Result<(u16, (UdpSocket, TcpListener))> {
342    bind_common_in_range_with_config(ip_addr, range, SocketConfig::default())
343}
344
345pub fn bind_in_range(ip_addr: IpAddr, range: PortRange) -> io::Result<(u16, UdpSocket)> {
346    let config = SocketConfig::default();
347    bind_in_range_with_config(ip_addr, range, config)
348}
349
350pub fn bind_in_range_with_config(
351    ip_addr: IpAddr,
352    range: PortRange,
353    config: SocketConfig,
354) -> io::Result<(u16, UdpSocket)> {
355    let sock = udp_socket_with_config(config)?;
356
357    for port in range.0..range.1 {
358        let addr = SocketAddr::new(ip_addr, port);
359
360        if sock.bind(&SockAddr::from(addr)).is_ok() {
361            let sock: UdpSocket = sock.into();
362            return Result::Ok((sock.local_addr().unwrap().port(), sock));
363        }
364    }
365
366    Err(io::Error::other(format!(
367        "No available UDP ports in {range:?}"
368    )))
369}
370
371pub fn bind_with_any_port_with_config(
372    ip_addr: IpAddr,
373    config: SocketConfig,
374) -> io::Result<UdpSocket> {
375    let sock = udp_socket_with_config(config)?;
376    let addr = SocketAddr::new(ip_addr, 0);
377    match sock.bind(&SockAddr::from(addr)) {
378        Ok(_) => Result::Ok(sock.into()),
379        Err(err) => Err(io::Error::other(format!("No available UDP port: {err}"))),
380    }
381}
382
383#[deprecated(since = "2.2.0", note = "use `bind_with_any_port_with_config` instead")]
384pub fn bind_with_any_port(ip_addr: IpAddr) -> io::Result<UdpSocket> {
385    bind_with_any_port_with_config(ip_addr, SocketConfig::default())
386}
387
388/// binds num sockets to the same port in a range with config
389pub fn multi_bind_in_range_with_config(
390    ip_addr: IpAddr,
391    range: PortRange,
392    config: SocketConfig,
393    mut num: usize,
394) -> io::Result<(u16, Vec<UdpSocket>)> {
395    if !config.reuseport {
396        return Err(io::Error::new(
397            io::ErrorKind::InvalidInput,
398            "SocketConfig.reuseport must be true for multi_bind_in_range_with_config",
399        ));
400    }
401    if cfg!(windows) && num != 1 {
402        // See https://github.com/solana-labs/solana/issues/4607
403        warn!(
404            "multi_bind_in_range_with_config() only supports 1 socket in windows ({} requested)",
405            num
406        );
407        num = 1;
408    }
409    let mut sockets = Vec::with_capacity(num);
410
411    const NUM_TRIES: usize = 100;
412    let mut port = 0;
413    let mut error = None;
414    for _ in 0..NUM_TRIES {
415        port = {
416            let (port, _) = bind_in_range(ip_addr, range)?;
417            port
418        }; // drop the probe, port should be available... briefly.
419
420        for _ in 0..num {
421            let sock = bind_to_with_config(ip_addr, port, config);
422            if let Ok(sock) = sock {
423                sockets.push(sock);
424            } else {
425                error = Some(sock);
426                break;
427            }
428        }
429        if sockets.len() == num {
430            break;
431        } else {
432            sockets.clear();
433        }
434    }
435    if sockets.len() != num {
436        error.unwrap()?;
437    }
438    Ok((port, sockets))
439}
440
441// binds many sockets to the same port in a range
442// Note: The `mut` modifier for `num` is unused but kept for compatibility with the public API.
443#[deprecated(
444    since = "2.2.0",
445    note = "use `multi_bind_in_range_with_config` instead"
446)]
447#[allow(unused_mut)]
448pub fn multi_bind_in_range(
449    ip_addr: IpAddr,
450    range: PortRange,
451    mut num: usize,
452) -> io::Result<(u16, Vec<UdpSocket>)> {
453    let config = SocketConfig::default().reuseport(true);
454    multi_bind_in_range_with_config(ip_addr, range, config, num)
455}
456
457pub fn bind_to(ip_addr: IpAddr, port: u16, reuseport: bool) -> io::Result<UdpSocket> {
458    let config = SocketConfig::default().reuseport(reuseport);
459    bind_to_with_config(ip_addr, port, config)
460}
461
/// Binds a non-blocking UDP socket to `ip_addr:port` and wraps it as a Tokio
/// UDP socket. Only built with the `dev-context-only-utils` feature.
#[cfg(feature = "dev-context-only-utils")]
pub async fn bind_to_async(
    ip_addr: IpAddr,
    port: u16,
    reuseport: bool,
) -> io::Result<TokioUdpSocket> {
    let std_socket = bind_to_with_config_non_blocking(
        ip_addr,
        port,
        SocketConfig::default().reuseport(reuseport),
    )?;
    TokioUdpSocket::from_std(std_socket)
}
472
473pub fn bind_to_localhost() -> io::Result<UdpSocket> {
474    bind_to(
475        IpAddr::V4(Ipv4Addr::LOCALHOST),
476        /*port:*/ 0,
477        /*reuseport:*/ false,
478    )
479}
480
/// Async counterpart of `bind_to_localhost`: binds a Tokio UDP socket to the
/// IPv4 loopback address on port 0, without port reuse.
#[cfg(feature = "dev-context-only-utils")]
pub async fn bind_to_localhost_async() -> io::Result<TokioUdpSocket> {
    let localhost = IpAddr::V4(Ipv4Addr::LOCALHOST);
    bind_to_async(localhost, /*port:*/ 0, /*reuseport:*/ false).await
}
490
491pub fn bind_to_unspecified() -> io::Result<UdpSocket> {
492    bind_to(
493        IpAddr::V4(Ipv4Addr::UNSPECIFIED),
494        /*port:*/ 0,
495        /*reuseport:*/ false,
496    )
497}
498
/// Async counterpart of `bind_to_unspecified`: binds a Tokio UDP socket to the
/// IPv4 unspecified address (`0.0.0.0`) on port 0, without port reuse.
#[cfg(feature = "dev-context-only-utils")]
pub async fn bind_to_unspecified_async() -> io::Result<TokioUdpSocket> {
    let unspecified = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
    bind_to_async(unspecified, /*port:*/ 0, /*reuseport:*/ false).await
}
508
509pub fn bind_to_with_config(
510    ip_addr: IpAddr,
511    port: u16,
512    config: SocketConfig,
513) -> io::Result<UdpSocket> {
514    let sock = udp_socket_with_config(config)?;
515
516    let addr = SocketAddr::new(ip_addr, port);
517
518    sock.bind(&SockAddr::from(addr)).map(|_| sock.into())
519}
520
521pub fn bind_to_with_config_non_blocking(
522    ip_addr: IpAddr,
523    port: u16,
524    config: SocketConfig,
525) -> io::Result<UdpSocket> {
526    let sock = udp_socket_with_config(config)?;
527
528    let addr = SocketAddr::new(ip_addr, port);
529
530    sock.bind(&SockAddr::from(addr))?;
531    sock.set_nonblocking(true)?;
532    Ok(sock.into())
533}
534
535/// binds both a UdpSocket and a TcpListener
536pub fn bind_common(ip_addr: IpAddr, port: u16) -> io::Result<(UdpSocket, TcpListener)> {
537    let config = SocketConfig::default();
538    bind_common_with_config(ip_addr, port, config)
539}
540
541/// binds both a UdpSocket and a TcpListener on the same port
542pub fn bind_common_with_config(
543    ip_addr: IpAddr,
544    port: u16,
545    config: SocketConfig,
546) -> io::Result<(UdpSocket, TcpListener)> {
547    let sock = udp_socket_with_config(config)?;
548
549    let addr = SocketAddr::new(ip_addr, port);
550    let sock_addr = SockAddr::from(addr);
551    sock.bind(&sock_addr)
552        .and_then(|_| TcpListener::bind(addr).map(|listener| (sock.into(), listener)))
553}
554
555pub fn bind_two_in_range_with_offset(
556    ip_addr: IpAddr,
557    range: PortRange,
558    offset: u16,
559) -> io::Result<((u16, UdpSocket), (u16, UdpSocket))> {
560    let sock1_config = SocketConfig::default();
561    let sock2_config = SocketConfig::default();
562    bind_two_in_range_with_offset_and_config(ip_addr, range, offset, sock1_config, sock2_config)
563}
564
565pub fn bind_two_in_range_with_offset_and_config(
566    ip_addr: IpAddr,
567    range: PortRange,
568    offset: u16,
569    sock1_config: SocketConfig,
570    sock2_config: SocketConfig,
571) -> io::Result<((u16, UdpSocket), (u16, UdpSocket))> {
572    if range.1.saturating_sub(range.0) < offset {
573        return Err(io::Error::other(
574            "range too small to find two ports with the correct offset".to_string(),
575        ));
576    }
577    for port in range.0..range.1 {
578        if let Ok(first_bind) = bind_to_with_config(ip_addr, port, sock1_config) {
579            if range.1.saturating_sub(port) >= offset {
580                if let Ok(second_bind) =
581                    bind_to_with_config(ip_addr, port.saturating_add(offset), sock2_config)
582                {
583                    return Ok((
584                        (first_bind.local_addr().unwrap().port(), first_bind),
585                        (second_bind.local_addr().unwrap().port(), second_bind),
586                    ));
587                }
588            } else {
589                break;
590            }
591        }
592    }
593    Err(io::Error::other(
594        "couldn't find two ports with the correct offset in range".to_string(),
595    ))
596}
597
598/// Searches for an open port on a given binding ip_addr in the provided range.
599///
600/// This will start at a random point in the range provided, and search sequenctially.
601/// If it can not find anything, an Error is returned.
602///
603/// Keep in mind this will not reserve the port for you, only find one that is empty.
604pub fn find_available_port_in_range(ip_addr: IpAddr, range: PortRange) -> io::Result<u16> {
605    let [port] = find_available_ports_in_range(ip_addr, range)?;
606    Ok(port)
607}
608
609/// Searches for several ports on a given binding ip_addr in the provided range.
610///
611/// This will start at a random point in the range provided, and search sequencially.
612/// If it can not find anything, an Error is returned.
613pub fn find_available_ports_in_range<const N: usize>(
614    ip_addr: IpAddr,
615    range: PortRange,
616) -> io::Result<[u16; N]> {
617    let mut result = [0u16; N];
618    let range = range.0..range.1;
619    let mut next_port_to_try = range
620        .clone()
621        .cycle() // loop over the end of the range
622        .skip(thread_rng().gen_range(range.clone()) as usize) // skip to random position
623        .take(range.len()) // never take the same value twice
624        .peekable();
625    let mut num = 0;
626    while num < N {
627        let port_to_try = next_port_to_try.next().unwrap(); // this unwrap never fails since we exit earlier
628        match bind_common(ip_addr, port_to_try) {
629            Ok(_) => {
630                result[num] = port_to_try;
631                num = num.saturating_add(1);
632            }
633            Err(err) => {
634                if next_port_to_try.peek().is_none() {
635                    return Err(err);
636                }
637            }
638        }
639    }
640    Ok(result)
641}
642
643pub fn bind_more_with_config(
644    socket: UdpSocket,
645    num: usize,
646    config: SocketConfig,
647) -> io::Result<Vec<UdpSocket>> {
648    let addr = socket.local_addr().unwrap();
649    let ip = addr.ip();
650    let port = addr.port();
651    std::iter::once(Ok(socket))
652        .chain((1..num).map(|_| bind_to_with_config(ip, port, config)))
653        .collect()
654}
655
656#[cfg(test)]
657mod tests {
658    use {
659        super::*,
660        ip_echo_server::IpEchoServerResponse,
661        itertools::Itertools,
662        std::{net::Ipv4Addr, time::Duration},
663        tokio::runtime::Runtime,
664    };
665
666    fn runtime() -> Runtime {
667        tokio::runtime::Builder::new_current_thread()
668            .enable_all()
669            .build()
670            .expect("Can not create a runtime")
671    }
672    #[test]
673    fn test_response_length() {
674        let resp = IpEchoServerResponse {
675            address: IpAddr::from([u16::MAX; 8]), // IPv6 variant
676            shred_version: Some(u16::MAX),
677        };
678        let resp_size = bincode::serialized_size(&resp).unwrap();
679        assert_eq!(
680            IP_ECHO_SERVER_RESPONSE_LENGTH,
681            HEADER_LENGTH + resp_size as usize
682        );
683    }
684
685    // Asserts that an old client can parse the response from a new server.
686    #[test]
687    fn test_backward_compat() {
688        let address = IpAddr::from([
689            525u16, 524u16, 523u16, 522u16, 521u16, 520u16, 519u16, 518u16,
690        ]);
691        let response = IpEchoServerResponse {
692            address,
693            shred_version: Some(42),
694        };
695        let mut data = vec![0u8; IP_ECHO_SERVER_RESPONSE_LENGTH];
696        bincode::serialize_into(&mut data[HEADER_LENGTH..], &response).unwrap();
697        data.truncate(HEADER_LENGTH + 20);
698        assert_eq!(
699            bincode::deserialize::<IpAddr>(&data[HEADER_LENGTH..]).unwrap(),
700            address
701        );
702    }
703
704    // Asserts that a new client can parse the response from an old server.
705    #[test]
706    fn test_forward_compat() {
707        let address = IpAddr::from([
708            525u16, 524u16, 523u16, 522u16, 521u16, 520u16, 519u16, 518u16,
709        ]);
710        let mut data = [0u8; IP_ECHO_SERVER_RESPONSE_LENGTH];
711        bincode::serialize_into(&mut data[HEADER_LENGTH..], &address).unwrap();
712        let response: Result<IpEchoServerResponse, _> =
713            bincode::deserialize(&data[HEADER_LENGTH..]);
714        assert_eq!(
715            response.unwrap(),
716            IpEchoServerResponse {
717                address,
718                shred_version: None,
719            }
720        );
721    }
722
723    #[test]
724    fn test_parse_port_or_addr() {
725        let p1 = parse_port_or_addr(Some("9000"), SocketAddr::from(([1, 2, 3, 4], 1)));
726        assert_eq!(p1.port(), 9000);
727        let p2 = parse_port_or_addr(Some("127.0.0.1:7000"), SocketAddr::from(([1, 2, 3, 4], 1)));
728        assert_eq!(p2.port(), 7000);
729        let p2 = parse_port_or_addr(Some("hi there"), SocketAddr::from(([1, 2, 3, 4], 1)));
730        assert_eq!(p2.port(), 1);
731        let p3 = parse_port_or_addr(None, SocketAddr::from(([1, 2, 3, 4], 1)));
732        assert_eq!(p3.port(), 1);
733    }
734
735    #[test]
736    fn test_parse_port_range() {
737        assert_eq!(parse_port_range("garbage"), None);
738        assert_eq!(parse_port_range("1-"), None);
739        assert_eq!(parse_port_range("1-2"), Some((1, 2)));
740        assert_eq!(parse_port_range("1-2-3"), None);
741        assert_eq!(parse_port_range("2-1"), None);
742    }
743
744    #[test]
745    fn test_parse_host() {
746        parse_host("localhost:1234").unwrap_err();
747        parse_host("localhost").unwrap();
748        parse_host("127.0.0.0:1234").unwrap_err();
749        parse_host("127.0.0.0").unwrap();
750    }
751
752    #[test]
753    fn test_parse_host_port() {
754        parse_host_port("localhost:1234").unwrap();
755        parse_host_port("localhost").unwrap_err();
756        parse_host_port("127.0.0.0:1234").unwrap();
757        parse_host_port("127.0.0.0").unwrap_err();
758    }
759
760    #[test]
761    fn test_is_host_port() {
762        assert!(is_host_port("localhost:1234".to_string()).is_ok());
763        assert!(is_host_port("localhost".to_string()).is_err());
764    }
765
766    #[test]
767    fn test_bind() {
768        let (pr_s, pr_e) = sockets::localhost_port_range_for_tests();
769        let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
770        let s = bind_in_range(ip_addr, (pr_s, pr_e)).unwrap();
771        assert_eq!(s.0, pr_s, "bind_in_range should use first available port");
772        let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
773        let config = SocketConfig::default().reuseport(true);
774        let x = bind_to_with_config(ip_addr, pr_s + 1, config).unwrap();
775        let y = bind_to_with_config(ip_addr, pr_s + 1, config).unwrap();
776        assert_eq!(
777            x.local_addr().unwrap().port(),
778            y.local_addr().unwrap().port()
779        );
780        bind_to(ip_addr, pr_s, false).unwrap_err();
781        bind_in_range(ip_addr, (pr_s, pr_s + 2)).unwrap_err();
782
783        let (port, v) =
784            multi_bind_in_range_with_config(ip_addr, (pr_s + 5, pr_e), config, 10).unwrap();
785        for sock in &v {
786            assert_eq!(port, sock.local_addr().unwrap().port());
787        }
788    }
789
790    #[test]
791    fn test_bind_with_any_port() {
792        let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
793        let config = SocketConfig::default();
794        let x = bind_with_any_port_with_config(ip_addr, config).unwrap();
795        let y = bind_with_any_port_with_config(ip_addr, config).unwrap();
796        assert_ne!(
797            x.local_addr().unwrap().port(),
798            y.local_addr().unwrap().port()
799        );
800    }
801
802    #[test]
803    fn test_bind_in_range_nil() {
804        let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
805        bind_in_range(ip_addr, (2000, 2000)).unwrap_err();
806        bind_in_range(ip_addr, (2000, 1999)).unwrap_err();
807    }
808
809    #[test]
810    fn test_find_available_port_in_range() {
811        let ip_addr = IpAddr::V4(Ipv4Addr::LOCALHOST);
812        let (pr_s, pr_e) = sockets::localhost_port_range_for_tests();
813        assert_eq!(
814            find_available_port_in_range(ip_addr, (pr_s, pr_s + 1)).unwrap(),
815            pr_s
816        );
817        let port = find_available_port_in_range(ip_addr, (pr_s, pr_e)).unwrap();
818        assert!((pr_s..pr_e).contains(&port));
819
820        let _socket = bind_to(ip_addr, port, false).unwrap();
821        find_available_port_in_range(ip_addr, (port, port + 1)).unwrap_err();
822    }
823
824    #[test]
825    fn test_find_available_ports_in_range() {
826        let ip_addr = IpAddr::V4(Ipv4Addr::LOCALHOST);
827        let port_range = sockets::localhost_port_range_for_tests();
828        assert!(port_range.1 - port_range.0 > 16);
829        // reserve 1 port to make it non-trivial
830        let sock = bind_to_with_config(ip_addr, port_range.0 + 2, SocketConfig::default()).unwrap();
831        let ports: [u16; 15] = find_available_ports_in_range(ip_addr, port_range).unwrap();
832        let mut ports_vec = Vec::from(ports);
833        ports_vec.push(sock.local_addr().unwrap().port());
834        let res: Vec<_> = ports_vec.into_iter().unique().collect();
835        assert_eq!(res.len(), 16, "Should reserve 16 unique ports");
836    }
837
838    #[test]
839    fn test_bind_common_in_range() {
840        let ip_addr = IpAddr::V4(Ipv4Addr::LOCALHOST);
841        let (pr_s, pr_e) = sockets::localhost_port_range_for_tests();
842        let config = SocketConfig::default();
843        let (port, _sockets) =
844            bind_common_in_range_with_config(ip_addr, (pr_s, pr_e), config).unwrap();
845        assert!((pr_s..pr_e).contains(&port));
846
847        bind_common_in_range_with_config(ip_addr, (port, port + 1), config).unwrap_err();
848    }
849
850    #[test]
851    fn test_get_public_ip_addr_none() {
852        solana_logger::setup();
853        let ip_addr = IpAddr::V4(Ipv4Addr::LOCALHOST);
854        let (pr_s, pr_e) = sockets::localhost_port_range_for_tests();
855        let config = SocketConfig::default();
856        let (_server_port, (server_udp_socket, server_tcp_listener)) =
857            bind_common_in_range_with_config(ip_addr, (pr_s, pr_e), config).unwrap();
858
859        let _runtime = ip_echo_server(
860            server_tcp_listener,
861            DEFAULT_IP_ECHO_SERVER_THREADS,
862            /*shred_version=*/ Some(42),
863        );
864
865        let server_ip_echo_addr = server_udp_socket.local_addr().unwrap();
866        assert_eq!(
867            get_public_ip_addr(&server_ip_echo_addr).unwrap(),
868            parse_host("127.0.0.1").unwrap(),
869        );
870        assert_eq!(get_cluster_shred_version(&server_ip_echo_addr).unwrap(), 42);
871        assert!(verify_all_reachable_tcp(&server_ip_echo_addr, vec![],));
872        assert!(verify_all_reachable_udp(&server_ip_echo_addr, &[],));
873    }
874
875    #[test]
876    fn test_get_public_ip_addr_reachable() {
877        solana_logger::setup();
878        let ip_addr = IpAddr::V4(Ipv4Addr::LOCALHOST);
879        let port_range = sockets::localhost_port_range_for_tests();
880        let config = SocketConfig::default();
881        let (_server_port, (server_udp_socket, server_tcp_listener)) =
882            bind_common_in_range_with_config(ip_addr, port_range, config).unwrap();
883        let (_client_port, (client_udp_socket, client_tcp_listener)) =
884            bind_common_in_range_with_config(ip_addr, port_range, config).unwrap();
885
886        let _runtime = ip_echo_server(
887            server_tcp_listener,
888            DEFAULT_IP_ECHO_SERVER_THREADS,
889            /*shred_version=*/ Some(65535),
890        );
891
892        let ip_echo_server_addr = server_udp_socket.local_addr().unwrap();
893        assert_eq!(
894            get_public_ip_addr(&ip_echo_server_addr).unwrap(),
895            parse_host("127.0.0.1").unwrap(),
896        );
897        assert_eq!(
898            get_cluster_shred_version(&ip_echo_server_addr).unwrap(),
899            65535
900        );
901        assert!(verify_all_reachable_tcp(
902            &ip_echo_server_addr,
903            vec![client_tcp_listener],
904        ));
905        assert!(verify_all_reachable_udp(
906            &ip_echo_server_addr,
907            &[&client_udp_socket],
908        ));
909    }
910
911    #[test]
912    fn test_verify_ports_tcp_unreachable() {
913        solana_logger::setup();
914        let ip_addr = IpAddr::V4(Ipv4Addr::LOCALHOST);
915        let port_range = sockets::localhost_port_range_for_tests();
916        let config = SocketConfig::default();
917        let (_server_port, (server_udp_socket, _server_tcp_listener)) =
918            bind_common_in_range_with_config(ip_addr, port_range, config).unwrap();
919
920        // make the socket unreachable by not running the ip echo server!
921        let server_ip_echo_addr = server_udp_socket.local_addr().unwrap();
922
923        let (_, (_client_udp_socket, client_tcp_listener)) =
924            bind_common_in_range_with_config(ip_addr, port_range, config).unwrap();
925
926        let rt = runtime();
927        assert!(!rt.block_on(ip_echo_client::verify_all_reachable_tcp(
928            server_ip_echo_addr,
929            vec![client_tcp_listener],
930            Duration::from_secs(2),
931        )));
932    }
933
934    #[test]
935    fn test_verify_ports_udp_unreachable() {
936        solana_logger::setup();
937        let ip_addr = IpAddr::V4(Ipv4Addr::LOCALHOST);
938        let port_range = sockets::localhost_port_range_for_tests();
939        let config = SocketConfig::default();
940        let (_server_port, (server_udp_socket, _server_tcp_listener)) =
941            bind_common_in_range_with_config(ip_addr, port_range, config).unwrap();
942
943        // make the socket unreachable by not running the ip echo server!
944        let server_ip_echo_addr = server_udp_socket.local_addr().unwrap();
945
946        let (_correct_client_port, (client_udp_socket, _client_tcp_listener)) =
947            bind_common_in_range_with_config(ip_addr, port_range, config).unwrap();
948
949        let rt = runtime();
950        assert!(!rt.block_on(ip_echo_client::verify_all_reachable_udp(
951            server_ip_echo_addr,
952            &[&client_udp_socket],
953            Duration::from_secs(2),
954            3,
955        )));
956    }
957
958    #[test]
959    fn test_verify_many_ports_reachable() {
960        solana_logger::setup();
961        let ip_addr = IpAddr::V4(Ipv4Addr::LOCALHOST);
962        let config = SocketConfig::default();
963        let mut tcp_listeners = vec![];
964        let mut udp_sockets = vec![];
965
966        let (_server_port, (_, server_tcp_listener)) =
967            bind_common_in_range_with_config(ip_addr, (2200, 2300), config).unwrap();
968        for _ in 0..MAX_PORT_VERIFY_THREADS * 2 {
969            let (_client_port, (client_udp_socket, client_tcp_listener)) =
970                bind_common_in_range_with_config(
971                    ip_addr,
972                    (2300, 2300 + (MAX_PORT_VERIFY_THREADS * 3) as u16),
973                    config,
974                )
975                .unwrap();
976            tcp_listeners.push(client_tcp_listener);
977            udp_sockets.push(client_udp_socket);
978        }
979
980        let ip_echo_server_addr = server_tcp_listener.local_addr().unwrap();
981
982        let _runtime = ip_echo_server(
983            server_tcp_listener,
984            DEFAULT_IP_ECHO_SERVER_THREADS,
985            Some(65535),
986        );
987
988        assert_eq!(
989            get_public_ip_addr(&ip_echo_server_addr).unwrap(),
990            parse_host("127.0.0.1").unwrap(),
991        );
992
993        let socket_refs = udp_sockets.iter().collect_vec();
994        assert!(verify_all_reachable_tcp(
995            &ip_echo_server_addr,
996            tcp_listeners,
997        ));
998        assert!(verify_all_reachable_udp(&ip_echo_server_addr, &socket_refs));
999    }
1000
1001    #[test]
1002    fn test_bind_two_in_range_with_offset() {
1003        solana_logger::setup();
1004        let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
1005        let offset = 6;
1006        if let Ok(((port1, _), (port2, _))) =
1007            bind_two_in_range_with_offset(ip_addr, (1024, 65535), offset)
1008        {
1009            assert!(port2 == port1 + offset);
1010        }
1011        let offset = 42;
1012        if let Ok(((port1, _), (port2, _))) =
1013            bind_two_in_range_with_offset(ip_addr, (1024, 65535), offset)
1014        {
1015            assert!(port2 == port1 + offset);
1016        }
1017        assert!(bind_two_in_range_with_offset(ip_addr, (1024, 1044), offset).is_err());
1018    }
1019
1020    #[test]
1021    fn test_multi_bind_in_range_with_config_reuseport_disabled() {
1022        let ip_addr: IpAddr = IpAddr::V4(Ipv4Addr::LOCALHOST);
1023        let config = SocketConfig::default(); //reuseport is false by default
1024
1025        let result = multi_bind_in_range_with_config(ip_addr, (2010, 2110), config, 2);
1026
1027        assert!(
1028            result.is_err(),
1029            "Expected an error when reuseport is not set to true"
1030        );
1031    }
1032}