tako-rs-server 2.0.0

Internal server bootstrap for tako-rs. Use the `tako-rs` umbrella crate instead.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
//! Unix Domain Socket server for local IPC and reverse proxy communication.
//!
//! Provides both raw Unix socket and HTTP-over-Unix-socket servers.
//! The HTTP variant is ideal for production deployments behind nginx/`HAProxy`
//! where the app communicates via a local socket file instead of TCP.
//!
//! Filesystem and Linux abstract-namespace paths are both supported. A path
//! whose string representation starts with `@` is interpreted as a Linux
//! abstract socket: e.g. `@tako.sock` binds to the abstract name `tako.sock`
//! (NUL-prefixed in the kernel). Abstract sockets do not touch the filesystem,
//! so the stale-socket cleanup and post-shutdown removal are skipped for them.
//!
//! # Examples
//!
//! ## Raw Unix socket (echo server)
//! ```rust,no_run
//! use tako::server_unix::serve_unix;
//! use tokio::io::{AsyncReadExt, AsyncWriteExt};
//!
//! # async fn example() -> std::io::Result<()> {
//! serve_unix("/tmp/tako.sock", |mut stream, _addr| {
//!     Box::pin(async move {
//!         let mut buf = vec![0u8; 4096];
//!         let n = stream.read(&mut buf).await?;
//!         stream.write_all(&buf[..n]).await?;
//!         Ok(())
//!     })
//! }).await?;
//! # Ok(())
//! # }
//! ```
//!
//! ## HTTP over Unix socket
//! ```rust,no_run
//! use tako::server_unix::serve_unix_http;
//! use tako::router::Router;
//!
//! # async fn example() -> std::io::Result<()> {
//! let router = Router::new();
//! serve_unix_http("/tmp/tako-http.sock", router).await;
//! # Ok(())
//! # }
//! ```

use std::convert::Infallible;
use std::future::Future;
use std::io;
use std::path::Path;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

use hyper::server::conn::http1;
use hyper::service::service_fn;
use tako_rs_core::body::TakoBody;
use tako_rs_core::conn_info::ConnInfo;
use tako_rs_core::router::Router;
use tako_rs_core::types::BoxError;
use tokio::task::JoinSet;

use crate::ServerConfig;

/// Reports whether `path` names a Linux abstract-namespace socket.
///
/// A path qualifies when it is valid UTF-8 and its first character is `@`.
/// Paths that are not valid UTF-8 never qualify.
#[inline]
fn is_abstract_path(path: &Path) -> bool {
  matches!(path.to_str(), Some(text) if text.starts_with('@'))
}

/// Creates the listening socket for `path`.
///
/// Paths that are not `@`-prefixed are treated as filesystem sockets: any
/// stale socket file is cleaned up first, then the path is bound. `@`-prefixed
/// paths are bound in the Linux abstract namespace, using everything after the
/// leading `@` as the name; on other targets they yield an `Unsupported` error.
async fn bind_unix_listener(path: &Path) -> io::Result<tokio::net::UnixListener> {
  // Filesystem sockets: the common case, handled up front.
  if !is_abstract_path(path) {
    cleanup_stale_socket(path).await?;
    return tokio::net::UnixListener::bind(path);
  }

  #[cfg(target_os = "linux")]
  {
    use std::os::linux::net::SocketAddrExt;
    use std::os::unix::net::{SocketAddr, UnixListener};

    // `is_abstract_path` only accepts UTF-8 paths, so `to_str` cannot fail here.
    let text = path.to_str().unwrap();
    // Drop the leading `@`; the remainder is the abstract name.
    let abstract_name = &text.as_bytes()[1..];
    let addr = SocketAddr::from_abstract_name(abstract_name)?;
    let blocking_listener = UnixListener::bind_addr(&addr)?;
    // Tokio requires the std listener to be non-blocking before adoption.
    blocking_listener.set_nonblocking(true)?;
    return tokio::net::UnixListener::from_std(blocking_listener);
  }
  #[cfg(not(target_os = "linux"))]
  {
    return Err(io::Error::new(
      io::ErrorKind::Unsupported,
      "abstract Unix socket paths (`@`-prefixed) are Linux-only",
    ));
  }
}

/// Peer address information for Unix domain socket connections.
///
/// Inserted into request extensions for HTTP-over-UDS connections.
/// Handlers can access it via `req.extensions().get::<UnixPeerAddr>()`.
///
/// The HTTP server in this module builds one value per accepted connection
/// and clones it into every request served on that connection.
#[derive(Debug, Clone)]
pub struct UnixPeerAddr {
  /// The filesystem path of the peer socket, if available.
  /// Most client connections are unnamed (None).
  ///
  /// Populated from the accepted peer address's `as_pathname()`.
  pub path: Option<std::path::PathBuf>,
}

/// Runs a raw Unix domain socket server on `path`.
///
/// After binding, every accepted stream is passed to `handler` together with
/// the peer's socket address, each on its own spawned task. An error returned
/// by the handler is logged and does not stop the server.
///
/// # Errors
///
/// Returns the I/O error produced by binding the listener or by a failed
/// `accept`; otherwise the function does not return.
pub async fn serve_unix<F>(path: impl AsRef<Path>, handler: F) -> std::io::Result<()>
where
  F: Fn(
      tokio::net::UnixStream,
      tokio::net::unix::SocketAddr,
    ) -> Pin<Box<dyn Future<Output = std::io::Result<()>> + Send>>
    + Send
    + Sync
    + 'static,
{
  let socket_path = path.as_ref();
  let listener = bind_unix_listener(socket_path).await?;
  tracing::info!("Unix socket server listening on {}", socket_path.display());

  // One shared handler; each connection task gets its own reference.
  let shared_handler = Arc::new(handler);

  loop {
    let (stream, peer) = listener.accept().await?;
    let on_connection = Arc::clone(&shared_handler);

    tokio::spawn(async move {
      let outcome = on_connection(stream, peer).await;
      if let Err(e) = outcome {
        tracing::error!("Unix socket connection error: {e}");
      }
    });
  }
}

/// Runs a raw Unix domain socket server until `signal` completes.
///
/// Once the shutdown signal resolves, no further connections are accepted and
/// in-flight connections get up to 30 seconds to finish. Call
/// [`serve_unix_with_shutdown_and_drain`] directly to pick a different bound.
pub async fn serve_unix_with_shutdown<F, S>(
  path: impl AsRef<Path>,
  handler: F,
  signal: S,
) -> std::io::Result<()>
where
  F: Fn(
      tokio::net::UnixStream,
      tokio::net::unix::SocketAddr,
    ) -> Pin<Box<dyn Future<Output = std::io::Result<()>> + Send>>
    + Send
    + Sync
    + 'static,
  S: Future<Output = ()> + Send + 'static,
{
  // Default bound on how long in-flight connections may take to finish.
  const DEFAULT_DRAIN: Duration = Duration::from_secs(30);

  serve_unix_with_shutdown_and_drain(path, handler, signal, DEFAULT_DRAIN).await
}

/// Same as [`serve_unix_with_shutdown`] but with an explicit drain timeout.
///
/// Accepts connections on `path` and runs `handler` for each one on its own
/// task until `signal` completes. After that, in-flight connection tasks get
/// up to `drain_timeout` to finish; any that are still running afterwards are
/// aborted.
///
/// # Errors
///
/// Returns the I/O error produced by binding the listener or by a failed
/// `accept`. Handler errors are logged and do not stop the server.
pub async fn serve_unix_with_shutdown_and_drain<F, S>(
  path: impl AsRef<Path>,
  handler: F,
  signal: S,
  drain_timeout: Duration,
) -> std::io::Result<()>
where
  F: Fn(
      tokio::net::UnixStream,
      tokio::net::unix::SocketAddr,
    ) -> Pin<Box<dyn Future<Output = std::io::Result<()>> + Send>>
    + Send
    + Sync
    + 'static,
  S: Future<Output = ()> + Send + 'static,
{
  let path = path.as_ref();
  let listener = bind_unix_listener(path).await?;
  tracing::info!("Unix socket server listening on {}", path.display());

  let handler = Arc::new(handler);
  let mut join_set = JoinSet::new();

  tokio::pin!(signal);

  loop {
    tokio::select! {
      result = listener.accept() => {
        let (stream, addr) = result?;
        let handler = Arc::clone(&handler);

        join_set.spawn(async move {
          if let Err(e) = handler(stream, addr).await {
            tracing::error!("Unix socket connection error: {e}");
          }
        });
      }
      // Reap finished connection tasks as they complete. Without this the
      // `JoinSet` keeps an entry for every connection ever served until
      // shutdown, which is unbounded growth on a long-running server. When
      // the set is empty `join_next` yields `None`, the pattern fails, and
      // the branch is simply disabled for this iteration.
      Some(_) = join_set.join_next() => {}
      () = &mut signal => {
        tracing::info!("Unix socket server shutting down, draining {} connections", join_set.len());
        break;
      }
    }
  }

  let drained = tokio::time::timeout(drain_timeout, async {
    while join_set.join_next().await.is_some() {}
  })
  .await;

  // Make the timeout outcome visible and explicit; dropping the set would
  // abort the stragglers silently.
  if drained.is_err() {
    tracing::warn!(
      "Drain timeout exceeded, aborting {} remaining connections",
      join_set.len()
    );
    join_set.abort_all();
  }

  Ok(())
}

/// Serves `router` as HTTP/1 over a Unix domain socket bound at `path`.
///
/// Suited to deployments behind a reverse proxy (nginx, `HAProxy`) that talks
/// to the app through a local socket file instead of TCP. Uses the default
/// [`ServerConfig`] and no shutdown signal. A server error is logged rather
/// than returned.
pub async fn serve_unix_http(path: impl AsRef<Path>, router: Router) {
  // No shutdown signal; the typed `None` pins the otherwise uninferable future type.
  let no_signal = None::<std::future::Pending<()>>;
  let outcome = run_http(path.as_ref(), router, no_signal, ServerConfig::default()).await;

  if let Err(e) = outcome {
    tracing::error!("Unix HTTP server error: {e}");
  }
}

/// Serves `router` as HTTP/1 over a Unix domain socket until `signal` completes.
///
/// Uses the default [`ServerConfig`]. A server error is logged rather than
/// returned.
pub async fn serve_unix_http_with_shutdown(
  path: impl AsRef<Path>,
  router: Router,
  signal: impl Future<Output = ()> + Send + 'static,
) {
  let settings = ServerConfig::default();
  let outcome = run_http(path.as_ref(), router, Some(signal), settings).await;

  if let Err(e) = outcome {
    tracing::error!("Unix HTTP server error: {e}");
  }
}

/// Variant of [`serve_unix_http`] that takes a caller-supplied [`ServerConfig`].
///
/// No shutdown signal is installed. A server error is logged rather than
/// returned.
pub async fn serve_unix_http_with_config(
  path: impl AsRef<Path>,
  router: Router,
  config: ServerConfig,
) {
  // No shutdown signal; the typed `None` pins the otherwise uninferable future type.
  let no_signal = None::<std::future::Pending<()>>;
  let outcome = run_http(path.as_ref(), router, no_signal, config).await;

  if let Err(e) = outcome {
    tracing::error!("Unix HTTP server error: {e}");
  }
}

/// Variant of [`serve_unix_http_with_shutdown`] that takes a caller-supplied
/// [`ServerConfig`].
///
/// A server error is logged rather than returned.
pub async fn serve_unix_http_with_shutdown_and_config(
  path: impl AsRef<Path>,
  router: Router,
  signal: impl Future<Output = ()> + Send + 'static,
  config: ServerConfig,
) {
  let outcome = run_http(path.as_ref(), router, Some(signal), config).await;

  if let Err(e) = outcome {
    tracing::error!("Unix HTTP server error: {e}");
  }
}

/// Shared implementation behind the `serve_unix_http*` entry points.
///
/// Binds `path`, then serves HTTP/1 connections (with upgrades enabled) by
/// dispatching each request through `router`. Every request gets a
/// [`UnixPeerAddr`] and a `ConnInfo` inserted into its extensions.
///
/// * `signal` — optional shutdown future; when it completes the accept loop
///   stops and in-flight connections are drained.
/// * `config` — supplies the accept backoff, the optional connection limit,
///   the drain timeout, the header read timeout and the keep-alive setting.
///
/// Accept failures are logged and retried after a backoff instead of ending
/// the server. On shutdown, connections still running after the drain timeout
/// are aborted, and for filesystem paths the socket file is removed.
///
/// # Errors
///
/// Returns an error if the listener cannot be bound.
async fn run_http(
  path: &Path,
  router: Router,
  signal: Option<impl Future<Output = ()> + Send + 'static>,
  config: ServerConfig,
) -> Result<(), BoxError> {
  let listener = bind_unix_listener(path).await?;
  let router = Arc::new(router);

  #[cfg(feature = "plugins")]
  router.setup_plugins_once();

  tracing::debug!("Tako Unix HTTP listening on {}", path.display());

  let mut join_set = JoinSet::new();
  let mut accept_backoff = config.accept_backoff;
  let max_conn_semaphore = config
    .max_connections
    .map(|n| Arc::new(tokio::sync::Semaphore::new(n)));
  let drain_timeout = config.drain_timeout;
  let header_read_timeout = config.header_read_timeout;
  let keep_alive = config.keep_alive;
  // The shutdown signal is bridged onto a cancellation token so both the
  // accept loop and the permit wait below can observe it.
  let cancel = tokio_util::sync::CancellationToken::new();
  if let Some(s) = signal {
    let cancel_for_signal = cancel.clone();
    tokio::spawn(async move {
      s.await;
      cancel_for_signal.cancel();
    });
  }

  loop {
    tokio::select! {
      result = listener.accept() => {
        let (stream, addr) = match result {
          Ok(v) => { accept_backoff.reset(); v }
          Err(err) => {
            tracing::warn!("Unix accept failed: {err}; backing off");
            accept_backoff.sleep_and_grow().await;
            continue;
          }
        };
        // With a connection limit configured, wait for a free slot before
        // serving; shutdown wins over a pending permit.
        let permit = if let Some(sem) = &max_conn_semaphore {
          tokio::select! {
            biased;
            () = cancel.cancelled() => break,
            permit = sem.clone().acquire_owned() => match permit {
              Ok(p) => Some(p),
              Err(_) => continue,
            },
          }
        } else {
          None
        };
        let io = hyper_util::rt::TokioIo::new(stream);
        let router = router.clone();

        let peer_addr = UnixPeerAddr {
          path: addr.as_pathname().map(std::path::Path::to_path_buf),
        };

        join_set.spawn(async move {
          let svc = service_fn(move |mut req| {
            let router = router.clone();
            let peer_addr = peer_addr.clone();
            async move {
              let conn_info = ConnInfo::unix(peer_addr.path.clone());
              req.extensions_mut().insert(peer_addr);
              req.extensions_mut().insert(conn_info);
              let response = router.dispatch(req.map(TakoBody::incoming)).await;
              Ok::<_, Infallible>(response)
            }
          });

          let mut http = http1::Builder::new();
          http.keep_alive(keep_alive);
          http.timer(hyper_util::rt::TokioTimer::new());
          if let Some(t) = header_read_timeout {
            http.header_read_timeout(t);
          }
          let conn = http.serve_connection(io, svc).with_upgrades();

          if let Err(err) = conn.await {
            if err.is_incomplete_message() {
              tracing::debug!("client disconnected mid-message on Unix socket: {err}");
            } else {
              tracing::error!("Error serving Unix HTTP connection: {err}");
            }
          }

          // Hold the connection slot until the connection is fully served.
          drop(permit);
        });
      }
      // Reap finished connection tasks as they complete. Without this the
      // `JoinSet` keeps an entry for every connection ever served until
      // shutdown, which is unbounded growth on a long-running server. When
      // the set is empty `join_next` yields `None`, the pattern fails, and
      // the branch is simply disabled for this iteration.
      Some(_) = join_set.join_next() => {}
      () = cancel.cancelled() => {
        tracing::info!("Unix HTTP server shutting down...");
        break;
      }
    }
  }

  let drain = tokio::time::timeout(drain_timeout, async {
    while join_set.join_next().await.is_some() {}
  });

  if drain.await.is_err() {
    tracing::warn!(
      "Drain timeout exceeded, aborting {} remaining connections",
      join_set.len()
    );
    join_set.abort_all();
  }

  // Filesystem-backed paths get the socket file removed on shutdown so a
  // subsequent run can re-bind cleanly. Abstract sockets disappear with the
  // last reference, so there's nothing to clean.
  if !is_abstract_path(path) {
    let _ = std::fs::remove_file(path);
  }
  tracing::info!("Unix HTTP server shut down gracefully");
  Ok(())
}

/// Unlinks a leftover socket file at `path` unless a live server answers on it.
///
/// A missing path is a no-op. Liveness is probed with the async
/// `tokio::net::UnixStream::connect` under a 50ms deadline, so the runtime
/// worker is not blocked and a stuck peer cannot stall the bind indefinitely.
/// A successful connect yields `AddrInUse`; a failed or timed-out connect is
/// treated as stale and the file is removed.
///
/// **Symlink safety**: the entry is inspected with `symlink_metadata` and must
/// itself be a socket (`FileTypeExt::is_socket`). A regular file, directory or
/// symlink at that path produces an `AlreadyExists` error instead of being
/// removed, so swapping the socket path for a symlink cannot trick the server
/// into deleting something else.
///
/// **Concurrency**: another process may unlink the same stale file at the same
/// time. A `NotFound` from `remove_file` is therefore treated as success, and
/// the subsequent `bind(2)` is left to arbitrate (the loser gets
/// `EADDRINUSE`). Use an out-of-band lock if stronger guarantees are needed.
async fn cleanup_stale_socket(path: &Path) -> std::io::Result<()> {
  use std::os::unix::fs::FileTypeExt;
  use std::time::Duration;

  // Nothing at the path means nothing to clean up.
  let file_type = match std::fs::symlink_metadata(path) {
    Ok(meta) => meta.file_type(),
    Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(()),
    Err(e) => return Err(e),
  };

  if !file_type.is_socket() {
    let message = format!(
      "{} exists but is not a unix socket; refusing to remove",
      path.display()
    );
    return Err(std::io::Error::new(
      std::io::ErrorKind::AlreadyExists,
      message,
    ));
  }

  let probe = tokio::time::timeout(
    Duration::from_millis(50),
    tokio::net::UnixStream::connect(path),
  )
  .await;

  // Someone accepted the connection: the socket is live, leave it alone.
  if let Ok(Ok(_)) = probe {
    let message = format!("Unix socket {} is already in use", path.display());
    return Err(std::io::Error::new(std::io::ErrorKind::AddrInUse, message));
  }

  // Connect failed or timed out: treat the file as stale and unlink it. A
  // `NotFound` here means another process removed it first, which is fine.
  match std::fs::remove_file(path) {
    Err(e) if e.kind() != io::ErrorKind::NotFound => Err(e),
    _ => Ok(()),
  }
}