deno_net/
tcp.rs

1// Copyright 2018-2025 the Deno authors. MIT license.
2use std::collections::HashMap;
3use std::net::SocketAddr;
4use std::sync::Arc;
5
6use socket2::Domain;
7use socket2::Protocol;
8use socket2::Type;
9
10/// Our per-process `Connections`. We can use this to find an existent listener for
11/// a given local address and clone its socket for us to listen on in our thread.
12static CONNS: std::sync::OnceLock<std::sync::Mutex<Connections>> =
13  std::sync::OnceLock::new();
14
15/// Maintains a map of listening address to `TcpConnection`.
16#[derive(Default)]
17struct Connections {
18  tcp: HashMap<SocketAddr, Arc<TcpConnection>>,
19}
20
21/// Holds an open listener. We clone the underlying file descriptor (unix) or socket handle (Windows)
22/// and then listen on our copy of it.
23pub struct TcpConnection {
24  /// The pristine FD that we'll clone for each LB listener
25  #[cfg(unix)]
26  sock: std::os::fd::OwnedFd,
27  #[cfg(not(unix))]
28  sock: std::os::windows::io::OwnedSocket,
29  key: SocketAddr,
30}
31
32impl TcpConnection {
33  /// Boot a load-balanced TCP connection
34  pub fn start(key: SocketAddr) -> std::io::Result<Self> {
35    let listener = bind_socket_and_listen(key, false)?;
36    let sock = listener.into();
37
38    Ok(Self { sock, key })
39  }
40
41  fn listener(&self) -> std::io::Result<tokio::net::TcpListener> {
42    let listener = std::net::TcpListener::from(self.sock.try_clone()?);
43    let listener = tokio::net::TcpListener::from_std(listener)?;
44    Ok(listener)
45  }
46}
47
48/// A TCP socket listener that optionally allows for round-robin load-balancing in-process.
49pub struct TcpListener {
50  listener: Option<tokio::net::TcpListener>,
51  conn: Option<Arc<TcpConnection>>,
52}
53
54/// Does this platform implement `SO_REUSEPORT` in a load-balancing manner?
55const REUSE_PORT_LOAD_BALANCES: bool =
56  cfg!(any(target_os = "android", target_os = "linux"));
57
58impl TcpListener {
59  /// Bind to a port. On Linux, or when we don't have `SO_REUSEPORT` set, we just bind the port directly.
60  /// On other platforms, we emulate `SO_REUSEPORT` by cloning the socket and having each clone race to
61  /// accept every connection.
62  ///
63  /// ## Why not `SO_REUSEPORT`?
64  ///
65  /// The `SO_REUSEPORT` socket option allows multiple sockets on the same host to bind to the same port. This is
66  /// particularly useful for load balancing or implementing high availability in server applications.
67  ///
68  /// On Linux, `SO_REUSEPORT` allows multiple sockets to bind to the same port, and the kernel will load
69  /// balance incoming connections among those sockets. Each socket can accept connections independently.
70  /// This is useful for scenarios where you want to distribute incoming connections among multiple processes
71  /// or threads.
72  ///
73  /// On macOS (which is based on BSD), the behaviour of `SO_REUSEPORT` is slightly different. When `SO_REUSEPORT` is set,
74  /// multiple sockets can still bind to the same port, but the kernel does not perform load balancing as it does on Linux.
75  /// Instead, it follows a "last bind wins" strategy. This means that the most recently bound socket will receive
76  /// incoming connections exclusively, while the previously bound sockets will not receive any connections.
77  /// This behaviour is less useful for load balancing compared to Linux, but it can still be valuable in certain scenarios.
78  pub fn bind(
79    socket_addr: SocketAddr,
80    reuse_port: bool,
81  ) -> std::io::Result<Self> {
82    if REUSE_PORT_LOAD_BALANCES && reuse_port {
83      Self::bind_load_balanced(socket_addr)
84    } else {
85      Self::bind_direct(socket_addr, reuse_port)
86    }
87  }
88
89  /// Bind directly to the port, passing `reuse_port` directly to the socket. On platforms other
90  /// than Linux, `reuse_port` does not do any load balancing.
91  pub fn bind_direct(
92    socket_addr: SocketAddr,
93    reuse_port: bool,
94  ) -> std::io::Result<Self> {
95    // We ignore `reuse_port` on platforms other than Linux to match the existing behaviour.
96    let listener = bind_socket_and_listen(socket_addr, reuse_port)?;
97    Ok(Self {
98      listener: Some(tokio::net::TcpListener::from_std(listener)?),
99      conn: None,
100    })
101  }
102
103  /// Bind to the port in a load-balanced manner.
104  pub fn bind_load_balanced(socket_addr: SocketAddr) -> std::io::Result<Self> {
105    let tcp = &mut CONNS.get_or_init(Default::default).lock().unwrap().tcp;
106    if let Some(conn) = tcp.get(&socket_addr) {
107      let listener = Some(conn.listener()?);
108      return Ok(Self {
109        listener,
110        conn: Some(conn.clone()),
111      });
112    }
113    let conn = Arc::new(TcpConnection::start(socket_addr)?);
114    let listener = Some(conn.listener()?);
115    tcp.insert(socket_addr, conn.clone());
116    Ok(Self {
117      listener,
118      conn: Some(conn),
119    })
120  }
121
122  pub async fn accept(
123    &self,
124  ) -> std::io::Result<(tokio::net::TcpStream, SocketAddr)> {
125    let (tcp, addr) = self.listener.as_ref().unwrap().accept().await?;
126    Ok((tcp, addr))
127  }
128
129  pub fn local_addr(&self) -> std::io::Result<SocketAddr> {
130    self.listener.as_ref().unwrap().local_addr()
131  }
132}
133
134impl Drop for TcpListener {
135  fn drop(&mut self) {
136    // If we're in load-balancing mode
137    if let Some(conn) = self.conn.take() {
138      let mut tcp = CONNS.get().unwrap().lock().unwrap();
139      if Arc::strong_count(&conn) == 2 {
140        tcp.tcp.remove(&conn.key);
141        // Close the connection
142        debug_assert_eq!(Arc::strong_count(&conn), 1);
143        drop(conn);
144      }
145    }
146  }
147}
148
149/// Bind a socket to an address and listen with the low-level options we need.
150#[allow(unused_variables)]
151fn bind_socket_and_listen(
152  socket_addr: SocketAddr,
153  reuse_port: bool,
154) -> Result<std::net::TcpListener, std::io::Error> {
155  let socket = if socket_addr.is_ipv4() {
156    socket2::Socket::new(Domain::IPV4, Type::STREAM, Some(Protocol::TCP))?
157  } else {
158    socket2::Socket::new(Domain::IPV6, Type::STREAM, Some(Protocol::TCP))?
159  };
160  #[cfg(not(windows))]
161  if REUSE_PORT_LOAD_BALANCES && reuse_port {
162    socket.set_reuse_port(true)?;
163  }
164  #[cfg(not(windows))]
165  // This is required for re-use of a port immediately after closing. There's a small
166  // security trade-off here but we err on the side of convenience.
167  //
168  // https://stackoverflow.com/questions/14388706/how-do-so-reuseaddr-and-so-reuseport-differ
169  // https://stackoverflow.com/questions/26772549/is-it-a-good-idea-to-reuse-port-using-option-so-reuseaddr-which-is-already-in-ti
170  socket.set_reuse_address(true)?;
171  socket.set_nonblocking(true)?;
172  socket.bind(&socket_addr.into())?;
173  // Kernel will round it up to the next power of 2 + 1.
174  socket.listen(511)?;
175  let listener = socket.into();
176  Ok(listener)
177}