Skip to main content

tako_rs_server_pt/
lib.rs

1#![cfg_attr(docsrs, feature(doc_cfg))]
2
3//! Thread-per-core HTTP server bootstrap for the Tako framework.
4//!
5//! Spawns N OS threads (one per CPU by default), each running its own
6//! `tokio` `current_thread` runtime + [`tokio::task::LocalSet`]. Connections
7//! are distributed across workers at the kernel level via `SO_REUSEPORT`.
8//! Tasks never migrate between threads, eliminating tokio's work-stealing
9//! coordination on the hot path and improving cache locality (especially with
10//! the `affinity` feature which pins each worker to a specific core).
11//!
12//! Two entry points:
13//!
14//! - [`serve_per_thread`] — uses the existing thread-safe [`tako_rs_core::router::Router`]
15//!   from `tako-core`. Drop-in alternative to `tako::serve`; no API changes.
16//! - `serve_per_thread_compio` (under the `compio` feature) — same `SO_REUSEPORT`
17//!   bootstrap but each worker runs a `compio` runtime (`io_uring` on Linux,
18//!   IOCP on Windows, kqueue on macOS).
19
20use std::convert::Infallible;
21use std::io;
22use std::net::SocketAddr;
23use std::str::FromStr;
24use std::time::Duration;
25
26use hyper::server::conn::http1;
27use hyper::service::service_fn;
28use socket2::Domain;
29use socket2::Protocol;
30use socket2::Socket;
31use socket2::Type;
32use tako_rs_core::body::TakoBody;
33use tako_rs_core::conn_info::ConnInfo;
34use tako_rs_core::router::Router;
35use tokio::net::TcpListener;
36use tokio::runtime::Builder;
37use tokio::task::LocalSet;
38
/// Configuration for [`serve_per_thread`] (and the `compio` variant when enabled).
///
/// Use `PerThreadConfig::default()` and override individual fields with
/// struct-update syntax.
#[derive(Debug, Clone)]
pub struct PerThreadConfig {
  /// Number of worker threads; each gets its own runtime and its own
  /// `SO_REUSEPORT` listener. Defaults to the number of logical CPUs.
  pub workers: usize,
  /// Pin worker `i` to CPU core `i`. Only honoured when the crate is built
  /// with the `affinity` feature; without it this flag is never read.
  pub pin_to_core: bool,
  /// Listen backlog passed to `listen` on each worker's `SO_REUSEPORT` socket.
  pub backlog: i32,
  /// Maximum time each worker waits for its in-flight connections after
  /// shutdown is signalled. Connections still open when it elapses are
  /// aborted.
  pub drain_timeout: Duration,
}
52
53impl Default for PerThreadConfig {
54  fn default() -> Self {
55    Self {
56      workers: num_cpus(),
57      pin_to_core: cfg!(feature = "affinity"),
58      backlog: 1024,
59      drain_timeout: Duration::from_secs(30),
60    }
61  }
62}
63
/// Shared bind-result tracker used by the [`PerThreadShutdown`] coordinator
/// so the parent process can detect "all worker threads failed to bind"
/// (e.g. `SO_REUSEPORT` unavailable on Windows / non-Linux Unix, port already
/// taken). In that case it surfaces a real error from [`serve_per_thread`]
/// instead of silently waiting on Ctrl+C forever and then returning `Ok(())`.
///
/// Workers whose runtime cannot be built are also counted as failures, so
/// the parent is never left waiting on a worker that never reached its bind
/// step.
#[derive(Default)]
struct BindStatus {
  /// Number of workers that completed their bind step successfully.
  succeeded: std::sync::atomic::AtomicUsize,
  /// Number of workers that failed their bind step (or failed earlier,
  /// while building their runtime).
  failed: std::sync::atomic::AtomicUsize,
  /// First recorded failure, kept so the parent can return something
  /// actionable to its caller / supervisor. A plain `std::sync::Mutex` is
  /// fine here — this is a cold path (at most one write per worker at
  /// startup, one read once the outcome is known).
  first_err: std::sync::Mutex<Option<io::Error>>,
  /// Woken on every success/failure report so the bind-outcome waiter
  /// re-checks the counters instead of polling.
  notify: tokio::sync::Notify,
}
83
/// Shutdown coordinator shared by every worker spawned via [`spawn_per_thread`]
/// (and friends). Each worker races its accept loop against this signal.
/// Triggering [`PerThreadShutdown::trigger`] therefore cleanly exits every
/// worker's `loop { accept }` instead of leaking the OS thread on shutdown.
///
/// Backed by a [`tokio_util::sync::CancellationToken`] so the signal is
/// sticky: workers that start waiting via [`Self::notified`] after
/// `trigger()` was called still observe the request immediately. A plain
/// `Notify::notify_waiters` would let such late subscribers miss it.
///
/// Cloning is cheap; every clone refers to the same signal and the same bind
/// tracker.
///
/// Also carries a private [`BindStatus`] that workers update with the result
/// of their `SO_REUSEPORT` bind. This lets the parent (e.g.
/// [`serve_per_thread`]) fail loudly on "every worker failed to bind" instead
/// of returning `Ok(())` while no listener is up, which would be a false
/// health signal to supervisors.
#[derive(Clone, Default)]
pub struct PerThreadShutdown {
  /// Sticky shutdown signal shared by every clone.
  inner: tokio_util::sync::CancellationToken,
  /// Bind-outcome counters shared by every clone.
  bind_status: std::sync::Arc<BindStatus>,
}
104
105impl PerThreadShutdown {
106  /// Construct an unsignalled shutdown coordinator.
107  #[must_use]
108  pub fn new() -> Self {
109    Self::default()
110  }
111
112  /// Notify every worker waiter that it should exit its accept loop.
113  /// Idempotent — calling it more than once is a no-op.
114  pub fn trigger(&self) {
115    self.inner.cancel();
116  }
117
118  /// Future a worker awaits to learn that shutdown has been requested.
119  pub async fn notified(&self) {
120    self.inner.cancelled().await;
121  }
122
123  /// Worker hook: report a successful `SO_REUSEPORT` bind.
124  pub(crate) fn report_bind_success(&self) {
125    self
126      .bind_status
127      .succeeded
128      .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
129    self.bind_status.notify.notify_waiters();
130  }
131
132  /// Worker hook: report a bind failure (with the underlying `io::Error`).
133  /// The first error wins for reporting; later errors are dropped after their
134  /// `tracing::error!` log.
135  pub(crate) fn report_bind_failure(&self, err: io::Error) {
136    {
137      let mut guard = self.bind_status.first_err.lock().unwrap();
138      if guard.is_none() {
139        *guard = Some(err);
140      }
141    }
142    self
143      .bind_status
144      .failed
145      .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
146    self.bind_status.notify.notify_waiters();
147  }
148
149  /// Wait until either at least one worker bound successfully, or all
150  /// `total` workers reported a bind failure. Returns the first recorded
151  /// `io::Error` in the all-failure case so the caller can propagate a real
152  /// error instead of pretending the server started.
153  pub async fn wait_for_bind_outcome(&self, total: usize) -> io::Result<()> {
154    use std::sync::atomic::Ordering;
155
156    loop {
157      // Arm the notified future BEFORE reading state so a wake fired
158      // between the load and the await is not lost.
159      let notified = self.bind_status.notify.notified();
160      tokio::pin!(notified);
161      notified.as_mut().enable();
162
163      let succ = self.bind_status.succeeded.load(Ordering::SeqCst);
164      let fail = self.bind_status.failed.load(Ordering::SeqCst);
165
166      if succ > 0 {
167        return Ok(());
168      }
169      if succ + fail >= total {
170        let err = self
171          .bind_status
172          .first_err
173          .lock()
174          .unwrap()
175          .take()
176          .unwrap_or_else(|| {
177            io::Error::other(format!("all {total} per-thread workers failed to bind"))
178          });
179        return Err(err);
180      }
181
182      notified.await;
183    }
184  }
185}
186
/// Logical CPU count as reported by [`std::thread::available_parallelism`],
/// falling back to 1 when the platform cannot tell.
fn num_cpus() -> usize {
  match std::thread::available_parallelism() {
    Ok(count) => count.get(),
    Err(_) => 1,
  }
}
190
/// Initial delay before the compio worker retries after a failed `accept`;
/// the worker doubles it on every consecutive failure, up to a 1 s cap.
#[cfg(feature = "compio")]
fn compio_accept_backoff() -> Duration {
  const INITIAL_ACCEPT_BACKOFF: Duration = Duration::from_millis(5);
  INITIAL_ACCEPT_BACKOFF
}
195
/// Log, at most once per process, a warning when the target platform cannot
/// load-balance connections across `SO_REUSEPORT` listeners.
///
/// Linux balances natively, so nothing is logged there. Other Unixes ignore
/// the load-balancing semantic (last binder wins), and Windows has no
/// `SO_REUSEPORT` at all.
fn warn_reuseport_platform_once() {
  static ONCE: std::sync::Once = std::sync::Once::new();
  ONCE.call_once(|| {
    // Linux: SO_REUSEPORT is the supported configuration — stay silent.
    #[cfg(all(unix, not(target_os = "linux")))]
    tracing::warn!(
      "tako-server-pt: SO_REUSEPORT is being used on a non-Linux Unix \
       platform. The kernel typically sends incoming connections only to \
       the most recent binder, so multi-worker thread-per-core mode will \
       not load-balance correctly. Use a single worker or run on Linux."
    );
    #[cfg(windows)]
    tracing::warn!(
      "tako-server-pt: SO_REUSEPORT does not exist on Windows. Only the \
       first worker will accept connections; subsequent worker binds will \
       fail with EADDRINUSE. Use a single worker on Windows."
    );
  });
}
225
226fn bind_reuseport_std(addr: SocketAddr, backlog: i32) -> io::Result<std::net::TcpListener> {
227  warn_reuseport_platform_once();
228  let domain = if addr.is_ipv4() {
229    Domain::IPV4
230  } else {
231    Domain::IPV6
232  };
233  let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))?;
234  socket.set_reuse_address(true)?;
235  // `socket2::set_reuse_port` is gated to Unix targets only; on Linux it's a
236  // genuine kernel load-balancer, on macOS / BSD it's a no-op-equivalent
237  // (last-binder-wins), on Windows the underlying SO_REUSEPORT does not
238  // exist so the call is omitted entirely.
239  #[cfg(unix)]
240  socket.set_reuse_port(true)?;
241  socket.set_nonblocking(true)?;
242  socket.bind(&addr.into())?;
243  socket.listen(backlog)?;
244  Ok(socket.into())
245}
246
247fn bind_reuseport(addr: SocketAddr, backlog: i32) -> io::Result<TcpListener> {
248  TcpListener::from_std(bind_reuseport_std(addr, backlog)?)
249}
250
/// Compio flavour of the shared `SO_REUSEPORT` bind: builds the std listener,
/// then converts it into a compio listener.
#[cfg(feature = "compio")]
fn bind_reuseport_compio(addr: SocketAddr, backlog: i32) -> io::Result<compio::net::TcpListener> {
  let std_listener = bind_reuseport_std(addr, backlog)?;
  compio::net::TcpListener::from_std(std_listener)
}
255
256/// Starts a thread-per-core HTTP server with the given router.
257///
258/// Spawns `cfg.workers` OS threads. Each worker binds its own `SO_REUSEPORT`
259/// socket on `addr`, builds a single-threaded tokio runtime, and serves
260/// connections via [`tokio::task::spawn_local`].
261///
262/// This blocks the calling thread until all workers exit. To control shutdown
263/// externally use [`spawn_per_thread`] which returns a [`PerThreadShutdown`]
264/// handle.
265pub fn serve_per_thread(addr: &str, router: Router, cfg: PerThreadConfig) -> io::Result<()> {
266  let workers = cfg.workers;
267  let (handle, shutdown) = spawn_per_thread(addr, router, cfg)?;
268  // Wait for SIGINT (Ctrl+C) on a dedicated mini-runtime and then trigger
269  // graceful shutdown. The earlier `drop(shutdown)` was a no-op — dropping
270  // one clone of the `CancellationToken` does not cancel anything; only
271  // `trigger()` does. Without this, the function would never return on a
272  // healthy process.
273  let rt = tokio::runtime::Builder::new_current_thread()
274    .enable_all()
275    .build()
276    .map_err(|e| io::Error::other(format!("ctrl-c runtime: {e}")))?;
277  // Block on bind-outcome first: if every worker failed to bind
278  // (SO_REUSEPORT unavailable, port already taken, …) we surface the first
279  // recorded `io::Error` instead of pretending the server is up and waiting
280  // forever on Ctrl+C. If at least one worker bound successfully, proceed
281  // to the Ctrl+C wait as usual.
282  let result: io::Result<()> = rt.block_on(async {
283    shutdown.wait_for_bind_outcome(workers).await?;
284    let _ = tokio::signal::ctrl_c().await;
285    Ok(())
286  });
287  shutdown.trigger();
288  for h in handle {
289    let _ = h.join();
290  }
291  result
292}
293
294/// Spawn the worker threads and return both the join handles and a
295/// [`PerThreadShutdown`] that the caller can use to signal a clean stop.
296///
297/// The returned thread handles are owned by the caller; dropping them does not
298/// stop the server. Trigger the shutdown via [`PerThreadShutdown::trigger`],
299/// then `join` each handle (or just drop them after the trigger if you're OK
300/// with detached cleanup).
301pub fn spawn_per_thread(
302  addr: &str,
303  router: Router,
304  cfg: PerThreadConfig,
305) -> io::Result<(Vec<std::thread::JoinHandle<()>>, PerThreadShutdown)> {
306  let socket_addr =
307    SocketAddr::from_str(addr).map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
308
309  // Leak the router so workers share a `&'static` reference — no Arc clones
310  // on the per-connection or per-request hot path.
311  let router: &'static Router = Box::leak(Box::new(router));
312
313  let shutdown = PerThreadShutdown::new();
314  let mut handles = Vec::with_capacity(cfg.workers);
315  for worker_id in 0..cfg.workers {
316    let cfg = cfg.clone();
317    let shutdown = shutdown.clone();
318    let h = std::thread::Builder::new()
319      .name(format!("tako-pt-{worker_id}"))
320      .spawn(move || worker_main(worker_id, socket_addr, router, cfg, shutdown))
321      .expect("spawn tako-pt worker");
322    handles.push(h);
323  }
324  Ok((handles, shutdown))
325}
326
327// Without the `affinity` feature, `worker_id` and `cfg.pin_to_core` aren't
328// read past this point; mark the function tolerant of those unused names so
329// we don't need the awkward `let _ = (worker_id, &cfg.pin_to_core);` trick
330// that previously sat inside the function body for the sole purpose of
331// silencing the warning.
332#[cfg_attr(not(feature = "affinity"), allow(unused_variables))]
333fn worker_main(
334  worker_id: usize,
335  addr: SocketAddr,
336  router: &'static Router,
337  cfg: PerThreadConfig,
338  shutdown: PerThreadShutdown,
339) {
340  #[cfg(feature = "affinity")]
341  if cfg.pin_to_core {
342    if let Some(ids) = core_affinity::get_core_ids() {
343      if let Some(id) = ids.get(worker_id) {
344        if !core_affinity::set_for_current(*id) {
345          tracing::warn!(
346            worker_id,
347            "pin_to_core: core_affinity::set_for_current returned false; running without affinity"
348          );
349        }
350      } else {
351        tracing::warn!(
352          worker_id,
353          available_cores = ids.len(),
354          "pin_to_core: worker_id exceeds available cores; running without affinity"
355        );
356      }
357    } else {
358      tracing::warn!(
359        worker_id,
360        "pin_to_core: core_affinity::get_core_ids() returned None; running without affinity"
361      );
362    }
363  }
364
365  let rt = match Builder::new_current_thread().enable_all().build() {
366    Ok(rt) => rt,
367    Err(e) => {
368      tracing::error!("worker {worker_id}: failed to build runtime: {e}");
369      // Treat runtime-build failure as a bind failure so the parent is
370      // unblocked from `wait_for_bind_outcome` instead of waiting on
371      // Ctrl+C for a worker that never reached its bind step.
372      shutdown.report_bind_failure(io::Error::other(format!(
373        "worker {worker_id}: failed to build runtime: {e}"
374      )));
375      return;
376    }
377  };
378
379  let local = LocalSet::new();
380  local.block_on(&rt, async move {
381    let listener = match bind_reuseport(addr, cfg.backlog) {
382      Ok(l) => {
383        // Report success so `serve_per_thread` can stop blocking on
384        // `wait_for_bind_outcome` and proceed to the Ctrl+C wait.
385        shutdown.report_bind_success();
386        l
387      }
388      Err(e) => {
389        tracing::error!("worker {worker_id}: bind failed: {e}");
390        shutdown.report_bind_failure(e);
391        return;
392      }
393    };
394    tracing::debug!("tako-pt worker {worker_id} listening on {addr}");
395
396    let shutdown_fut = shutdown.notified();
397    tokio::pin!(shutdown_fut);
398
399    // SRV-07: track per-connection tasks in a `JoinSet` instead of a `Vec`
400    // so completed connections can be reaped lazily. The previous `Vec`
401    // grew unboundedly across a worker's lifetime (a million-connection
402    // day = a million `JoinHandle`s held forever — soft leak proportional
403    // to total connections handled, even though the underlying tasks were
404    // long done).
405    let mut connection_handles: tokio::task::JoinSet<()> = tokio::task::JoinSet::new();
406
407    loop {
408      tokio::select! {
409        accept = listener.accept() => {
410          let (stream, peer) = match accept {
411            Ok(v) => v,
412            Err(e) => {
413              tracing::warn!("worker {worker_id}: accept failed: {e}");
414              continue;
415            }
416          };
417          // `set_nodelay` only fails for already-closed sockets (peer hung
418          // up between accept and here) or on platforms without TCP_NODELAY
419          // — either way the connection still works, but log at debug so an
420          // operator can investigate persistent failures.
421          if let Err(e) = stream.set_nodelay(true) {
422            tracing::debug!("worker {worker_id}: set_nodelay failed for {peer}: {e}");
423          }
424          let io = hyper_util::rt::TokioIo::new(stream);
425
426          connection_handles.spawn_local(async move {
427            let svc = service_fn(move |mut req| async move {
428              req.extensions_mut().insert(peer);
429              req.extensions_mut().insert(ConnInfo::tcp(peer));
430              let resp = router.dispatch(req.map(TakoBody::incoming)).await;
431              Ok::<_, Infallible>(resp)
432            });
433
434            let mut http = http1::Builder::new();
435            http.keep_alive(true);
436            http.pipeline_flush(true);
437            if let Err(err) = http.serve_connection(io, svc).with_upgrades().await {
438              if err.is_incomplete_message() {
439                tracing::debug!("worker {worker_id}: client disconnected mid-message: {err}");
440              } else {
441                tracing::error!("worker {worker_id}: connection error: {err}");
442              }
443            }
444          });
445
446          // Reap finished connection tasks opportunistically so the JoinSet
447          // does not retain `AbortHandle`s for already-completed tasks. Each
448          // `try_join_next` is non-blocking; the loop drains all currently
449          // finished entries.
450          while connection_handles.try_join_next().is_some() {}
451        }
452        () = &mut shutdown_fut => {
453          tracing::info!("worker {worker_id}: shutdown signalled, draining");
454          break;
455        }
456      }
457    }
458    // Real graceful drain: wait on every in-flight connection task up to
459    // `drain_timeout`. `join_next()` yields one task at a time as it
460    // finishes; the timeout wraps the whole drain so a single hung
461    // connection cannot stall shutdown forever.
462    let drain = tokio::time::timeout(cfg.drain_timeout, async {
463      while connection_handles.join_next().await.is_some() {}
464    });
465    let _ = drain.await;
466  });
467}
468
/// Starts a thread-per-core HTTP server with the compio runtime.
///
/// Same `SO_REUSEPORT` bootstrap as [`serve_per_thread`] but each worker runs a
/// single-threaded `compio` runtime — `io_uring` on Linux, IOCP on Windows,
/// kqueue on macOS. The router type stays the standard thread-safe
/// [`tako_rs_core::router::Router`] (leaked so workers share a `&'static`).
///
/// Blocks the calling thread until Ctrl+C (SIGINT), then triggers a graceful
/// shutdown and joins every worker.
///
/// # Errors
///
/// - `addr` is not a valid socket address, or `cfg.workers` is 0.
/// - A worker thread could not be spawned. Workers already started are
///   stopped and joined first.
/// - Every worker failed to bind; the first bind error is returned.
/// - The Ctrl+C runtime or signal handler could not be set up.
#[cfg(feature = "compio")]
#[cfg_attr(docsrs, doc(cfg(feature = "compio")))]
pub fn serve_per_thread_compio(addr: &str, router: Router, cfg: PerThreadConfig) -> io::Result<()> {
  let socket_addr =
    SocketAddr::from_str(addr).map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
  if cfg.workers == 0 {
    return Err(io::Error::new(
      io::ErrorKind::InvalidInput,
      "PerThreadConfig::workers must be at least 1",
    ));
  }

  let router: &'static Router = Box::leak(Box::new(router));

  let workers = cfg.workers;
  let shutdown = PerThreadShutdown::new();
  let mut handles = Vec::with_capacity(workers);
  for worker_id in 0..workers {
    let worker_cfg = cfg.clone();
    let worker_shutdown = shutdown.clone();
    let spawned = std::thread::Builder::new()
      .name(format!("tako-pt-compio-{worker_id}"))
      .spawn(move || {
        worker_main_compio(worker_id, socket_addr, router, worker_cfg, worker_shutdown)
      });
    match spawned {
      Ok(handle) => handles.push(handle),
      Err(e) => {
        // Tear down what already started before reporting the failure.
        shutdown.trigger();
        for handle in handles {
          let _ = handle.join();
        }
        return Err(io::Error::new(
          e.kind(),
          format!("spawn tako-pt-compio worker {worker_id}: {e}"),
        ));
      }
    }
  }

  // Same Ctrl+C / shutdown discipline as `serve_per_thread`: the bind
  // outcome is awaited while Ctrl+C stays live, a signal-registration error
  // is propagated, and a runtime-build failure still stops the workers.
  let result: io::Result<()> = match tokio::runtime::Builder::new_current_thread()
    .enable_all()
    .build()
  {
    Ok(rt) => rt.block_on(async {
      let ctrl_c = tokio::signal::ctrl_c();
      tokio::pin!(ctrl_c);
      tokio::select! {
        bound = shutdown.wait_for_bind_outcome(workers) => bound?,
        signal = &mut ctrl_c => return signal,
      }
      ctrl_c.await
    }),
    Err(e) => Err(io::Error::other(format!("ctrl-c runtime: {e}"))),
  };

  shutdown.trigger();
  for (worker_id, handle) in handles.into_iter().enumerate() {
    if handle.join().is_err() {
      tracing::error!("tako-pt-compio worker {worker_id} panicked");
    }
  }
  result
}
513
/// RAII counter-decrementer used by the compio worker to track in-flight
/// connections.
///
/// Construction increments the shared `inflight` counter. Dropping the guard
/// decrements it and wakes every task currently waiting on `drain_notify`.
/// Because the decrement lives in `Drop`, it also runs when the owning task
/// unwinds from a panic or is dropped along with its runtime, so the count
/// cannot leak in those cases. Mirrors the `ConnectionGuard` pattern in
/// `tako-server`'s `server_compio.rs`.
///
/// This counter is what lets the compio worker honour `cfg.drain_timeout`
/// for connection tasks it has detached. Without it there would be no way to
/// wait for in-flight work, and every active request would be cut off as soon
/// as `block_on` returned.
#[cfg(feature = "compio")]
struct PtConnGuard {
  /// Number of live connections on this worker; shared with the worker loop.
  inflight: std::sync::Arc<std::sync::atomic::AtomicUsize>,
  /// Woken on every decrement so the drain loop re-checks `inflight`.
  drain_notify: std::sync::Arc<tokio::sync::Notify>,
}
528
#[cfg(feature = "compio")]
impl PtConnGuard {
  /// Register one in-flight connection and return the guard that will
  /// unregister it when dropped.
  fn new(
    inflight: std::sync::Arc<std::sync::atomic::AtomicUsize>,
    drain_notify: std::sync::Arc<tokio::sync::Notify>,
  ) -> Self {
    let guard = Self {
      inflight,
      drain_notify,
    };
    guard
      .inflight
      .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
    guard
  }
}
542
#[cfg(feature = "compio")]
impl Drop for PtConnGuard {
  /// Unregister the connection, then wake anyone draining so they re-check
  /// the count.
  fn drop(&mut self) {
    let Self {
      inflight,
      drain_notify,
    } = self;
    inflight.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
    drain_notify.notify_waiters();
  }
}
552
/// Body of one compio worker thread, the compio counterpart of the tokio
/// worker.
///
/// Optionally pins the thread to core `worker_id` (`affinity` feature), then
/// builds a compio runtime. It binds this worker's own `SO_REUSEPORT`
/// listener and serves HTTP/1 connections as detached compio tasks until
/// `shutdown` fires. The bind (or runtime-build) outcome is reported through
/// `shutdown`. Failed accepts back off exponentially (starting at
/// `compio_accept_backoff()`, capped at 1 s).
///
/// Shutdown sequence:
/// 1. Stop accepting and close the listener, so new connections are refused
///    rather than queued on a socket nobody accepts from.
/// 2. Switch every open connection to hyper's `graceful_shutdown`, so idle
///    keep-alive connections close instead of holding the drain open.
/// 3. Wait until the in-flight count (maintained by [`PtConnGuard`]) reaches
///    zero, for at most `cfg.drain_timeout`. Tasks still running when the
///    runtime is dropped are abandoned.
#[cfg(feature = "compio")]
#[cfg_attr(not(feature = "affinity"), allow(unused_variables))]
fn worker_main_compio(
  worker_id: usize,
  addr: SocketAddr,
  router: &'static Router,
  cfg: PerThreadConfig,
  shutdown: PerThreadShutdown,
) {
  use std::sync::Arc;
  use std::sync::atomic::AtomicUsize;
  use std::sync::atomic::Ordering;

  use cyper_core::HyperStream;
  use futures_util::future::Either;
  use tokio::sync::Notify;

  #[cfg(feature = "affinity")]
  if cfg.pin_to_core {
    if let Some(ids) = core_affinity::get_core_ids() {
      if let Some(id) = ids.get(worker_id) {
        if !core_affinity::set_for_current(*id) {
          tracing::warn!(
            worker_id,
            "pin_to_core: core_affinity::set_for_current returned false; running without affinity"
          );
        }
      } else {
        tracing::warn!(
          worker_id,
          available_cores = ids.len(),
          "pin_to_core: worker_id exceeds available cores; running without affinity"
        );
      }
    } else {
      tracing::warn!(
        worker_id,
        "pin_to_core: core_affinity::get_core_ids() returned None; running without affinity"
      );
    }
  }

  let rt = match compio::runtime::RuntimeBuilder::new().build() {
    Ok(rt) => rt,
    Err(e) => {
      tracing::error!("worker {worker_id}: failed to build compio runtime: {e}");
      // Unblock the parent's `wait_for_bind_outcome` on runtime-build
      // failure too (worker never reaches its bind step otherwise).
      shutdown.report_bind_failure(io::Error::other(format!(
        "worker {worker_id}: failed to build compio runtime: {e}"
      )));
      return;
    }
  };

  rt.block_on(async move {
    let listener = match bind_reuseport_compio(addr, cfg.backlog) {
      Ok(l) => {
        shutdown.report_bind_success();
        l
      }
      Err(e) => {
        tracing::error!("worker {worker_id}: bind failed: {e}");
        shutdown.report_bind_failure(e);
        return;
      }
    };
    tracing::debug!("tako-pt-compio worker {worker_id} listening on {addr}");

    // Worker-local child of the shared token: it is cancelled together with
    // the parent, but polling it per accept and cloning it per connection
    // only touches this worker's node, not the token shared by all threads.
    let cancel = shutdown.inner.child_token();
    let mut backoff = compio_accept_backoff();
    let inflight = Arc::new(AtomicUsize::new(0));
    let drain_notify = Arc::new(Notify::new());

    loop {
      let accept_fut = listener.accept();
      let cancel_fut = cancel.cancelled();
      tokio::pin!(accept_fut, cancel_fut);
      let accept = futures_util::future::select(accept_fut, cancel_fut).await;
      let (stream, peer) = match accept {
        Either::Left((Ok(v), _)) => {
          backoff = compio_accept_backoff();
          v
        }
        Either::Left((Err(e), _)) => {
          let delay = backoff;
          tracing::warn!("worker {worker_id}: accept failed: {e}; backing off {delay:?}");
          compio::time::sleep(delay).await;
          backoff = std::cmp::min(backoff * 2, Duration::from_secs(1));
          continue;
        }
        Either::Right(_) => {
          tracing::info!("worker {worker_id}: shutdown signalled, draining");
          break;
        }
      };
      // Match the tokio variant: disable Nagle so HTTP/1 small writes don't
      // pay a 40ms RTT penalty on the wire.
      if let Err(e) = stream.set_nodelay(true) {
        tracing::debug!("worker {worker_id}: set_nodelay failed for {peer}: {e}");
      }
      let io = HyperStream::new(stream);
      // Build the guard before spawn so the count is incremented on the
      // current thread before the task can possibly run.
      let guard = PtConnGuard::new(inflight.clone(), drain_notify.clone());
      let stop = cancel.clone();

      compio::runtime::spawn(async move {
        // RAII: dropping `_guard` (on normal completion, panic, or task
        // cancellation) decrements `inflight` and wakes drain waiters.
        let _guard = guard;
        let svc = service_fn(move |mut req| async move {
          // Match the tokio variant: insert both the raw `SocketAddr`
          // (legacy lookup key) and the typed `ConnInfo` so extractors that
          // key off either type behave the same regardless of whether the
          // build is `compio` or `tokio`.
          req.extensions_mut().insert(peer);
          req.extensions_mut().insert(ConnInfo::tcp(peer));
          let resp = router
            .dispatch(req.map(tako_rs_core::body::TakoBody::new))
            .await;
          Ok::<_, Infallible>(resp)
        });

        let mut http = http1::Builder::new();
        http.keep_alive(true);
        let conn = std::pin::pin!(http.serve_connection(io, svc).with_upgrades());
        let stop_fut = std::pin::pin!(stop.cancelled());
        // On shutdown, switch to hyper's graceful close instead of waiting
        // for the client to hang up an idle keep-alive connection.
        let served = match futures_util::future::select(conn, stop_fut).await {
          Either::Left((res, _)) => res,
          Either::Right(((), mut conn)) => {
            conn.as_mut().graceful_shutdown();
            conn.await
          }
        };
        if let Err(err) = served {
          if err.is_incomplete_message() {
            tracing::debug!("worker {worker_id}: client disconnected mid-message: {err}");
          } else {
            tracing::error!("worker {worker_id}: connection error: {err}");
          }
        }
      })
      .detach();
    }

    // Close the listener before draining: with SO_REUSEPORT the kernel keeps
    // queueing new connections on an open socket, and they would only be
    // reset once the drain ends.
    drop(listener);

    // Drain phase: wait for in-flight connections to finish, for at most
    // `cfg.drain_timeout`.
    let drain_deadline = std::time::Instant::now() + cfg.drain_timeout;
    loop {
      // Create the `Notified` BEFORE reading the counter. A `Notified`
      // receives every `notify_waiters` issued after its creation, so a
      // guard dropping between the load and the await cannot be missed.
      // Missing it would mean sleeping out the whole timeout and then
      // warning about connections that had already finished.
      let wake = drain_notify.notified();
      let still_inflight = inflight.load(Ordering::SeqCst);
      if still_inflight == 0 {
        break;
      }
      let remaining = drain_deadline.saturating_duration_since(std::time::Instant::now());
      if remaining.is_zero() {
        tracing::warn!(
          worker_id,
          drain_timeout = ?cfg.drain_timeout,
          still_inflight,
          "drain timeout exceeded; remaining connections will be aborted"
        );
        break;
      }
      let wake = std::pin::pin!(wake);
      let sleep = std::pin::pin!(compio::time::sleep(remaining));
      // Either outcome loops: a wake-up re-checks the counter, and an expired
      // timer lands in the `remaining.is_zero()` branch after one final
      // counter check.
      let _ = futures_util::future::select(wake, sleep).await;
    }
  });
}