aws_smithy_http_server/serve/mod.rs
1/*
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6//! Serve utilities for running HTTP servers.
7//!
8//! This module provides a convenient [`serve`] function similar to `axum::serve`
9//! for easily serving Tower services with Hyper.
10//!
11//! ## When to Use This Module
12//!
13//! - Use [`serve`] when you need a simple, batteries-included HTTP server
14//! - For more control over the Hyper connection builder, use [`.configure_hyper()`](Serve::configure_hyper)
15//! - For Lambda environments, see the `aws-lambda` feature and `routing::lambda_handler`
16//!
17//! ## How It Works
18//!
19//! The `serve` function creates a connection acceptance loop that:
20//!
21//! 1. **Accepts connections** via the [`Listener`] trait (e.g., [`TcpListener`](tokio::net::TcpListener))
22//! 2. **Creates per-connection services** by calling the `make_service` with [`IncomingStream`]
23//! 3. **Converts Tower services to Hyper** using `TowerToHyperService`
24//! 4. **Spawns a task** for each connection to handle HTTP requests
25//!
26//! ```text
27//! ┌─────────┐ ┌──────────────┐ ┌──────────────┐ ┌────────┐
28//! │Listener │─────▶│IncomingStream│─────▶│ make_service │─────▶│ Hyper │
29//! │ accept │ │ (io + addr) │ │ (Tower) │ │ spawn │
30//! └─────────┘ └──────────────┘ └──────────────┘ └────────┘
31//! ```
32//!
//! The [`IncomingStream`] provides connection metadata to the service factory,
//! allowing per-connection customization based on the remote address or IO type.
35//!
36//! ## HTTP Protocol Selection
37//!
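//! By default, `serve` accepts both HTTP/1 and HTTP/2. The protocol is auto-detected
//! for each connection from the first bytes the client sends; an HTTP/2 client is
//! recognized by its connection preface. HTTP/1 connections are served with upgrade
//! support, so HTTP/1 protocol upgrades (e.g. WebSocket) can be used.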
41//!
42//! You can customize this behavior with [`.configure_hyper()`](Serve::configure_hyper):
43//!
//! ```rust,ignore
//! // Force HTTP/2 only (skips protocol detection)
//! serve(listener, app.into_make_service())
//!     .configure_hyper(|builder| builder.http2_only())
//!     .await?;
//!
//! // Force HTTP/1 only, with keep-alive enabled.
//! // `http1()` only configures HTTP/1 settings; `http1_only()` restricts the protocol.
//! serve(listener, app.into_make_service())
//!     .configure_hyper(|mut builder| {
//!         builder.http1().keep_alive(true);
//!         builder.http1_only()
//!     })
//!     .await?;
//! ```
59//!
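//! **Performance note**: When the builder is restricted to a single protocol with
//! `.http1_only()` or `.http2_only()`, the server does not need to read the connection
//! preface to detect the protocol, which can reduce connection setup latency. Note that
//! `.http1()` and `.http2()` only configure protocol settings; they do not restrict the protocol.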
62//!
63//! ## Graceful Shutdown
64//!
65//! Graceful shutdown is zero-cost when not used - no watch channels are allocated
66//! and no `tokio::select!` overhead is incurred. Call
67//! [`.with_graceful_shutdown(signal)`](Serve::with_graceful_shutdown) to enable it:
68//!
69//! ```ignore
70//! serve(listener, service)
71//! .with_graceful_shutdown(async {
72//! tokio::signal::ctrl_c().await.expect("failed to listen for Ctrl+C");
73//! })
74//! .await
75//! ```
76//!
77//! This ensures in-flight requests complete before shutdown. Use
78//! [`.with_shutdown_timeout(duration)`](ServeWithGracefulShutdown::with_shutdown_timeout)
79//! to set a maximum wait time.
80//!
81//! ## Common Patterns
82//!
83//! ### Limiting Concurrent Connections
84//!
85//! Use [`ListenerExt::limit_connections`] to prevent resource exhaustion:
86//!
87//! ```rust,ignore
88//! use aws_smithy_http_server::serve::ListenerExt;
89//!
90//! let listener = TcpListener::bind("0.0.0.0:3000")
91//! .await?
92//! .limit_connections(1000); // Max 1000 concurrent connections
93//!
94//! serve(listener, app.into_make_service()).await?;
95//! ```
96//!
97//! ### Accessing Connection Information
98//!
99//! Use `.into_make_service_with_connect_info::<T>()` to access connection metadata
100//! in your handlers:
101//!
102//! ```rust,ignore
103//! use std::net::SocketAddr;
104//! use aws_smithy_http_server::request::connect_info::ConnectInfo;
105//!
106//! // In your handler:
107//! async fn my_handler(ConnectInfo(addr): ConnectInfo<SocketAddr>) -> String {
108//! format!("Request from: {}", addr)
109//! }
110//!
111//! // When serving:
112//! serve(
113//! listener,
114//! app.into_make_service_with_connect_info::<SocketAddr>()
115//! ).await?;
116//! ```
117//!
118//! ### Custom TCP Settings
119//!
120//! Use [`ListenerExt::tap_io`] to configure TCP options:
121//!
122//! ```rust,ignore
123//! use aws_smithy_http_server::serve::ListenerExt;
124//!
125//! let listener = TcpListener::bind("0.0.0.0:3000")
126//! .await?
127//! .tap_io(|stream| {
128//! let _ = stream.set_nodelay(true);
129//! });
130//!
131//! serve(listener, app.into_make_service()).await?;
132//! ```
133//!
134//! ## Timeouts and Connection Management
135//!
136//! ### Available Timeout Types
137//!
138//! | Timeout Type | What It Does | How to Configure |
139//! |--------------|--------------|------------------|
140//! | **Header Read** | Time limit for reading HTTP headers | `.configure_hyper()` with `.http1().header_read_timeout()` |
141//! | **Request** | Time limit for processing one request | Tower's `TimeoutLayer` |
142//! | **Connection Duration** | Total connection lifetime limit | Custom accept loop with `tokio::time::timeout` |
//! | **HTTP/2 Keep-Alive** | Detects unresponsive HTTP/2 peers with PING frames (not an idle timeout) | `.configure_hyper()` with `.http2().keep_alive_interval()` / `.keep_alive_timeout()` |
144//!
145//! **Examples:**
146//! - `examples/header_read_timeout.rs` - Configure header read timeout
147//! - `examples/request_timeout.rs` - Add request-level timeouts
148//! - `examples/custom_accept_loop.rs` - Implement connection duration limits
149//! - `examples/http2_keepalive.rs` - Configure HTTP/2 keep-alive
150//! - `examples/connection_limiting.rs` - Limit concurrent connections
151//! - `examples/request_concurrency_limiting.rs` - Limit concurrent requests
152//!
153//! ### Connection Duration vs Idle Timeout
154//!
155//! **Connection duration timeout**: Closes the connection after N seconds total, regardless of activity.
156//! Implemented with `tokio::time::timeout` wrapping the connection future.
157//!
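//! **Idle timeout**: Closes the connection only when it is inactive between requests.
//! - HTTP/2: Not available as a true idle timeout. `.keep_alive_interval()` and
//!   `.keep_alive_timeout()` send PING frames and close the connection only if the peer
//!   stops acknowledging them, so an idle but responsive client stays connected.
//! - HTTP/1.1: Not available without modifying Hyper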
161//!
162//! See `examples/custom_accept_loop.rs` for a working connection duration timeout example.
163//!
164//! ### Connection Limiting vs Request Limiting
165//!
166//! **Connection limiting** (`.limit_connections()`): Limits the number of TCP connections.
167//! Use this to prevent socket/file descriptor exhaustion.
168//!
169//! **Request limiting** (`ConcurrencyLimitLayer`): Limits in-flight requests.
170//! Use this to prevent work queue exhaustion. With HTTP/2, one connection can have multiple
171//! requests in flight simultaneously.
172//!
173//! Most applications should use both - they protect different layers.
174//!
175//! ## Troubleshooting
176//!
177//! ### Type Errors
178//!
179//! If you encounter complex error messages about trait bounds, check:
180//!
181//! 1. **Service Error Type**: Your service must have `Error = Infallible`
182//! ```rust,ignore
183//! // ✓ Correct - handlers return responses, not Results
184//! async fn handler() -> Response<Body> { ... }
185//!
186//! // ✗ Wrong - cannot use Result<Response, E>
187//! async fn handler() -> Result<Response<Body>, MyError> { ... }
188//! ```
189//!
190//! 2. **MakeService Wrapper**: Use the correct wrapper for your service:
191//! ```rust,ignore
192//! use aws_smithy_http_server::routing::IntoMakeService;
193//!
194//! // For Smithy services:
195//! app.into_make_service()
196//!
197//! // For services with middleware:
198//! IntoMakeService::new(service)
199//! ```
200//!
201//! ### Graceful Shutdown Not Working
202//!
203//! If graceful shutdown doesn't wait for connections:
204//!
205//! - Ensure you call `.with_graceful_shutdown()` **before** `.await`
206//! - The signal future must be `Send + 'static`
207//! - Consider adding a timeout with `.with_shutdown_timeout()`
208//!
209//! ### Connection Limit Not Applied
210//!
211//! Remember that `.limit_connections()` applies to the listener **before** passing
212//! it to `serve()`:
213//!
214//! ```rust,ignore
215//! // ✓ Correct
216//! let listener = TcpListener::bind("0.0.0.0:3000")
217//! .await?
218//! .limit_connections(100);
219//! serve(listener, app.into_make_service()).await?;
220//!
221//! // ✗ Wrong - limit_connections must be called on listener
222//! serve(TcpListener::bind("0.0.0.0:3000").await?, app.into_make_service())
223//! .limit_connections(100) // This method doesn't exist on Serve
224//! .await?;
225//! ```
226//!
227//! ## Advanced: Custom Connection Handling
228//!
229//! If you need per-connection customization (e.g., different Hyper settings based on
230//! the remote address), you can implement your own connection loop using the building
231//! blocks provided by this module:
232//!
233//! ```rust,ignore
//! use std::net::SocketAddr;
//! use aws_smithy_http_server::serve::{IncomingStream, Listener};
//! use hyper_util::rt::{TokioExecutor, TokioIo};
//! use hyper_util::server::conn::auto::Builder;
//! use hyper_util::service::TowerToHyperService;
//! use tower::{Service, ServiceExt};
//!
//! let mut listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;
//! // `ready()` and `call()` both need mutable access to the service factory.
//! let mut make_service = app.into_make_service_with_connect_info::<SocketAddr>();
243//!
244//! loop {
245//! let (stream, remote_addr) = listener.accept().await?;
246//! let io = TokioIo::new(stream);
247//!
248//! // Per-connection Hyper configuration
249//! let mut builder = Builder::new(TokioExecutor::new());
250//! if remote_addr.ip().is_loopback() {
251//! builder = builder.http2_only(); // Local connections use HTTP/2
252//! } else {
253//! builder = builder.http1().keep_alive(true); // External use HTTP/1
254//! }
255//!
256//! let tower_service = make_service
257//! .ready()
258//! .await?
259//! .call(IncomingStream { io: &io, remote_addr })
260//! .await?;
261//!
262//! let hyper_service = TowerToHyperService::new(tower_service);
263//!
264//! tokio::spawn(async move {
265//! if let Err(err) = builder.serve_connection(io, hyper_service).await {
266//! eprintln!("Error serving connection: {}", err);
267//! }
268//! });
269//! }
270//! ```
271//!
272//! This approach provides complete flexibility while still leveraging the efficient
273//! Hyper and Tower integration provided by this module.
274//!
275//! Portions of the implementation are adapted from axum
276//! (<https://github.com/tokio-rs/axum>), which is licensed under the MIT License.
277//! Copyright (c) 2019 Axum Contributors
278
279use std::convert::Infallible;
280use std::error::Error as StdError;
281use std::fmt::{self, Debug};
282use std::future::{Future, IntoFuture};
283use std::io;
284use std::marker::PhantomData;
285use std::pin::Pin;
286use std::sync::Arc;
287use std::time::Duration;
288
289use http_body::Body as HttpBody;
290use hyper::body::Incoming;
291use hyper_util::rt::{TokioExecutor, TokioIo};
292use hyper_util::server::conn::auto::Builder;
293use hyper_util::service::TowerToHyperService;
294use tower::{Service, ServiceExt as _};
295
296mod listener;
297
298pub use self::listener::{ConnLimiter, ConnLimiterIo, Listener, ListenerExt, TapIo};
299
300// ============================================================================
301// Type Bounds Documentation
302// ============================================================================
303//
304// ## Body Bounds (B)
305// HTTP response bodies must satisfy:
306// - `B: HttpBody + Send + 'static` - Implement the body trait and be sendable
307// - `B::Data: Send` - Data chunks must be sendable across threads
308// - `B::Error: Into<Box<dyn StdError + Send + Sync>>` - Errors must be convertible
309//
310// ## Service Bounds (S)
311//
312// The `S` type parameter represents a **per-connection HTTP service** - a Tower service
313// that handles individual HTTP requests and returns HTTP responses.
314//
315// Required bounds:
316// - `S: Service<http::Request<Incoming>, Response = http::Response<B>, Error = Infallible>`
317//
318// This is the core Tower Service trait. It means:
319// * **Input**: Takes an HTTP request with a streaming body (`Incoming` from Hyper)
320// * **Output**: Returns an HTTP response with body type `B`
321// * **Error**: Must be `Infallible`, meaning the service never returns errors at the
322// Tower level. Any application errors must be converted into HTTP responses
323// (e.g., 500 Internal Server Error) before reaching this layer.
324//
325// - `S: Clone + Send + 'static`
326// * **Clone**: Each HTTP/1.1 or HTTP/2 connection may handle multiple requests
327// sequentially or concurrently. The service must be cloneable so each request
328// can get its own copy.
329// * **Send**: The service will be moved into a spawned Tokio task, so it must be
330// safe to send across thread boundaries.
331// * **'static**: No borrowed references - the service must own all its data since
332// it will outlive the connection setup phase.
333//
334// - `S::Future: Send`
335// The future returned by `Service::call()` must also be `Send` so it can be
336// polled from any thread in Tokio's thread pool.
337//
338// ## MakeService Bounds (M)
339//
340// The `M` type parameter represents a **service factory** - a Tower service that
341// creates a new `S` service for each incoming connection. This allows customizing
342// services based on connection metadata (remote address, TLS info, etc.).
343//
344// Connection Info → Service Factory → Per-Connection Service
345//
346// Required bounds:
347// - `M: for<'a> Service<IncomingStream<'a, L>, Error = Infallible, Response = S>`
348//
349// This is the service factory itself:
350// * **Input**: `IncomingStream<'a, L>` - A struct containing connection metadata:
351// - `io: &'a TokioIo<L::Io>` - A borrowed reference to the connection's IO stream
352// - `remote_addr: L::Addr` - The remote address of the client
353//
354// * **Output**: Returns a new `S` service instance for this specific connection
355//
356// * **Error**: Must be `Infallible` - service creation must never fail
357//
358// * **Higher-Rank Trait Bound (`for<'a>`)**: The factory must work
359// with `IncomingStream` that borrows the IO with *any* lifetime `'a`. This is
360// necessary because the IO is borrowed only temporarily during service creation,
361// and we don't know the specific lifetime at compile time.
362//
363// - `for<'a> <M as Service<IncomingStream<'a, L>>>::Future: Send`
364//
365// The future returned by calling the make_service must be `Send` for any lifetime,
366// so it can be awaited across threads while creating the service.
367//
368// ## Example Flow
369//
370// ```text
371// 1. Listener.accept() → (io, remote_addr)
372// 2. make_service.call(IncomingStream { io: &io, remote_addr }) → Future<Output = S>
373// 3. service.call(request) → Future<Output = Response>
374// 4. Repeat step 3 for each request on the connection
375// ```
376//
377// ## Why These Bounds Matter
378//
379// 1. **Services can be spawned onto Tokio tasks** (Send + 'static)
380// 2. **Multiple requests can be handled per connection** (Clone)
381// 3. **Error handling is infallible** - errors become HTTP responses, not Tower errors
382// 4. **The MakeService works with borrowed connection info** - via HRTB with IncomingStream
383// This allows inspection of connection metadata without transferring ownership
384//
385// ============================================================================
386
387/// An incoming stream that bundles connection information.
388///
389/// This struct serves as the request type for the `make_service` Tower service,
390/// allowing it to access connection-level metadata when creating per-connection services.
391///
392/// # Purpose
393///
394/// In Tower/Hyper's model, `make_service` is called once per connection to create
395/// a service that handles all HTTP requests on that connection. `IncomingStream`
396/// provides the connection information needed to customize service creation based on:
397/// - The remote address (for logging or access control)
398/// - The underlying IO type (for protocol detection or configuration)
399///
400/// # Design
401///
402/// This type holds a **reference** to the IO rather than ownership because:
403/// - The actual IO is still needed by Hyper to serve the connection after `make_service` returns
404/// - The `make_service` only needs to inspect connection metadata, not take ownership
405///
406/// # Lifetime Safety
407///
408/// The lifetime `'a` ensures the reference to IO remains valid only during the
409/// `make_service.call()` invocation. After the service is created, the IO is
410/// moved into a spawned task to handle the connection. This is safe because:
411///
412/// ```text
413/// let io = TokioIo::new(stream); // IO created
414/// let service = make_service.call(
415/// IncomingStream { io: &io, .. } // Borrowed during call
416/// ).await; // Borrow ends
417/// tokio::spawn(serve_connection(io, ..)); // IO moved to task
418/// ```
419///
420/// The borrow checker guarantees the reference doesn't outlive the IO object.
421///
422/// Used with [`serve`] and [`crate::routing::IntoMakeServiceWithConnectInfo`].
423#[derive(Debug)]
424pub struct IncomingStream<'a, L>
425where
426 L: Listener,
427{
428 /// Reference to the IO for this connection
429 pub io: &'a TokioIo<L::Io>,
430 /// Remote address of the client
431 pub remote_addr: L::Addr,
432}
433
434impl<L> IncomingStream<'_, L>
435where
436 L: Listener,
437{
438 /// Get a reference to the inner IO type.
439 pub fn io(&self) -> &L::Io {
440 self.io.inner()
441 }
442
443 /// Returns the remote address that this stream is bound to.
444 pub fn remote_addr(&self) -> &L::Addr {
445 &self.remote_addr
446 }
447}
448
449/// Serve the service with the supplied listener.
450///
451/// This implementation provides zero-cost abstraction for shutdown coordination.
452/// When graceful shutdown is not used, there is no runtime overhead - no watch channels
453/// are allocated and no `tokio::select!` is used.
454///
455/// It supports both HTTP/1 as well as HTTP/2.
456///
457/// This function accepts services wrapped with [`crate::routing::IntoMakeService`] or
458/// [`crate::routing::IntoMakeServiceWithConnectInfo`].
459///
460/// For generated Smithy services, use `.into_make_service()` or
461/// `.into_make_service_with_connect_info::<C>()`. For services wrapped with
462/// Tower middleware, use `IntoMakeService::new(service)`.
463///
464/// # Error Handling
465///
466/// Note that both `make_service` and the generated service must have `Error = Infallible`.
467/// This means:
468/// - Your service factory cannot fail when creating per-connection services
469/// - Your request handlers cannot return errors (use proper HTTP error responses instead)
470///
471/// If you need fallible service creation, consider handling errors within your
472/// `make_service` implementation and returning a service that produces error responses.
473///
474/// # Examples
475///
476/// Serving a Smithy service with a TCP listener:
477///
478/// ```rust,ignore
479/// use tokio::net::TcpListener;
480///
481/// let listener = TcpListener::bind("0.0.0.0:3000").await.unwrap();
482/// aws_smithy_http_server::serve(listener, app.into_make_service()).await.unwrap();
483/// ```
484///
485/// Serving with middleware applied:
486///
487/// ```rust,ignore
488/// use tokio::net::TcpListener;
489/// use tower::Layer;
490/// use tower_http::timeout::TimeoutLayer;
491/// use http::StatusCode;
492/// use std::time::Duration;
493/// use aws_smithy_http_server::routing::IntoMakeService;
494///
495/// let app = /* ... build service ... */;
496/// let app = TimeoutLayer::new(Duration::from_secs(30)).layer(app);
497///
498/// let listener = TcpListener::bind("0.0.0.0:3000").await.unwrap();
499/// aws_smithy_http_server::serve(listener, IntoMakeService::new(app)).await.unwrap();
500/// ```
501///
502/// For graceful shutdown:
503///
504/// ```rust,ignore
505/// use tokio::net::TcpListener;
506/// use tokio::signal;
507///
508/// let listener = TcpListener::bind("0.0.0.0:3000").await.unwrap();
509/// aws_smithy_http_server::serve(listener, app.into_make_service())
510/// .with_graceful_shutdown(async {
511/// signal::ctrl_c().await.expect("failed to listen for Ctrl+C");
512/// })
513/// .await
514/// .unwrap();
515/// ```
516///
517/// With connection info:
518///
519/// ```rust,ignore
520/// use tokio::net::TcpListener;
521/// use std::net::SocketAddr;
522///
523/// let listener = TcpListener::bind("0.0.0.0:3000").await.unwrap();
524/// aws_smithy_http_server::serve(
525/// listener,
526/// app.into_make_service_with_connect_info::<SocketAddr>()
527/// )
528/// .await
529/// .unwrap();
530/// ```
531pub fn serve<L, M, S, B>(listener: L, make_service: M) -> Serve<L, M, S, B>
532where
533 L: Listener,
534 // Body bounds: see module documentation for details
535 B: HttpBody + Send + 'static,
536 B::Data: Send,
537 B::Error: Into<Box<dyn StdError + Send + Sync>>,
538 // Service bounds: see module documentation for details
539 S: Service<http::Request<Incoming>, Response = http::Response<B>, Error = Infallible> + Clone + Send + 'static,
540 S::Future: Send,
541 // MakeService bounds: see module documentation for details
542 M: for<'a> Service<IncomingStream<'a, L>, Error = Infallible, Response = S>,
543{
544 Serve::new(listener, make_service)
545}
546
547/// A server future that serves HTTP connections.
548///
549/// This is the return type of [`serve`]. It implements [`IntoFuture`], so
550/// you can directly `.await` it:
551///
552/// ```ignore
553/// serve(listener, service).await?;
554/// ```
555///
556/// Before awaiting, you can configure it:
557/// - [`configure_hyper`](Self::configure_hyper) - Configure Hyper's connection builder
558/// - [`with_graceful_shutdown`](Self::with_graceful_shutdown) - Enable graceful shutdown
559/// - [`local_addr`](Self::local_addr) - Get the bound address
560///
561/// Created by [`serve`].
562#[must_use = "Serve does nothing until you `.await` or call `.into_future()` on it"]
563pub struct Serve<L, M, S, B> {
564 listener: L,
565 make_service: M,
566 hyper_builder: Option<Arc<Builder<TokioExecutor>>>,
567 _marker: PhantomData<(S, B)>,
568}
569
570impl<L, M, S, B> fmt::Debug for Serve<L, M, S, B>
571where
572 L: fmt::Debug,
573{
574 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
575 f.debug_struct("Serve")
576 .field("listener", &self.listener)
577 .field("has_hyper_config", &self.hyper_builder.is_some())
578 .finish_non_exhaustive()
579 }
580}
581
582impl<L, M, S, B> Serve<L, M, S, B>
583where
584 L: Listener,
585{
586 fn new(listener: L, make_service: M) -> Self {
587 Self {
588 listener,
589 make_service,
590 hyper_builder: None,
591 _marker: PhantomData,
592 }
593 }
594
595 /// Configure the underlying Hyper connection builder.
596 ///
597 /// This allows you to customize Hyper's HTTP/1 and HTTP/2 settings,
598 /// such as timeouts, max concurrent streams, keep-alive behavior, etc.
599 ///
600 /// The configuration is applied once and the configured builder is cloned
601 /// for each connection, providing optimal performance.
602 ///
603 /// # Example
604 ///
605 /// ```ignore
606 /// use std::time::Duration;
607 ///
608 /// serve(listener, service)
609 /// .configure_hyper(|builder| {
610 /// builder
611 /// .http1()
612 /// .keep_alive(true)
613 /// .http2()
614 /// .max_concurrent_streams(200)
615 /// })
616 /// .await?;
617 /// ```
618 ///
619 /// # Advanced: Per-Connection Configuration
620 ///
621 /// If you need per-connection customization (e.g., different settings based on
622 /// the remote address), you can implement your own connection loop. See the
623 /// module-level documentation for examples.
624 pub fn configure_hyper<F>(mut self, f: F) -> Self
625 where
626 F: FnOnce(
627 hyper_util::server::conn::auto::Builder<hyper_util::rt::TokioExecutor>,
628 ) -> hyper_util::server::conn::auto::Builder<hyper_util::rt::TokioExecutor>,
629 {
630 let builder = Builder::new(TokioExecutor::new());
631 self.hyper_builder = Some(Arc::new(f(builder)));
632 self
633 }
634
635 /// Enable graceful shutdown for the server.
636 pub fn with_graceful_shutdown<F>(self, signal: F) -> ServeWithGracefulShutdown<L, M, S, F, B>
637 where
638 F: Future<Output = ()> + Send + 'static,
639 {
640 ServeWithGracefulShutdown::new(self.listener, self.make_service, signal, self.hyper_builder)
641 }
642
643 /// Returns the local address this server is bound to.
644 pub fn local_addr(&self) -> io::Result<L::Addr> {
645 self.listener.local_addr()
646 }
647}
648
/// Accept loop used when graceful shutdown is **not** enabled.
///
/// Waits for a connection, hands it to `handle_connection`, and repeats. There is no `break`,
/// so the expansion is a diverging expression that can serve as the tail of any async block.
/// The generic parameters `L`, `M`, `S`, `B` must be in scope where the macro is expanded.
macro_rules! accept_loop {
    ($listener:expr, $make_service:expr, $hyper_builder:expr) => {{
        let shared_builder = $hyper_builder.as_ref();
        loop {
            let (conn_io, peer_addr) = $listener.accept().await;
            handle_connection::<L, M, S, B>(&mut $make_service, conn_io, peer_addr, shared_builder, true, None)
                .await;
        }
    }};
}
661
/// Accept loop used when graceful shutdown **is** enabled.
///
/// Each iteration races the next `accept()` against the shutdown signal with
/// `tokio::select!`. Accepted connections are passed to `handle_connection` together with
/// `$graceful`, so they are tracked for draining later. Once the signal resolves, the loop
/// exits without accepting anything else.
///
/// `$signal` must be a pinned future, because it is polled via `.as_mut()` on every
/// iteration. The generic parameters `L`, `M`, `S`, `B` must be in scope at the expansion site.
macro_rules! accept_loop_with_shutdown {
    ($listener:expr, $make_service:expr, $hyper_builder:expr, $signal:expr, $graceful:expr) => {{
        let shared_builder = $hyper_builder.as_ref();
        loop {
            tokio::select! {
                (conn_io, peer_addr) = $listener.accept() => {
                    handle_connection::<L, M, S, B>(
                        &mut $make_service,
                        conn_io,
                        peer_addr,
                        shared_builder,
                        true,
                        Some(&$graceful),
                    )
                    .await;
                }
                () = $signal.as_mut() => {
                    tracing::trace!("received graceful shutdown signal, not accepting new connections");
                    break;
                }
            }
        }
    }};
}
691
692// Implement IntoFuture so we can await Serve directly
693impl<L, M, S, B> IntoFuture for Serve<L, M, S, B>
694where
695 L: Listener,
696 L::Addr: Debug,
697 // Body bounds
698 B: HttpBody + Send + 'static,
699 B::Data: Send,
700 B::Error: Into<Box<dyn StdError + Send + Sync>>,
701 // Service bounds
702 S: Service<http::Request<Incoming>, Response = http::Response<B>, Error = Infallible> + Clone + Send + 'static,
703 S::Future: Send,
704 // MakeService bounds
705 M: for<'a> Service<IncomingStream<'a, L>, Error = Infallible, Response = S> + Send + 'static,
706 for<'a> <M as Service<IncomingStream<'a, L>>>::Future: Send,
707{
708 type Output = io::Result<()>;
709 type IntoFuture = Pin<Box<dyn Future<Output = io::Result<()>> + Send>>;
710
711 fn into_future(self) -> Self::IntoFuture {
712 Box::pin(async move {
713 let Self {
714 mut listener,
715 mut make_service,
716 hyper_builder,
717 _marker,
718 } = self;
719
720 accept_loop!(listener, make_service, hyper_builder)
721 })
722 }
723}
724
725/// A server future with graceful shutdown enabled.
726///
727/// This type is created by calling [`Serve::with_graceful_shutdown`]. It implements
728/// [`IntoFuture`], so you can directly `.await` it.
729///
730/// When the shutdown signal completes, the server will:
731/// 1. Stop accepting new connections
732/// 2. Wait for all in-flight requests to complete (or until timeout if configured)
733/// 3. Return once all connections are closed
734///
735/// Configure the shutdown timeout with [`with_shutdown_timeout`](Self::with_shutdown_timeout).
736///
737/// Created by [`Serve::with_graceful_shutdown`].
738#[must_use = "ServeWithGracefulShutdown does nothing until you `.await` or call `.into_future()` on it"]
739pub struct ServeWithGracefulShutdown<L, M, S, F, B> {
740 listener: L,
741 make_service: M,
742 signal: F,
743 hyper_builder: Option<Arc<Builder<TokioExecutor>>>,
744 shutdown_timeout: Option<Duration>,
745 _marker: PhantomData<(S, B)>,
746}
747
748impl<L, M, S, F, B> fmt::Debug for ServeWithGracefulShutdown<L, M, S, F, B>
749where
750 L: Listener + fmt::Debug,
751{
752 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
753 f.debug_struct("ServeWithGracefulShutdown")
754 .field("listener", &self.listener)
755 .field("has_hyper_config", &self.hyper_builder.is_some())
756 .field("shutdown_timeout", &self.shutdown_timeout)
757 .finish_non_exhaustive()
758 }
759}
760
761impl<L: Listener, M, S, F, B> ServeWithGracefulShutdown<L, M, S, F, B> {
762 fn new(listener: L, make_service: M, signal: F, hyper_builder: Option<Arc<Builder<TokioExecutor>>>) -> Self
763 where
764 F: Future<Output = ()> + Send + 'static,
765 {
766 Self {
767 listener,
768 make_service,
769 signal,
770 hyper_builder,
771 shutdown_timeout: None,
772 _marker: PhantomData,
773 }
774 }
775
776 /// Set a timeout for graceful shutdown.
777 ///
778 /// If the timeout expires before all connections complete, a warning is logged
779 /// and the server returns successfully. Note that this does **not** forcibly
780 /// terminate connections - it only stops waiting for them.
781 ///
782 /// # Example
783 ///
784 /// ```rust,ignore
785 /// use std::time::Duration;
786 ///
787 /// serve(listener, app.into_make_service())
788 /// .with_graceful_shutdown(shutdown_signal())
789 /// .with_shutdown_timeout(Duration::from_secs(30))
790 /// .await?; // Returns Ok(()) even if timeout expires
791 /// ```
792 pub fn with_shutdown_timeout(mut self, timeout: Duration) -> Self {
793 self.shutdown_timeout = Some(timeout);
794 self
795 }
796
797 /// Returns the local address this server is bound to.
798 pub fn local_addr(&self) -> io::Result<L::Addr> {
799 self.listener.local_addr()
800 }
801}
802
803// Implement IntoFuture so we can await WithGracefulShutdown directly
804impl<L, M, S, F, B> IntoFuture for ServeWithGracefulShutdown<L, M, S, F, B>
805where
806 L: Listener,
807 L::Addr: Debug,
808 // Body bounds
809 B: HttpBody + Send + 'static,
810 B::Data: Send,
811 B::Error: Into<Box<dyn StdError + Send + Sync>>,
812 // Service bounds
813 S: Service<http::Request<Incoming>, Response = http::Response<B>, Error = Infallible> + Clone + Send + 'static,
814 S::Future: Send,
815 // MakeService bounds
816 M: for<'a> Service<IncomingStream<'a, L>, Error = Infallible, Response = S> + Send + 'static,
817 for<'a> <M as Service<IncomingStream<'a, L>>>::Future: Send,
818 // Shutdown signal
819 F: Future<Output = ()> + Send + 'static,
820{
821 type Output = io::Result<()>;
822 type IntoFuture = Pin<Box<dyn Future<Output = io::Result<()>> + Send>>;
823
824 fn into_future(self) -> Self::IntoFuture {
825 Box::pin(async move {
826 let Self {
827 mut listener,
828 mut make_service,
829 signal,
830 hyper_builder,
831 shutdown_timeout,
832 _marker,
833 } = self;
834
835 // Initialize graceful shutdown
836 let graceful = hyper_util::server::graceful::GracefulShutdown::new();
837 let mut signal = std::pin::pin!(signal);
838
839 accept_loop_with_shutdown!(listener, make_service, hyper_builder, signal, graceful);
840
841 drop(listener);
842
843 tracing::trace!("waiting for in-flight connections to finish");
844
845 // Wait for all in-flight connections (with optional timeout)
846 match shutdown_timeout {
847 Some(timeout) => match tokio::time::timeout(timeout, graceful.shutdown()).await {
848 Ok(_) => {
849 tracing::trace!("all in-flight connections completed during graceful shutdown");
850 }
851 Err(_) => {
852 tracing::warn!(
853 timeout_secs = timeout.as_secs(),
854 "graceful shutdown timeout expired, some connections may not have completed"
855 );
856 }
857 },
858 None => {
859 graceful.shutdown().await;
860 tracing::trace!("all in-flight connections completed during graceful shutdown");
861 }
862 }
863
864 Ok(())
865 })
866 }
867}
868
869/// Connection handling function.
870///
871/// Handles connections by using runtime branching on `use_upgrades` and optional
872/// `graceful` shutdown.
873async fn handle_connection<L, M, S, B>(
874 make_service: &mut M,
875 conn_io: <L as Listener>::Io,
876 remote_addr: <L as Listener>::Addr,
877 hyper_builder: Option<&Arc<Builder<TokioExecutor>>>,
878 use_upgrades: bool,
879 graceful: Option<&hyper_util::server::graceful::GracefulShutdown>,
880) where
881 L: Listener,
882 L::Addr: Debug,
883 // Body bounds
884 B: HttpBody + Send + 'static,
885 B::Data: Send,
886 B::Error: Into<Box<dyn StdError + Send + Sync>>,
887 // Service bounds
888 S: Service<http::Request<Incoming>, Response = http::Response<B>, Error = Infallible> + Clone + Send + 'static,
889 S::Future: Send,
890 // MakeService bounds
891 M: for<'a> Service<IncomingStream<'a, L>, Error = Infallible, Response = S> + Send + 'static,
892 for<'a> <M as Service<IncomingStream<'a, L>>>::Future: Send,
893{
894 let watcher = graceful.map(|g| g.watcher());
895 let tokio_io = TokioIo::new(conn_io);
896
897 tracing::trace!("connection {remote_addr:?} accepted");
898
899 make_service
900 .ready()
901 .await
902 .expect("make_service error type is Infallible and cannot fail");
903
904 let tower_service = make_service
905 .call(IncomingStream {
906 io: &tokio_io,
907 remote_addr,
908 })
909 .await
910 .expect("make_service error type is Infallible and cannot fail");
911
912 let hyper_service = TowerToHyperService::new(tower_service);
913
914 // Clone the Arc (cheap - just increments refcount) or create a default builder
915 let builder = hyper_builder
916 .map(Arc::clone)
917 .unwrap_or_else(|| Arc::new(Builder::new(TokioExecutor::new())));
918
919 tokio::spawn(async move {
920 let result = if use_upgrades {
921 // Auto-detect mode - use with_upgrades for HTTP/1 upgrade support
922 let conn = builder.serve_connection_with_upgrades(tokio_io, hyper_service);
923 if let Some(watcher) = watcher {
924 watcher.watch(conn).await
925 } else {
926 conn.await
927 }
928 } else {
929 // Protocol is already decided (http1_only or http2_only) - skip preface reading
930 let conn = builder.serve_connection(tokio_io, hyper_service);
931 if let Some(watcher) = watcher {
932 watcher.watch(conn).await
933 } else {
934 conn.await
935 }
936 };
937
938 if let Err(err) = result {
939 tracing::trace!(error = ?err, "failed to serve connection");
940 }
941 });
942}