Skip to main content

tako_rs_server/
lib.rs

1#![cfg_attr(docsrs, feature(doc_cfg))]
2
3//! Server bootstrap for the Tako framework.
4//!
5//! Hosts every concrete listener implementation (HTTP/1, TLS, HTTP/3, raw TCP,
6//! UDP, Unix sockets, plus the compio variants) and the PROXY protocol parser.
7//! Re-exported under the original `tako::*` paths via the umbrella crate.
8
9use std::io::ErrorKind;
10use std::io::Write;
11use std::io::{self};
12use std::net::SocketAddr;
13use std::str::FromStr;
14use std::time::Duration;
15
/// QUIC congestion-control algorithm used for HTTP/3 connections.
///
/// The variants correspond to the controllers provided by
/// `quinn::congestion`. Keeping the selector in this crate lets an
/// application choose a profile for its HTTP/3 deployment without taking a
/// direct dependency on quinn.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum H3Congestion {
  /// CUBIC. This is the `Default` variant; it is also quinn's default and the
  /// most widely deployed controller.
  #[default]
  Cubic,
  /// `NewReno`, an older and more conservative controller.
  NewReno,
  /// BBR, Google's bandwidth-delay-product controller. Helpful on links that
  /// combine high bandwidth with packet loss.
  Bbr,
}
30
/// Production-readiness knobs shared by every Tako server transport.
///
/// `Default` mirrors the historical hardcoded values (30 s drain, 30 s header
/// read, 100 H2 streams, …) so existing call sites keep their behavior. Pass
/// a populated `ServerConfig` to `*_with_config` entry points to override
/// individual knobs.
///
/// Every field is public, so a single knob can be changed with struct-update
/// syntax, e.g. `ServerConfig { keep_alive: false, ..Default::default() }`.
/// The "Default:" notes on the fields below restate the values set by the
/// `Default` impl in this file.
#[derive(Debug, Clone)]
pub struct ServerConfig {
  /// Maximum time the coordinator waits for in-flight connections to finish
  /// after a shutdown signal. After this elapses, remaining tasks are aborted.
  /// Default: 30 s.
  pub drain_timeout: Duration,
  /// Maximum time hyper waits for the request line + headers to arrive.
  /// `None` disables the timeout. Default: `Some(30 s)`.
  // NOTE(review): the earlier wording called `None` "the previous behavior",
  // while the struct-level docs list a 30 s header read among the historical
  // defaults — confirm which one is accurate.
  pub header_read_timeout: Option<Duration>,
  /// HTTP/1 keep-alive (default `true`).
  pub keep_alive: bool,
  /// HTTP/1 keep-alive idle timeout (Hyper default applies if `None`).
  /// Default: `None`.
  pub keep_alive_timeout: Option<Duration>,
  /// HTTP/2 `SETTINGS_MAX_CONCURRENT_STREAMS` cap. Default: 100.
  pub h2_max_concurrent_streams: u32,
  /// HTTP/2 `SETTINGS_MAX_HEADER_LIST_SIZE` cap (bytes). Default: 16 KiB.
  pub h2_max_header_list_size: u32,
  /// HTTP/2 send-buffer cap per stream (bytes). Default: 1 MiB.
  pub h2_max_send_buf_size: usize,
  /// HTTP/2 pending-accept `RST_STREAM` cap (CVE-2023-44487 mitigation).
  /// Default: 50.
  pub h2_max_pending_accept_reset_streams: usize,
  /// HTTP/2 keep-alive ping interval. `None` disables. Default: `None`.
  pub h2_keep_alive_interval: Option<Duration>,
  /// HTTP/3 cap on concurrent client-initiated bidirectional streams. Maps to
  /// `quinn::TransportConfig::max_concurrent_bidi_streams`. Default: 100.
  pub h3_max_concurrent_bidi_streams: u32,
  /// HTTP/3 cap on concurrent client-initiated unidirectional streams. Maps to
  /// `quinn::TransportConfig::max_concurrent_uni_streams`. Default: 8.
  pub h3_max_concurrent_uni_streams: u32,
  /// HTTP/3 idle-timeout (no QUIC packets in either direction). `None` lets
  /// quinn pick its default; `Some(d)` sets the idle period to `d`.
  /// Default: `Some(30 s)`.
  // NOTE(review): the earlier wording said `Some(d)` "caps the connection
  // lifetime"; an idle timeout bounds inactivity, not total lifetime —
  // confirm against the HTTP/3 listener.
  pub h3_max_idle_timeout: Option<Duration>,
  /// HTTP/3 congestion controller selection. Default:
  /// `H3Congestion::default()`, i.e. `H3Congestion::Cubic`.
  pub h3_congestion: H3Congestion,
  /// Enable QUIC datagrams (RFC 9221) on HTTP/3 connections. Required for
  /// downstream WebTransport-style traffic. Default: `false`.
  pub h3_enable_datagrams: bool,
  /// Issue a QUIC Retry packet for each new connection whose source address
  /// has not been validated. Mitigates UDP source-address-spoofing
  /// amplification attacks at the cost of one extra round-trip per new client.
  /// Default: `false`.
  pub h3_use_retry: bool,
  /// Per-connection grace given to in-flight HTTP/3 streams to finish after
  /// the per-connection GOAWAY. Default: 10 s.
  ///
  /// The effective grace at runtime is `min(h3_goaway_grace, drain_timeout)`
  /// — the server clamps this so a long per-connection grace cannot push the
  /// total shutdown past the global drain budget. Configuring
  /// `h3_goaway_grace` larger than `drain_timeout` is therefore a no-op
  /// beyond the global ceiling.
  pub h3_goaway_grace: Duration,
  /// Optional ceiling on concurrent in-flight connections. Enforced via a
  /// semaphore in the accept loop; `None` disables. Default: `None`.
  pub max_connections: Option<usize>,
  /// Read deadline applied before the PROXY protocol header is parsed.
  /// Default: 10 s.
  pub proxy_read_timeout: Duration,
  /// Maximum time the TLS acceptor waits for the client to complete its
  /// handshake. A slow / stalled handshake holds a `max_connections` permit
  /// open indefinitely otherwise — TLS slowloris. Default 10 seconds.
  pub tls_handshake_timeout: Duration,
  /// Backoff schedule for `accept()` errors (typically EMFILE/ENFILE).
  /// Default: `AcceptBackoff::new()` (5 ms growing to 1 s).
  pub accept_backoff: AcceptBackoff,
}
98
99impl Default for ServerConfig {
100  fn default() -> Self {
101    Self {
102      drain_timeout: Duration::from_secs(30),
103      header_read_timeout: Some(Duration::from_secs(30)),
104      keep_alive: true,
105      keep_alive_timeout: None,
106      h2_max_concurrent_streams: 100,
107      h2_max_header_list_size: 16 * 1024,
108      h2_max_send_buf_size: 1024 * 1024,
109      h2_max_pending_accept_reset_streams: 50,
110      h2_keep_alive_interval: None,
111      h3_max_concurrent_bidi_streams: 100,
112      h3_max_concurrent_uni_streams: 8,
113      h3_max_idle_timeout: Some(Duration::from_secs(30)),
114      h3_congestion: H3Congestion::default(),
115      h3_enable_datagrams: false,
116      h3_use_retry: false,
117      h3_goaway_grace: Duration::from_secs(10),
118      max_connections: None,
119      proxy_read_timeout: Duration::from_secs(10),
120      tls_handshake_timeout: Duration::from_secs(10),
121      accept_backoff: AcceptBackoff::new(),
122    }
123  }
124}
125
/// Exponential backoff state for `listener.accept()` retry loops.
///
/// Accept errors (typically `EMFILE`/`ENFILE` when the process has run out of
/// file descriptors, or transient `ConnectionAborted` under load) are not fatal
/// to the listener. Servers should log, sleep, and re-poll. Use [`AcceptBackoff`]
/// to keep the sleep schedule consistent across transports without duplicating
/// the constants in every `serve_*` implementation.
///
/// The type is `Copy`, so each accept loop can hold its own independent copy
/// of the schedule. Both fields are private; `new()` is the only constructor.
#[derive(Debug, Clone, Copy)]
pub struct AcceptBackoff {
  /// Delay that the next `current_and_grow` call will return. Starts at 5 ms
  /// and is doubled on every call until it reaches `max`.
  current: Duration,
  /// Upper bound for `current` (1 s when built through `new`).
  max: Duration,
}
138
139impl Default for AcceptBackoff {
140  fn default() -> Self {
141    Self::new()
142  }
143}
144
145impl AcceptBackoff {
146  /// Construct with the default 5 ms → 1 s schedule.
147  #[must_use]
148  pub const fn new() -> Self {
149    Self {
150      current: Duration::from_millis(5),
151      max: Duration::from_secs(1),
152    }
153  }
154
155  /// Reset the schedule after a successful accept.
156  #[inline]
157  pub fn reset(&mut self) {
158    self.current = Duration::from_millis(5);
159  }
160
161  /// Sleep for the current backoff and double it (capped at `max`).
162  /// Use the tokio `sleep` so this is cooperative on the runtime that runs
163  /// the accept loop.
164  pub async fn sleep_and_grow(&mut self) {
165    let d = self.current_and_grow();
166    tokio::time::sleep(d).await;
167  }
168
169  /// Returns the current backoff duration and doubles the internal counter
170  /// (capped at `max`). Use this when you need to drive the sleep with a
171  /// non-tokio timer (e.g. `compio::time::sleep`).
172  pub fn current_and_grow(&mut self) -> Duration {
173    let d = self.current;
174    self.current = (self.current * 2).min(self.max);
175    d
176  }
177}
178
// Core listener for the default (non-compio) build. It is private; its four
// `serve*` entry points are re-exported from the crate root further down.
#[cfg(not(feature = "compio"))]
mod server;

// Builder types. The module itself is always compiled; which of its items
// are re-exported depends on the `tls` and `compio` features.
mod builder;
#[cfg(feature = "tls")]
pub use builder::ClientAuth;
#[cfg(feature = "compio")]
pub use builder::CompioServer;
#[cfg(feature = "compio")]
pub use builder::CompioServerBuilder;
#[cfg(feature = "tls")]
pub use builder::ReloadableResolver;
#[cfg(not(feature = "compio"))]
pub use builder::Server;
#[cfg(not(feature = "compio"))]
pub use builder::ServerBuilder;
// These two re-exports carry no feature gate.
pub use builder::ServerHandle;
pub use builder::TlsCert;
#[cfg(feature = "tls")]
pub use builder::build_rustls_server_config;
pub use builder::either;
// Crate-root `serve*` entry points for the non-compio build.
#[cfg(not(feature = "compio"))]
pub use server::serve;
#[cfg(not(feature = "compio"))]
pub use server::serve_with_config;
#[cfg(not(feature = "compio"))]
pub use server::serve_with_shutdown;
#[cfg(not(feature = "compio"))]
pub use server::serve_with_shutdown_and_config;

// compio build: `server_compio` supplies the same four crate-root names
// (`serve`, `serve_with_config`, `serve_with_shutdown`,
// `serve_with_shutdown_and_config`). The cfg gates are mutually exclusive
// with the `server` re-exports above, so the names never collide.
#[cfg(feature = "compio")]
#[cfg_attr(docsrs, doc(cfg(feature = "compio")))]
pub mod server_compio;
#[cfg(feature = "compio")]
pub use server_compio::serve;
#[cfg(feature = "compio")]
pub use server_compio::serve_with_config;
#[cfg(feature = "compio")]
pub use server_compio::serve_with_shutdown;
#[cfg(feature = "compio")]
pub use server_compio::serve_with_shutdown_and_config;
220
/// TLS/SSL server implementation for secure connections.
// NOTE(review): this module is gated on `not(compio-tls)` while its
// re-exports below are gated on `not(compio)`. With `compio` + `tls` enabled
// and `compio-tls` disabled, the module is compiled but none of its
// `serve_tls*` functions are re-exported at the crate root — confirm this
// asymmetry is intended.
#[cfg(all(not(feature = "compio-tls"), feature = "tls"))]
#[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
pub mod server_tls;
#[cfg(all(not(feature = "compio"), feature = "tls"))]
#[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
pub use server_tls::serve_tls;
#[cfg(all(not(feature = "compio"), feature = "tls"))]
#[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
pub use server_tls::serve_tls_with_config;
#[cfg(all(not(feature = "compio"), feature = "tls"))]
#[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
pub use server_tls::serve_tls_with_shutdown;
#[cfg(all(not(feature = "compio"), feature = "tls"))]
#[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
pub use server_tls::serve_tls_with_shutdown_and_config;

// `compio-tls` build: `server_tls_compio` re-exports the same four
// `serve_tls*` names at the crate root.
// NOTE(review): these gates do not exclude the `server_tls` re-exports above
// unless `compio-tls` implies `compio` in Cargo.toml — presumably it does;
// verify.
#[cfg(feature = "compio-tls")]
#[cfg_attr(docsrs, doc(cfg(feature = "compio-tls")))]
pub mod server_tls_compio;
#[cfg(feature = "compio-tls")]
pub use server_tls_compio::serve_tls;
#[cfg(feature = "compio-tls")]
pub use server_tls_compio::serve_tls_with_config;
#[cfg(feature = "compio-tls")]
pub use server_tls_compio::serve_tls_with_shutdown;
#[cfg(feature = "compio-tls")]
pub use server_tls_compio::serve_tls_with_shutdown_and_config;
249
/// HTTP/3 server implementation using QUIC transport.
///
/// Only built when the `http3` feature is enabled and `compio` is not.
#[cfg(all(feature = "http3", not(feature = "compio")))]
#[cfg_attr(docsrs, doc(cfg(feature = "http3")))]
pub mod server_h3;
#[cfg(all(feature = "http3", not(feature = "compio")))]
#[cfg_attr(docsrs, doc(cfg(feature = "http3")))]
pub use server_h3::serve_h3;
#[cfg(all(feature = "http3", not(feature = "compio")))]
#[cfg_attr(docsrs, doc(cfg(feature = "http3")))]
pub use server_h3::serve_h3_with_config;
#[cfg(all(feature = "http3", not(feature = "compio")))]
#[cfg_attr(docsrs, doc(cfg(feature = "http3")))]
pub use server_h3::serve_h3_with_shutdown;
#[cfg(all(feature = "http3", not(feature = "compio")))]
#[cfg_attr(docsrs, doc(cfg(feature = "http3")))]
pub use server_h3::serve_h3_with_shutdown_and_config;

/// Raw TCP server for handling arbitrary TCP connections.
// Declared without any feature gate, so it is compiled in every build.
pub mod server_tcp;

/// HTTP/2 cleartext (h2c, prior knowledge) server.
///
/// Only built when the `http2` feature is enabled and `compio` is not.
#[cfg(all(feature = "http2", not(feature = "compio")))]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
pub mod server_h2c;
// Unlike the HTTP/3 re-exports above, these carry no `doc(cfg(...))`
// annotation; only the module declaration does.
#[cfg(all(feature = "http2", not(feature = "compio")))]
pub use server_h2c::serve_h2c;
#[cfg(all(feature = "http2", not(feature = "compio")))]
pub use server_h2c::serve_h2c_with_config;
#[cfg(all(feature = "http2", not(feature = "compio")))]
pub use server_h2c::serve_h2c_with_shutdown;
#[cfg(all(feature = "http2", not(feature = "compio")))]
pub use server_h2c::serve_h2c_with_shutdown_and_config;

/// UDP datagram server for handling raw UDP packets.
// Declared without any feature gate, so it is compiled in every build.
pub mod server_udp;

/// Unix Domain Socket server for local IPC and reverse proxy communication.
///
/// Only built on Unix targets when the `compio` feature is off.
#[cfg(all(unix, not(feature = "compio")))]
pub mod server_unix;

/// PROXY protocol v1/v2 parser for load balancer integration.
///
/// Only built when the `compio` feature is off.
#[cfg(not(feature = "compio"))]
pub mod proxy_protocol;

/// systemd / s6 / catflap socket activation helpers (LISTEN_FDS).
#[cfg(feature = "socket-activation")]
#[cfg_attr(docsrs, doc(cfg(feature = "socket-activation")))]
pub mod socket_activation;

/// Linux vsock transport for VM-host bridges.
///
/// Only built on Linux with the `vsock` feature enabled and `compio` off.
#[cfg(all(target_os = "linux", feature = "vsock", not(feature = "compio")))]
#[cfg_attr(docsrs, doc(cfg(feature = "vsock")))]
pub mod server_vsock;
303
304/// Bind a TCP listener for `addr`, asking interactively to increment the port
305/// if it is already in use.
306///
307/// This helper is primarily intended for local development and example binaries.
308#[cfg(not(feature = "compio"))]
309pub async fn bind_with_port_fallback(addr: &str) -> io::Result<tokio::net::TcpListener> {
310  let mut socket_addr =
311    SocketAddr::from_str(addr).map_err(|e| io::Error::new(ErrorKind::InvalidInput, e))?;
312  let start_port = socket_addr.port();
313
314  loop {
315    let addr_str = socket_addr.to_string();
316    match tokio::net::TcpListener::bind(&addr_str).await {
317      Ok(listener) => {
318        if socket_addr.port() != start_port {
319          println!(
320            "Port {} was in use, starting on {} instead",
321            start_port,
322            socket_addr.port()
323          );
324        }
325        return Ok(listener);
326      }
327      Err(err) if err.kind() == ErrorKind::AddrInUse => {
328        let curr_port = socket_addr.port();
329        // Cap at u16::MAX — `saturating_add(1)` on 65535 returns 65535, so
330        // a naive loop would re-bind the same port forever if the user keeps
331        // answering "Y". Surface the original AddrInUse error instead.
332        if curr_port == u16::MAX {
333          return Err(err);
334        }
335        let next_port = curr_port + 1;
336        // Synchronous stdin read on a blocking pool — the previous call
337        // ran the read inline and blocked the async runtime worker until
338        // the user typed Enter.
339        let proceed =
340          tokio::task::spawn_blocking(move || ask_to_use_next_port(curr_port, next_port))
341            .await
342            .map_err(io::Error::other)??;
343        if !proceed {
344          return Err(err);
345        }
346        socket_addr.set_port(next_port);
347      }
348      Err(err) => return Err(err),
349    }
350  }
351}
352
/// Binds a compio TCP listener on `addr`. If the port is already taken, the
/// user is asked on stdin whether to retry on the next port up.
///
/// This is the compio counterpart of the tokio helper of the same name.
///
/// # Errors
///
/// - `ErrorKind::InvalidInput` when `addr` is not a valid `ip:port` socket
///   address.
/// - The original `AddrInUse` error when the user declines the next port, or
///   when port 65535 itself is busy.
/// - Any other bind error, or an I/O error from the prompt, unchanged.
#[cfg(feature = "compio")]
pub async fn bind_with_port_fallback(addr: &str) -> io::Result<compio::net::TcpListener> {
  let mut candidate =
    SocketAddr::from_str(addr).map_err(|e| io::Error::new(ErrorKind::InvalidInput, e))?;
  let requested_port = candidate.port();

  loop {
    let target = candidate.to_string();
    match compio::net::TcpListener::bind(&target).await {
      Ok(listener) => {
        let bound_port = candidate.port();
        if bound_port != requested_port {
          println!(
            "Port {} was in use, starting on {} instead",
            requested_port, bound_port
          );
        }
        return Ok(listener);
      }
      // Anything other than "address in use" is not retried.
      Err(other) if other.kind() != ErrorKind::AddrInUse => return Err(other),
      Err(in_use) => {
        let busy_port = candidate.port();
        // Port 65535 has no successor; give up instead of looping on it.
        if busy_port == u16::MAX {
          return Err(in_use);
        }
        let next_port = busy_port + 1;

        // Run the stdin prompt on a blocking-pool task so the
        // io_uring/IOCP reactor is not held while waiting for the user.
        let accepted =
          compio::runtime::spawn_blocking(move || ask_to_use_next_port(busy_port, next_port))
            .await
            .map_err(|_| io::Error::other("compio spawn_blocking panicked"))??;
        if !accepted {
          return Err(in_use);
        }

        candidate.set_port(next_port);
      }
    }
  }
}
396
/// Asks on stdout/stdin whether to fall back from port `current` to `next`.
///
/// Returns `Ok(true)` for an empty line, `y` or `yes`, and `Ok(false)` for `n`
/// or `no` (all case-insensitive, surrounding whitespace ignored). Any other
/// answer re-prompts. An empty read — including end-of-input — counts as the
/// default "yes".
///
/// This blocks on stdin, so async callers must run it on a blocking pool.
///
/// # Errors
///
/// Propagates any I/O error from flushing stdout or reading stdin.
fn ask_to_use_next_port(current: u16, next: u16) -> io::Result<bool> {
  // One buffer for every prompt round; cleared before each read.
  let mut line = String::new();

  loop {
    print!("Port {current} is already in use. Start on {next} instead? [Y/n]: ");
    // `print!` does not end in a newline, so flush to make the prompt visible.
    io::stdout().flush()?;

    line.clear();
    io::stdin().read_line(&mut line)?;

    match line.trim().to_ascii_lowercase().as_str() {
      "" | "y" | "yes" => return Ok(true),
      "n" | "no" => return Ok(false),
      _ => println!("Please answer 'y' or 'n'."),
    }
  }
}