solana_net_utils/
sockets.rs

1#[cfg(feature = "dev-context-only-utils")]
2use tokio::net::UdpSocket as TokioUdpSocket;
3use {
4    crate::PortRange,
5    log::warn,
6    socket2::{Domain, SockAddr, Socket, Type},
7    std::{
8        io,
9        net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, UdpSocket},
10        ops::Range,
11        sync::atomic::{AtomicU16, Ordering},
12    },
13};
// Base port for deconflicted allocations: nothing below this port is ever handed out.
pub(crate) const UNIQUE_ALLOC_BASE_PORT: u16 = 2000;
// How many ports to allocate per individual process.
// We expect to have at most 64 concurrent tests in CI at any moment on a given host.
const SLICE_PER_PROCESS: u16 = (u16::MAX - UNIQUE_ALLOC_BASE_PORT) / 64;
/// Reserve `size` consecutive port numbers for the calling test and return
/// them as a half-open range.
///
/// When running under nextest, this will try to provide
/// a unique slice of port numbers (assuming no other nextest processes
/// are running on the same host) based on NEXTEST_TEST_GLOBAL_SLOT variable
/// The port ranges will be reused following nextest logic.
///
/// When running without nextest, this will only bump an atomic and eventually
/// panic when it runs out of port numbers to assign.
///
/// # Panics
///
/// * when the process-wide counter or the resulting port number would exceed
///   the `u16` port space ("Ran out of port numbers!"),
/// * under nextest, when the requested range would not fit entirely inside
///   this process' slice of `SLICE_PER_PROCESS` ports,
/// * when `NEXTEST_TEST_GLOBAL_SLOT` is set but is not a valid `u16`.
// The two plain arithmetic expressions left at the bottom are guarded by the
// assert right above them, so they cannot overflow.
#[allow(clippy::arithmetic_side_effects)]
pub fn unique_port_range_for_tests(size: u16) -> Range<u16> {
    static SLICE: AtomicU16 = AtomicU16::new(0);
    // Checked reservation: a plain `fetch_add` would wrap around silently and
    // start handing out ports that are already in use.
    let offset = SLICE
        .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |used| {
            used.checked_add(size)
        })
        .expect("Ran out of port numbers!");
    let base = match std::env::var("NEXTEST_TEST_GLOBAL_SLOT") {
        Ok(slot) => {
            let slot: u16 = slot
                .parse()
                .expect("NEXTEST_TEST_GLOBAL_SLOT must be a valid u16 slot number");
            // The whole range, not only its first port, has to stay inside
            // the slice owned by this process.
            assert!(
                offset.saturating_add(size) <= SLICE_PER_PROCESS,
                "Overrunning into the port range of another test! Consider using fewer ports \
                 per test."
            );
            slot.checked_mul(SLICE_PER_PROCESS)
                .and_then(|slice_start| slice_start.checked_add(UNIQUE_ALLOC_BASE_PORT))
                .expect("Ran out of port numbers!")
        }
        Err(_) => UNIQUE_ALLOC_BASE_PORT,
    };
    let start = base.checked_add(offset).expect("Ran out of port numbers!");
    assert!(start < u16::MAX - size, "Ran out of port numbers!");
    start..start + size
}
46
47/// Retrieve a free 25-port slice for unit tests
48///
49/// When running under nextest, this will try to provide
50/// a unique slice of port numbers (assuming no other nextest processes
51/// are running on the same host) based on NEXTEST_TEST_GLOBAL_SLOT variable
52/// The port ranges will be reused following nextest logic.
53///
54/// When running without nextest, this will only bump an atomic and eventually
55/// panic when it runs out of port numbers to assign.
56pub fn localhost_port_range_for_tests() -> (u16, u16) {
57    let pr = unique_port_range_for_tests(25);
58    (pr.start, pr.end)
59}
60
61/// Bind a `UdpSocket` to a unique port.
62pub fn bind_to_localhost_unique() -> io::Result<UdpSocket> {
63    bind_to(
64        IpAddr::V4(Ipv4Addr::LOCALHOST),
65        unique_port_range_for_tests(1).start,
66    )
67}
68
69pub fn bind_gossip_port_in_range(
70    gossip_addr: &SocketAddr,
71    port_range: PortRange,
72    bind_ip_addr: IpAddr,
73) -> (u16, (UdpSocket, TcpListener)) {
74    let config = SocketConfiguration::default();
75    if gossip_addr.port() != 0 {
76        (
77            gossip_addr.port(),
78            bind_common_with_config(bind_ip_addr, gossip_addr.port(), config).unwrap_or_else(|e| {
79                panic!("gossip_addr bind_to port {}: {}", gossip_addr.port(), e)
80            }),
81        )
82    } else {
83        bind_common_in_range_with_config(bind_ip_addr, port_range, config).expect("Failed to bind")
84    }
85}
86
/// Whether the target platform supports advanced socket configuration.
///
/// Evaluates to `false` on windows and ios, `true` everywhere else.
pub(crate) const PLATFORM_SUPPORTS_SOCKET_CONFIGS: bool =
    !cfg!(any(windows, target_os = "ios"));
90
/// Options applied to a UDP socket when it is created by this module.
///
/// Built with chained setters starting from `SocketConfiguration::default()`.
#[derive(Clone, Copy, Debug, Default)]
pub struct SocketConfiguration {
    /// Controls SO_REUSEPORT; this is not intended to be set explicitly.
    reuseport: bool,
    /// Requested receive buffer size, if any.
    recv_buffer_size: Option<usize>,
    /// Requested send buffer size, if any.
    send_buffer_size: Option<usize>,
    /// Whether the socket is put into non-blocking mode.
    non_blocking: bool,
}

impl SocketConfiguration {
    /// Request a receive buffer of `size` for the socket (no effect on windows/ios).
    ///
    /// **Note:** On Linux the kernel will double the value you specify.
    /// For example, if you specify `16MB`, the kernel will configure the
    /// socket to use `32MB`.
    /// See: https://man7.org/linux/man-pages/man7/socket.7.html: SO_RCVBUF
    pub fn recv_buffer_size(self, size: usize) -> Self {
        Self {
            recv_buffer_size: Some(size),
            ..self
        }
    }

    /// Request a send buffer of `size` for the socket (no effect on windows/ios).
    ///
    /// **Note:** On Linux the kernel will double the value you specify.
    /// For example, if you specify `16MB`, the kernel will configure the
    /// socket to use `32MB`.
    /// See: https://man7.org/linux/man-pages/man7/socket.7.html: SO_SNDBUF
    pub fn send_buffer_size(self, size: usize) -> Self {
        Self {
            send_buffer_size: Some(size),
            ..self
        }
    }

    /// Choose whether the socket is configured for non-blocking IO.
    pub fn set_non_blocking(self, non_blocking: bool) -> Self {
        Self {
            non_blocking,
            ..self
        }
    }
}
128
129#[allow(deprecated)]
130impl From<crate::SocketConfig> for SocketConfiguration {
131    fn from(value: crate::SocketConfig) -> Self {
132        Self {
133            reuseport: value.reuseport,
134            recv_buffer_size: value.recv_buffer_size,
135            send_buffer_size: value.send_buffer_size,
136            non_blocking: false,
137        }
138    }
139}
140
141#[cfg(any(windows, target_os = "ios"))]
142fn set_reuse_port<T>(_socket: &T) -> io::Result<()> {
143    Ok(())
144}
145
146/// Sets SO_REUSEPORT on platforms that support it.
147#[cfg(not(any(windows, target_os = "ios")))]
148fn set_reuse_port<T>(socket: &T) -> io::Result<()>
149where
150    T: std::os::fd::AsFd,
151{
152    use nix::sys::socket::{setsockopt, sockopt::ReusePort};
153    setsockopt(socket, ReusePort, &true).map_err(io::Error::from)
154}
155
156pub(crate) fn udp_socket_with_config(config: SocketConfiguration) -> io::Result<Socket> {
157    let SocketConfiguration {
158        reuseport,
159        recv_buffer_size,
160        send_buffer_size,
161        non_blocking,
162    } = config;
163    let sock = Socket::new(Domain::IPV4, Type::DGRAM, None)?;
164    if PLATFORM_SUPPORTS_SOCKET_CONFIGS {
165        // Set buffer sizes
166        if let Some(recv_buffer_size) = recv_buffer_size {
167            sock.set_recv_buffer_size(recv_buffer_size)?;
168        }
169        if let Some(send_buffer_size) = send_buffer_size {
170            sock.set_send_buffer_size(send_buffer_size)?;
171        }
172
173        if reuseport {
174            set_reuse_port(&sock)?;
175        }
176    }
177    sock.set_nonblocking(non_blocking)?;
178    Ok(sock)
179}
180
181/// Find a port in the given range with a socket config that is available for both TCP and UDP
182pub fn bind_common_in_range_with_config(
183    ip_addr: IpAddr,
184    range: PortRange,
185    config: SocketConfiguration,
186) -> io::Result<(u16, (UdpSocket, TcpListener))> {
187    for port in range.0..range.1 {
188        if let Ok((sock, listener)) = bind_common_with_config(ip_addr, port, config) {
189            return Result::Ok((sock.local_addr().unwrap().port(), (sock, listener)));
190        }
191    }
192
193    Err(io::Error::other(format!(
194        "No available TCP/UDP ports in {range:?}"
195    )))
196}
197
198pub fn bind_in_range_with_config(
199    ip_addr: IpAddr,
200    range: PortRange,
201    config: SocketConfiguration,
202) -> io::Result<(u16, UdpSocket)> {
203    let socket = udp_socket_with_config(config)?;
204
205    for port in range.0..range.1 {
206        let addr = SocketAddr::new(ip_addr, port);
207
208        if socket.bind(&SockAddr::from(addr)).is_ok() {
209            let udp_socket: UdpSocket = socket.into();
210            return Result::Ok((udp_socket.local_addr().unwrap().port(), udp_socket));
211        }
212    }
213
214    Err(io::Error::other(format!(
215        "No available UDP ports in {range:?}"
216    )))
217}
218
219#[deprecated(since = "3.0.0", note = "Please bind to specific ports instead")]
220pub fn bind_with_any_port_with_config(
221    ip_addr: IpAddr,
222    config: SocketConfiguration,
223) -> io::Result<UdpSocket> {
224    let sock = udp_socket_with_config(config)?;
225    let addr = SocketAddr::new(ip_addr, 0);
226    let bind = sock.bind(&SockAddr::from(addr));
227    match bind {
228        Ok(_) => Result::Ok(sock.into()),
229        Err(err) => Err(io::Error::other(format!("No available UDP port: {err}"))),
230    }
231}
232
233/// binds num sockets to the same port in a range with config
234pub fn multi_bind_in_range_with_config(
235    ip_addr: IpAddr,
236    range: PortRange,
237    config: SocketConfiguration,
238    mut num: usize,
239) -> io::Result<(u16, Vec<UdpSocket>)> {
240    if !PLATFORM_SUPPORTS_SOCKET_CONFIGS && num != 1 {
241        // See https://github.com/solana-labs/solana/issues/4607
242        warn!(
243            "multi_bind_in_range_with_config() only supports 1 socket on this platform ({num} \
244             requested)"
245        );
246        num = 1;
247    }
248    let (port, socket) = bind_in_range_with_config(ip_addr, range, config)?;
249    let sockets = bind_more_with_config(socket, num, config)?;
250    Ok((port, sockets))
251}
252
253pub fn bind_to(ip_addr: IpAddr, port: u16) -> io::Result<UdpSocket> {
254    let config = SocketConfiguration {
255        ..Default::default()
256    };
257    bind_to_with_config(ip_addr, port, config)
258}
259
/// Bind a non-blocking UDP socket to `ip_addr:port` and wrap it into a tokio
/// `UdpSocket`.
///
/// Errors from binding or from the conversion into the tokio socket are
/// returned to the caller.
#[cfg(feature = "dev-context-only-utils")]
pub async fn bind_to_async(ip_addr: IpAddr, port: u16) -> io::Result<TokioUdpSocket> {
    // The std socket has to be non-blocking before it is handed to tokio.
    let config = SocketConfiguration::default().set_non_blocking(true);
    bind_to_with_config(ip_addr, port, config).and_then(TokioUdpSocket::from_std)
}
269
/// Bind a tokio `UdpSocket` on the IPv4 loopback address to a unique port
/// reserved via `unique_port_range_for_tests(1)`.
#[cfg(feature = "dev-context-only-utils")]
pub async fn bind_to_localhost_async() -> io::Result<TokioUdpSocket> {
    let localhost = IpAddr::V4(Ipv4Addr::LOCALHOST);
    bind_to_async(localhost, unique_port_range_for_tests(1).start).await
}
275
/// Bind a tokio `UdpSocket` on the unspecified IPv4 address (0.0.0.0) to a
/// unique port reserved via `unique_port_range_for_tests(1)`.
#[cfg(feature = "dev-context-only-utils")]
pub async fn bind_to_unspecified_async() -> io::Result<TokioUdpSocket> {
    let unspecified = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
    bind_to_async(unspecified, unique_port_range_for_tests(1).start).await
}
281
282pub fn bind_to_with_config(
283    ip_addr: IpAddr,
284    port: u16,
285    config: SocketConfiguration,
286) -> io::Result<UdpSocket> {
287    let sock = udp_socket_with_config(config)?;
288
289    let addr = SocketAddr::new(ip_addr, port);
290
291    sock.bind(&SockAddr::from(addr)).map(|_| sock.into())
292}
293
294/// binds both a UdpSocket and a TcpListener on the same port
295pub fn bind_common_with_config(
296    ip_addr: IpAddr,
297    port: u16,
298    config: SocketConfiguration,
299) -> io::Result<(UdpSocket, TcpListener)> {
300    let sock = udp_socket_with_config(config)?;
301
302    let addr = SocketAddr::new(ip_addr, port);
303    let sock_addr = SockAddr::from(addr);
304    sock.bind(&sock_addr)
305        .and_then(|_| TcpListener::bind(addr).map(|listener| (sock.into(), listener)))
306}
307
308pub fn bind_two_in_range_with_offset_and_config(
309    ip_addr: IpAddr,
310    range: PortRange,
311    offset: u16,
312    sock1_config: SocketConfiguration,
313    sock2_config: SocketConfiguration,
314) -> io::Result<((u16, UdpSocket), (u16, UdpSocket))> {
315    if range.1.saturating_sub(range.0) < offset {
316        return Err(io::Error::other(
317            "range too small to find two ports with the correct offset".to_string(),
318        ));
319    }
320
321    let max_start_port = range.1.saturating_sub(offset);
322    for port in range.0..=max_start_port {
323        let first_bind_result = bind_to_with_config(ip_addr, port, sock1_config);
324        if let Ok(first_bind) = first_bind_result {
325            let second_port = port.saturating_add(offset);
326            let second_bind_result = bind_to_with_config(ip_addr, second_port, sock2_config);
327            if let Ok(second_bind) = second_bind_result {
328                return Ok((
329                    (first_bind.local_addr().unwrap().port(), first_bind),
330                    (second_bind.local_addr().unwrap().port(), second_bind),
331                ));
332            }
333        }
334    }
335    Err(io::Error::other(
336        "couldn't find two ports with the correct offset in range".to_string(),
337    ))
338}
339
340pub fn bind_more_with_config(
341    socket: UdpSocket,
342    num: usize,
343    mut config: SocketConfiguration,
344) -> io::Result<Vec<UdpSocket>> {
345    if !PLATFORM_SUPPORTS_SOCKET_CONFIGS {
346        if num > 1 {
347            warn!(
348                "bind_more_with_config() only supports 1 socket on this platform ({num} requested)"
349            );
350        }
351        Ok(vec![socket])
352    } else {
353        set_reuse_port(&socket)?;
354        config.reuseport = true;
355        let addr = socket.local_addr().unwrap();
356        let ip = addr.ip();
357        let port = addr.port();
358        std::iter::once(Ok(socket))
359            .chain((1..num).map(|_| bind_to_with_config(ip, port, config)))
360            .collect()
361    }
362}
363
// Unit tests for the socket binding helpers.
// `deprecated` is allowed because the deprecated `bind_with_any_port_with_config`
// is exercised here.
#[cfg(test)]
#[allow(deprecated)]
mod tests {
    use {
        super::*,
        crate::{bind_in_range, sockets::localhost_port_range_for_tests},
        std::net::Ipv4Addr,
    };

    // Exercises the basic bind helpers on a private port range. The checks
    // rely on the sockets `s` and `y` staying alive while later binds run.
    #[test]
    fn test_bind() {
        let (pr_s, pr_e) = localhost_port_range_for_tests();
        let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
        let config = SocketConfiguration::default();
        // Holds the first port of the range for the rest of the test.
        let s = bind_in_range(ip_addr, (pr_s, pr_e)).unwrap();
        assert_eq!(s.0, pr_s, "bind_in_range should use first available port");
        let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
        // Bind the second port, then stack one more socket on top of it.
        let x = bind_to_with_config(ip_addr, pr_s + 1, config).unwrap();
        let y = bind_more_with_config(x, 2, config).unwrap();
        assert_eq!(
            y[0].local_addr().unwrap().port(),
            y[1].local_addr().unwrap().port()
        );
        // `pr_s` is held by `s` and `pr_s + 1` by `y`: both binds must fail.
        bind_to_with_config(ip_addr, pr_s, SocketConfiguration::default()).unwrap_err();
        bind_in_range(ip_addr, (pr_s, pr_s + 2)).unwrap_err();

        // All ten sockets must report the port returned by the helper.
        let (port, v) =
            multi_bind_in_range_with_config(ip_addr, (pr_s + 5, pr_e), config, 10).unwrap();
        for sock in &v {
            assert_eq!(port, sock.local_addr().unwrap().port());
        }
    }

    // Two sockets bound to port 0 must end up on different ports.
    #[test]
    fn test_bind_with_any_port() {
        let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
        let config = SocketConfiguration::default();
        let x = bind_with_any_port_with_config(ip_addr, config).unwrap();
        let y = bind_with_any_port_with_config(ip_addr, config).unwrap();
        assert_ne!(
            x.local_addr().unwrap().port(),
            y.local_addr().unwrap().port()
        );
    }

    // Empty and inverted ranges must be rejected.
    #[test]
    fn test_bind_in_range_nil() {
        let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
        bind_in_range(ip_addr, (2000, 2000)).unwrap_err();
        bind_in_range(ip_addr, (2000, 1999)).unwrap_err();
    }

    // Stacks sockets on an already bound port, then runs a multi-bind over
    // the same range while those sockets are still alive.
    #[test]
    fn test_bind_on_top() {
        let config = SocketConfiguration::default();
        let localhost = IpAddr::V4(Ipv4Addr::LOCALHOST);
        let port_range = localhost_port_range_for_tests();
        let (_p, s) = bind_in_range_with_config(localhost, port_range, config).unwrap();
        let _socks = bind_more_with_config(s, 8, config).unwrap();

        let _socks2 = multi_bind_in_range_with_config(localhost, port_range, config, 8).unwrap();
    }

    // The common UDP+TCP bind must pick a port inside the range, and that
    // port must then be unavailable for a second common bind.
    #[test]
    fn test_bind_common_in_range() {
        let ip_addr = IpAddr::V4(Ipv4Addr::LOCALHOST);
        let (pr_s, pr_e) = localhost_port_range_for_tests();
        let config = SocketConfiguration::default();
        let (port, _sockets) =
            bind_common_in_range_with_config(ip_addr, (pr_s, pr_e), config).unwrap();
        assert!((pr_s..pr_e).contains(&port));

        bind_common_in_range_with_config(ip_addr, (port, port + 1), config).unwrap_err();
    }

    // When a pair is found, the two ports must be exactly `offset` apart;
    // a range smaller than the offset must be rejected.
    #[test]
    fn test_bind_two_in_range_with_offset() {
        solana_logger::setup();
        let config = SocketConfiguration::default();
        let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
        let offset = 6;
        // A failed bind is tolerated here; only a successful result is checked.
        if let Ok(((port1, _), (port2, _))) =
            bind_two_in_range_with_offset_and_config(ip_addr, (1024, 65535), offset, config, config)
        {
            assert!(port2 == port1 + offset);
        }
        let offset = 42;
        if let Ok(((port1, _), (port2, _))) =
            bind_two_in_range_with_offset_and_config(ip_addr, (1024, 65535), offset, config, config)
        {
            assert!(port2 == port1 + offset);
        }
        // A 20-port range cannot hold two ports that are 42 apart.
        assert!(bind_two_in_range_with_offset_and_config(
            ip_addr,
            (1024, 1044),
            offset,
            config,
            config
        )
        .is_err());
    }
}