Skip to main content

aws_smithy_http_server/serve/
mod.rs

1/*
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6//! Serve utilities for running HTTP servers.
7//!
8//! This module provides a convenient [`serve`] function similar to `axum::serve`
9//! for easily serving Tower services with Hyper.
10//!
11//! ## When to Use This Module
12//!
13//! - Use [`serve`] when you need a simple, batteries-included HTTP server
14//! - For more control over the Hyper connection builder, use [`.configure_hyper()`](Serve::configure_hyper)
15//! - For Lambda environments, see the `aws-lambda` feature and `routing::lambda_handler`
16//!
17//! ## How It Works
18//!
19//! The `serve` function creates a connection acceptance loop that:
20//!
21//! 1. **Accepts connections** via the [`Listener`] trait (e.g., [`TcpListener`](tokio::net::TcpListener))
22//! 2. **Creates per-connection services** by calling the `make_service` with [`IncomingStream`]
23//! 3. **Converts Tower services to Hyper** using `TowerToHyperService`
24//! 4. **Spawns a task** for each connection to handle HTTP requests
25//!
26//! ```text
27//! ┌─────────┐      ┌──────────────┐      ┌──────────────┐      ┌────────┐
28//! │Listener │─────▶│IncomingStream│─────▶│ make_service │─────▶│ Hyper  │
29//! │ accept  │      │ (io + addr)  │      │  (Tower)     │      │ spawn  │
30//! └─────────┘      └──────────────┘      └──────────────┘      └────────┘
31//! ```
32//!
33//! The [`IncomingStream`] provides connection metadata to the service factory,
//! allowing per-connection customization based on remote address or IO type.
35//!
36//! ## HTTP Protocol Selection
37//!
38//! By default, `serve` uses HTTP/1 with upgrade support, allowing clients to
39//! negotiate HTTP/2 via the HTTP/1.1 Upgrade mechanism or ALPN. The protocol is
40//! auto-detected for each connection.
41//!
42//! You can customize this behavior with [`.configure_hyper()`](Serve::configure_hyper):
43//!
44//! ```rust,ignore
45//! // Force HTTP/2 only (skips upgrade negotiation)
46//! serve(listener, app.into_make_service())
47//!     .configure_hyper(|builder| {
48//!         builder.http2_only()
49//!     })
50//!     .await?;
51//!
//! // Force HTTP/1 only with keep-alive
//! serve(listener, app.into_make_service())
//!     .configure_hyper(|mut builder| {
//!         builder.http1().keep_alive(true);
//!         builder.http1_only()
//!     })
//!     .await?;
58//! ```
59//!
//! **Performance note**: When using `.http2_only()` or `.http1_only()`, the server skips
//! protocol auto-detection (peeking at the first bytes of the connection for the HTTP/2
//! preface), which can reduce connection setup latency.
62//!
63//! ## Graceful Shutdown
64//!
65//! Graceful shutdown is zero-cost when not used - no watch channels are allocated
66//! and no `tokio::select!` overhead is incurred. Call
67//! [`.with_graceful_shutdown(signal)`](Serve::with_graceful_shutdown) to enable it:
68//!
69//! ```ignore
70//! serve(listener, service)
71//!     .with_graceful_shutdown(async {
72//!         tokio::signal::ctrl_c().await.expect("failed to listen for Ctrl+C");
73//!     })
74//!     .await
75//! ```
76//!
77//! This ensures in-flight requests complete before shutdown. Use
78//! [`.with_shutdown_timeout(duration)`](ServeWithGracefulShutdown::with_shutdown_timeout)
79//! to set a maximum wait time.
80//!
81//! ## Common Patterns
82//!
83//! ### Limiting Concurrent Connections
84//!
85//! Use [`ListenerExt::limit_connections`] to prevent resource exhaustion:
86//!
87//! ```rust,ignore
88//! use aws_smithy_http_server::serve::ListenerExt;
89//!
90//! let listener = TcpListener::bind("0.0.0.0:3000")
91//!     .await?
92//!     .limit_connections(1000);  // Max 1000 concurrent connections
93//!
94//! serve(listener, app.into_make_service()).await?;
95//! ```
96//!
97//! ### Accessing Connection Information
98//!
99//! Use `.into_make_service_with_connect_info::<T>()` to access connection metadata
100//! in your handlers:
101//!
102//! ```rust,ignore
103//! use std::net::SocketAddr;
104//! use aws_smithy_http_server::request::connect_info::ConnectInfo;
105//!
106//! // In your handler:
107//! async fn my_handler(ConnectInfo(addr): ConnectInfo<SocketAddr>) -> String {
108//!     format!("Request from: {}", addr)
109//! }
110//!
111//! // When serving:
112//! serve(
113//!     listener,
114//!     app.into_make_service_with_connect_info::<SocketAddr>()
115//! ).await?;
116//! ```
117//!
118//! ### Custom TCP Settings
119//!
120//! Use [`ListenerExt::tap_io`] to configure TCP options:
121//!
122//! ```rust,ignore
123//! use aws_smithy_http_server::serve::ListenerExt;
124//!
125//! let listener = TcpListener::bind("0.0.0.0:3000")
126//!     .await?
127//!     .tap_io(|stream| {
128//!         let _ = stream.set_nodelay(true);
129//!     });
130//!
131//! serve(listener, app.into_make_service()).await?;
132//! ```
133//!
134//! ## Timeouts and Connection Management
135//!
136//! ### Available Timeout Types
137//!
138//! | Timeout Type | What It Does | How to Configure |
139//! |--------------|--------------|------------------|
140//! | **Header Read** | Time limit for reading HTTP headers | `.configure_hyper()` with `.http1().header_read_timeout()` |
141//! | **Request** | Time limit for processing one request | Tower's `TimeoutLayer` |
142//! | **Connection Duration** | Total connection lifetime limit | Custom accept loop with `tokio::time::timeout` |
143//! | **HTTP/2 Keep-Alive** | Idle timeout between HTTP/2 requests | `.configure_hyper()` with `.http2().keep_alive_*()` |
144//!
145//! **Examples:**
146//! - `examples/header_read_timeout.rs` - Configure header read timeout
147//! - `examples/request_timeout.rs` - Add request-level timeouts
148//! - `examples/custom_accept_loop.rs` - Implement connection duration limits
149//! - `examples/http2_keepalive.rs` - Configure HTTP/2 keep-alive
150//! - `examples/connection_limiting.rs` - Limit concurrent connections
151//! - `examples/request_concurrency_limiting.rs` - Limit concurrent requests
152//!
153//! ### Connection Duration vs Idle Timeout
154//!
155//! **Connection duration timeout**: Closes the connection after N seconds total, regardless of activity.
156//! Implemented with `tokio::time::timeout` wrapping the connection future.
157//!
158//! **Idle timeout**: Closes the connection only when inactive between requests.
159//! - HTTP/2: Available via `.keep_alive_interval()` and `.keep_alive_timeout()`
160//! - HTTP/1.1: Not available without modifying Hyper
161//!
162//! See `examples/custom_accept_loop.rs` for a working connection duration timeout example.
163//!
164//! ### Connection Limiting vs Request Limiting
165//!
166//! **Connection limiting** (`.limit_connections()`): Limits the number of TCP connections.
167//! Use this to prevent socket/file descriptor exhaustion.
168//!
169//! **Request limiting** (`ConcurrencyLimitLayer`): Limits in-flight requests.
170//! Use this to prevent work queue exhaustion. With HTTP/2, one connection can have multiple
171//! requests in flight simultaneously.
172//!
173//! Most applications should use both - they protect different layers.
174//!
175//! ## Troubleshooting
176//!
177//! ### Type Errors
178//!
179//! If you encounter complex error messages about trait bounds, check:
180//!
181//! 1. **Service Error Type**: Your service must have `Error = Infallible`
182//!    ```rust,ignore
183//!    // ✓ Correct - handlers return responses, not Results
184//!    async fn handler() -> Response<Body> { ... }
185//!
186//!    // ✗ Wrong - cannot use Result<Response, E>
187//!    async fn handler() -> Result<Response<Body>, MyError> { ... }
188//!    ```
189//!
190//! 2. **MakeService Wrapper**: Use the correct wrapper for your service:
191//!    ```rust,ignore
192//!    use aws_smithy_http_server::routing::IntoMakeService;
193//!
194//!    // For Smithy services:
195//!    app.into_make_service()
196//!
197//!    // For services with middleware:
198//!    IntoMakeService::new(service)
199//!    ```
200//!
201//! ### Graceful Shutdown Not Working
202//!
203//! If graceful shutdown doesn't wait for connections:
204//!
205//! - Ensure you call `.with_graceful_shutdown()` **before** `.await`
206//! - The signal future must be `Send + 'static`
207//! - Consider adding a timeout with `.with_shutdown_timeout()`
208//!
209//! ### Connection Limit Not Applied
210//!
211//! Remember that `.limit_connections()` applies to the listener **before** passing
212//! it to `serve()`:
213//!
214//! ```rust,ignore
215//! // ✓ Correct
216//! let listener = TcpListener::bind("0.0.0.0:3000")
217//!     .await?
218//!     .limit_connections(100);
219//! serve(listener, app.into_make_service()).await?;
220//!
221//! // ✗ Wrong - limit_connections must be called on listener
222//! serve(TcpListener::bind("0.0.0.0:3000").await?, app.into_make_service())
223//!     .limit_connections(100)  // This method doesn't exist on Serve
224//!     .await?;
225//! ```
226//!
227//! ## Advanced: Custom Connection Handling
228//!
229//! If you need per-connection customization (e.g., different Hyper settings based on
230//! the remote address), you can implement your own connection loop using the building
231//! blocks provided by this module:
232//!
233//! ```rust,ignore
//! use std::net::SocketAddr;
//!
//! use aws_smithy_http_server::routing::IntoMakeService;
//! use aws_smithy_http_server::serve::{IncomingStream, Listener};
//! use hyper_util::rt::{TokioExecutor, TokioIo};
//! use hyper_util::server::conn::auto::Builder;
//! use hyper_util::service::TowerToHyperService;
//! use tower::{Service, ServiceExt};
//!
//! let mut listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;
//! let mut make_service = app.into_make_service_with_connect_info::<SocketAddr>();
243//!
244//! loop {
245//!     let (stream, remote_addr) = listener.accept().await?;
246//!     let io = TokioIo::new(stream);
247//!
248//!     // Per-connection Hyper configuration
249//!     let mut builder = Builder::new(TokioExecutor::new());
250//!     if remote_addr.ip().is_loopback() {
251//!         builder = builder.http2_only();  // Local connections use HTTP/2
252//!     } else {
//!         builder.http1().keep_alive(true);
//!         builder = builder.http1_only();  // External connections use HTTP/1
254//!     }
255//!
256//!     let tower_service = make_service
257//!         .ready()
258//!         .await?
259//!         .call(IncomingStream { io: &io, remote_addr })
260//!         .await?;
261//!
262//!     let hyper_service = TowerToHyperService::new(tower_service);
263//!
264//!     tokio::spawn(async move {
265//!         if let Err(err) = builder.serve_connection(io, hyper_service).await {
266//!             eprintln!("Error serving connection: {}", err);
267//!         }
268//!     });
269//! }
270//! ```
271//!
272//! This approach provides complete flexibility while still leveraging the efficient
273//! Hyper and Tower integration provided by this module.
274//!
275//! Portions of the implementation are adapted from axum
276//! (<https://github.com/tokio-rs/axum>), which is licensed under the MIT License.
277//! Copyright (c) 2019 Axum Contributors
278
279use std::convert::Infallible;
280use std::error::Error as StdError;
281use std::fmt::{self, Debug};
282use std::future::{Future, IntoFuture};
283use std::io;
284use std::marker::PhantomData;
285use std::pin::Pin;
286use std::sync::Arc;
287use std::time::Duration;
288
289use http_body::Body as HttpBody;
290use hyper::body::Incoming;
291use hyper_util::rt::{TokioExecutor, TokioIo};
292use hyper_util::server::conn::auto::Builder;
293use hyper_util::service::TowerToHyperService;
294use tower::{Service, ServiceExt as _};
295
296mod listener;
297
298pub use self::listener::{ConnLimiter, ConnLimiterIo, Listener, ListenerExt, TapIo};
299
300// ============================================================================
301// Type Bounds Documentation
302// ============================================================================
303//
304// ## Body Bounds (B)
305// HTTP response bodies must satisfy:
306// - `B: HttpBody + Send + 'static` - Implement the body trait and be sendable
307// - `B::Data: Send` - Data chunks must be sendable across threads
308// - `B::Error: Into<Box<dyn StdError + Send + Sync>>` - Errors must be convertible
309//
310// ## Service Bounds (S)
311//
312// The `S` type parameter represents a **per-connection HTTP service** - a Tower service
313// that handles individual HTTP requests and returns HTTP responses.
314//
315// Required bounds:
316// - `S: Service<http::Request<Incoming>, Response = http::Response<B>, Error = Infallible>`
317//
318//   This is the core Tower Service trait. It means:
319//   * **Input**: Takes an HTTP request with a streaming body (`Incoming` from Hyper)
320//   * **Output**: Returns an HTTP response with body type `B`
321//   * **Error**: Must be `Infallible`, meaning the service never returns errors at the
322//     Tower level. Any application errors must be converted into HTTP responses
323//     (e.g., 500 Internal Server Error) before reaching this layer.
324//
325// - `S: Clone + Send + 'static`
326//   * **Clone**: Each HTTP/1.1 or HTTP/2 connection may handle multiple requests
327//     sequentially or concurrently. The service must be cloneable so each request
328//     can get its own copy.
329//   * **Send**: The service will be moved into a spawned Tokio task, so it must be
330//     safe to send across thread boundaries.
331//   * **'static**: No borrowed references - the service must own all its data since
332//     it will outlive the connection setup phase.
333//
334// - `S::Future: Send`
335//   The future returned by `Service::call()` must also be `Send` so it can be
336//   polled from any thread in Tokio's thread pool.
337//
338// ## MakeService Bounds (M)
339//
340// The `M` type parameter represents a **service factory** - a Tower service that
341// creates a new `S` service for each incoming connection. This allows customizing
342// services based on connection metadata (remote address, TLS info, etc.).
343//
344// Connection Info → Service Factory → Per-Connection Service
345//
346// Required bounds:
347// - `M: for<'a> Service<IncomingStream<'a, L>, Error = Infallible, Response = S>`
348//
349//   This is the service factory itself:
350//   * **Input**: `IncomingStream<'a, L>` - A struct containing connection metadata:
351//     - `io: &'a TokioIo<L::Io>` - A borrowed reference to the connection's IO stream
352//     - `remote_addr: L::Addr` - The remote address of the client
353//
354//   * **Output**: Returns a new `S` service instance for this specific connection
355//
356//   * **Error**: Must be `Infallible` - service creation must never fail
357//
358//   * **Higher-Rank Trait Bound (`for<'a>`)**: The factory must work
359//     with `IncomingStream` that borrows the IO with *any* lifetime `'a`. This is
360//     necessary because the IO is borrowed only temporarily during service creation,
361//     and we don't know the specific lifetime at compile time.
362//
363// - `for<'a> <M as Service<IncomingStream<'a, L>>>::Future: Send`
364//
365//   The future returned by calling the make_service must be `Send` for any lifetime,
366//   so it can be awaited across threads while creating the service.
367//
368// ## Example Flow
369//
370// ```text
371// 1. Listener.accept() → (io, remote_addr)
372// 2. make_service.call(IncomingStream { io: &io, remote_addr }) → Future<Output = S>
373// 3. service.call(request) → Future<Output = Response>
374// 4. Repeat step 3 for each request on the connection
375// ```
376//
377// ## Why These Bounds Matter
378//
379// 1. **Services can be spawned onto Tokio tasks** (Send + 'static)
380// 2. **Multiple requests can be handled per connection** (Clone)
381// 3. **Error handling is infallible** - errors become HTTP responses, not Tower errors
382// 4. **The MakeService works with borrowed connection info** - via HRTB with IncomingStream
383//    This allows inspection of connection metadata without transferring ownership
384//
385// ============================================================================
386
387/// An incoming stream that bundles connection information.
388///
389/// This struct serves as the request type for the `make_service` Tower service,
390/// allowing it to access connection-level metadata when creating per-connection services.
391///
392/// # Purpose
393///
394/// In Tower/Hyper's model, `make_service` is called once per connection to create
395/// a service that handles all HTTP requests on that connection. `IncomingStream`
396/// provides the connection information needed to customize service creation based on:
397/// - The remote address (for logging or access control)
398/// - The underlying IO type (for protocol detection or configuration)
399///
400/// # Design
401///
402/// This type holds a **reference** to the IO rather than ownership because:
403/// - The actual IO is still needed by Hyper to serve the connection after `make_service` returns
404/// - The `make_service` only needs to inspect connection metadata, not take ownership
405///
406/// # Lifetime Safety
407///
408/// The lifetime `'a` ensures the reference to IO remains valid only during the
409/// `make_service.call()` invocation. After the service is created, the IO is
410/// moved into a spawned task to handle the connection. This is safe because:
411///
412/// ```text
413/// let io = TokioIo::new(stream);           // IO created
414/// let service = make_service.call(
415///     IncomingStream { io: &io, .. }       // Borrowed during call
416/// ).await;                                  // Borrow ends
417/// tokio::spawn(serve_connection(io, ..));  // IO moved to task
418/// ```
419///
420/// The borrow checker guarantees the reference doesn't outlive the IO object.
421///
422/// Used with [`serve`] and [`crate::routing::IntoMakeServiceWithConnectInfo`].
423#[derive(Debug)]
424pub struct IncomingStream<'a, L>
425where
426    L: Listener,
427{
428    /// Reference to the IO for this connection
429    pub io: &'a TokioIo<L::Io>,
430    /// Remote address of the client
431    pub remote_addr: L::Addr,
432}
433
434impl<L> IncomingStream<'_, L>
435where
436    L: Listener,
437{
438    /// Get a reference to the inner IO type.
439    pub fn io(&self) -> &L::Io {
440        self.io.inner()
441    }
442
443    /// Returns the remote address that this stream is bound to.
444    pub fn remote_addr(&self) -> &L::Addr {
445        &self.remote_addr
446    }
447}
448
449/// Serve the service with the supplied listener.
450///
451/// This implementation provides zero-cost abstraction for shutdown coordination.
452/// When graceful shutdown is not used, there is no runtime overhead - no watch channels
453/// are allocated and no `tokio::select!` is used.
454///
455/// It supports both HTTP/1 as well as HTTP/2.
456///
457/// This function accepts services wrapped with [`crate::routing::IntoMakeService`] or
458/// [`crate::routing::IntoMakeServiceWithConnectInfo`].
459///
460/// For generated Smithy services, use `.into_make_service()` or
461/// `.into_make_service_with_connect_info::<C>()`. For services wrapped with
462/// Tower middleware, use `IntoMakeService::new(service)`.
463///
464/// # Error Handling
465///
466/// Note that both `make_service` and the generated service must have `Error = Infallible`.
467/// This means:
468/// - Your service factory cannot fail when creating per-connection services
469/// - Your request handlers cannot return errors (use proper HTTP error responses instead)
470///
471/// If you need fallible service creation, consider handling errors within your
472/// `make_service` implementation and returning a service that produces error responses.
473///
474/// # Examples
475///
476/// Serving a Smithy service with a TCP listener:
477///
478/// ```rust,ignore
479/// use tokio::net::TcpListener;
480///
481/// let listener = TcpListener::bind("0.0.0.0:3000").await.unwrap();
482/// aws_smithy_http_server::serve(listener, app.into_make_service()).await.unwrap();
483/// ```
484///
485/// Serving with middleware applied:
486///
487/// ```rust,ignore
488/// use tokio::net::TcpListener;
489/// use tower::Layer;
490/// use tower_http::timeout::TimeoutLayer;
491/// use http::StatusCode;
492/// use std::time::Duration;
493/// use aws_smithy_http_server::routing::IntoMakeService;
494///
495/// let app = /* ... build service ... */;
496/// let app = TimeoutLayer::new(Duration::from_secs(30)).layer(app);
497///
498/// let listener = TcpListener::bind("0.0.0.0:3000").await.unwrap();
499/// aws_smithy_http_server::serve(listener, IntoMakeService::new(app)).await.unwrap();
500/// ```
501///
502/// For graceful shutdown:
503///
504/// ```rust,ignore
505/// use tokio::net::TcpListener;
506/// use tokio::signal;
507///
508/// let listener = TcpListener::bind("0.0.0.0:3000").await.unwrap();
509/// aws_smithy_http_server::serve(listener, app.into_make_service())
510///     .with_graceful_shutdown(async {
511///         signal::ctrl_c().await.expect("failed to listen for Ctrl+C");
512///     })
513///     .await
514///     .unwrap();
515/// ```
516///
517/// With connection info:
518///
519/// ```rust,ignore
520/// use tokio::net::TcpListener;
521/// use std::net::SocketAddr;
522///
523/// let listener = TcpListener::bind("0.0.0.0:3000").await.unwrap();
524/// aws_smithy_http_server::serve(
525///     listener,
526///     app.into_make_service_with_connect_info::<SocketAddr>()
527/// )
528/// .await
529/// .unwrap();
530/// ```
531pub fn serve<L, M, S, B>(listener: L, make_service: M) -> Serve<L, M, S, B>
532where
533    L: Listener,
534    // Body bounds: see module documentation for details
535    B: HttpBody + Send + 'static,
536    B::Data: Send,
537    B::Error: Into<Box<dyn StdError + Send + Sync>>,
538    // Service bounds: see module documentation for details
539    S: Service<http::Request<Incoming>, Response = http::Response<B>, Error = Infallible> + Clone + Send + 'static,
540    S::Future: Send,
541    // MakeService bounds: see module documentation for details
542    M: for<'a> Service<IncomingStream<'a, L>, Error = Infallible, Response = S>,
543{
544    Serve::new(listener, make_service)
545}
546
547/// A server future that serves HTTP connections.
548///
549/// This is the return type of [`serve`]. It implements [`IntoFuture`], so
550/// you can directly `.await` it:
551///
552/// ```ignore
553/// serve(listener, service).await?;
554/// ```
555///
556/// Before awaiting, you can configure it:
557/// - [`configure_hyper`](Self::configure_hyper) - Configure Hyper's connection builder
558/// - [`with_graceful_shutdown`](Self::with_graceful_shutdown) - Enable graceful shutdown
559/// - [`local_addr`](Self::local_addr) - Get the bound address
560///
561/// Created by [`serve`].
562#[must_use = "Serve does nothing until you `.await` or call `.into_future()` on it"]
563pub struct Serve<L, M, S, B> {
564    listener: L,
565    make_service: M,
566    hyper_builder: Option<Arc<Builder<TokioExecutor>>>,
567    _marker: PhantomData<(S, B)>,
568}
569
570impl<L, M, S, B> fmt::Debug for Serve<L, M, S, B>
571where
572    L: fmt::Debug,
573{
574    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
575        f.debug_struct("Serve")
576            .field("listener", &self.listener)
577            .field("has_hyper_config", &self.hyper_builder.is_some())
578            .finish_non_exhaustive()
579    }
580}
581
582impl<L, M, S, B> Serve<L, M, S, B>
583where
584    L: Listener,
585{
586    fn new(listener: L, make_service: M) -> Self {
587        Self {
588            listener,
589            make_service,
590            hyper_builder: None,
591            _marker: PhantomData,
592        }
593    }
594
595    /// Configure the underlying Hyper connection builder.
596    ///
597    /// This allows you to customize Hyper's HTTP/1 and HTTP/2 settings,
598    /// such as timeouts, max concurrent streams, keep-alive behavior, etc.
599    ///
600    /// The configuration is applied once and the configured builder is cloned
601    /// for each connection, providing optimal performance.
602    ///
603    /// # Example
604    ///
605    /// ```ignore
606    /// use std::time::Duration;
607    ///
608    /// serve(listener, service)
609    ///     .configure_hyper(|builder| {
610    ///         builder
611    ///             .http1()
612    ///             .keep_alive(true)
613    ///             .http2()
614    ///             .max_concurrent_streams(200)
615    ///     })
616    ///     .await?;
617    /// ```
618    ///
619    /// # Advanced: Per-Connection Configuration
620    ///
621    /// If you need per-connection customization (e.g., different settings based on
622    /// the remote address), you can implement your own connection loop. See the
623    /// module-level documentation for examples.
624    pub fn configure_hyper<F>(mut self, f: F) -> Self
625    where
626        F: FnOnce(
627            hyper_util::server::conn::auto::Builder<hyper_util::rt::TokioExecutor>,
628        ) -> hyper_util::server::conn::auto::Builder<hyper_util::rt::TokioExecutor>,
629    {
630        let builder = Builder::new(TokioExecutor::new());
631        self.hyper_builder = Some(Arc::new(f(builder)));
632        self
633    }
634
635    /// Enable graceful shutdown for the server.
636    pub fn with_graceful_shutdown<F>(self, signal: F) -> ServeWithGracefulShutdown<L, M, S, F, B>
637    where
638        F: Future<Output = ()> + Send + 'static,
639    {
640        ServeWithGracefulShutdown::new(self.listener, self.make_service, signal, self.hyper_builder)
641    }
642
643    /// Returns the local address this server is bound to.
644    pub fn local_addr(&self) -> io::Result<L::Addr> {
645        self.listener.local_addr()
646    }
647}
648
/// Expands to the plain accept loop used when graceful shutdown is not enabled.
///
/// Every iteration awaits the next connection from the listener and hands it to
/// `handle_connection`. The expansion is a `loop` without a `break`, so it never
/// produces a value.
///
/// The expansion names the generic parameters `L`, `M`, `S` and `B`, so the macro
/// can only be invoked where those are in scope.
macro_rules! accept_loop {
    ($acceptor:expr, $factory:expr, $builder:expr) => {
        loop {
            let accepted = $acceptor.accept().await;
            let (io, remote_addr) = accepted;
            // `true` and `None` fill the last two parameters of `handle_connection`
            // (defined elsewhere in this file); the shutdown-aware loop passes
            // `Some(..)` in the final position instead.
            handle_connection::<L, M, S, B>(
                &mut $factory,
                io,
                remote_addr,
                $builder.as_ref(),
                true,
                None,
            )
            .await;
        }
    };
}
661
/// Expands to the accept loop used when graceful shutdown is enabled.
///
/// Each iteration uses `tokio::select!` to race the next `accept()` against the
/// shutdown signal. An accepted connection is handed to `handle_connection`
/// together with a reference to the graceful-shutdown tracker; completion of the
/// signal breaks out of the loop so no further connections are accepted.
///
/// `$stop` must support `.as_mut()` (the caller passes a pinned future). As with
/// `accept_loop!`, the generic parameters `L`, `M`, `S` and `B` must be in scope
/// at the call site.
macro_rules! accept_loop_with_shutdown {
    ($acceptor:expr, $factory:expr, $builder:expr, $stop:expr, $tracker:expr) => {
        loop {
            tokio::select! {
                accepted = $acceptor.accept() => {
                    let (io, remote_addr) = accepted;
                    let tracker = Some(&$tracker);
                    handle_connection::<L, M, S, B>(
                        &mut $factory,
                        io,
                        remote_addr,
                        $builder.as_ref(),
                        true,
                        tracker,
                    )
                    .await;
                }
                _ = $stop.as_mut() => {
                    tracing::trace!("received graceful shutdown signal, not accepting new connections");
                    break;
                }
            }
        }
    };
}
691
692// Implement IntoFuture so we can await Serve directly
693impl<L, M, S, B> IntoFuture for Serve<L, M, S, B>
694where
695    L: Listener,
696    L::Addr: Debug,
697    // Body bounds
698    B: HttpBody + Send + 'static,
699    B::Data: Send,
700    B::Error: Into<Box<dyn StdError + Send + Sync>>,
701    // Service bounds
702    S: Service<http::Request<Incoming>, Response = http::Response<B>, Error = Infallible> + Clone + Send + 'static,
703    S::Future: Send,
704    // MakeService bounds
705    M: for<'a> Service<IncomingStream<'a, L>, Error = Infallible, Response = S> + Send + 'static,
706    for<'a> <M as Service<IncomingStream<'a, L>>>::Future: Send,
707{
708    type Output = io::Result<()>;
709    type IntoFuture = Pin<Box<dyn Future<Output = io::Result<()>> + Send>>;
710
711    fn into_future(self) -> Self::IntoFuture {
712        Box::pin(async move {
713            let Self {
714                mut listener,
715                mut make_service,
716                hyper_builder,
717                _marker,
718            } = self;
719
720            accept_loop!(listener, make_service, hyper_builder)
721        })
722    }
723}
724
725/// A server future with graceful shutdown enabled.
726///
727/// This type is created by calling [`Serve::with_graceful_shutdown`]. It implements
728/// [`IntoFuture`], so you can directly `.await` it.
729///
730/// When the shutdown signal completes, the server will:
731/// 1. Stop accepting new connections
732/// 2. Wait for all in-flight requests to complete (or until timeout if configured)
733/// 3. Return once all connections are closed
734///
735/// Configure the shutdown timeout with [`with_shutdown_timeout`](Self::with_shutdown_timeout).
736///
737/// Created by [`Serve::with_graceful_shutdown`].
738#[must_use = "ServeWithGracefulShutdown does nothing until you `.await` or call `.into_future()` on it"]
739pub struct ServeWithGracefulShutdown<L, M, S, F, B> {
740    listener: L,
741    make_service: M,
742    signal: F,
743    hyper_builder: Option<Arc<Builder<TokioExecutor>>>,
744    shutdown_timeout: Option<Duration>,
745    _marker: PhantomData<(S, B)>,
746}
747
748impl<L, M, S, F, B> fmt::Debug for ServeWithGracefulShutdown<L, M, S, F, B>
749where
750    L: Listener + fmt::Debug,
751{
752    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
753        f.debug_struct("ServeWithGracefulShutdown")
754            .field("listener", &self.listener)
755            .field("has_hyper_config", &self.hyper_builder.is_some())
756            .field("shutdown_timeout", &self.shutdown_timeout)
757            .finish_non_exhaustive()
758    }
759}
760
761impl<L: Listener, M, S, F, B> ServeWithGracefulShutdown<L, M, S, F, B> {
762    fn new(listener: L, make_service: M, signal: F, hyper_builder: Option<Arc<Builder<TokioExecutor>>>) -> Self
763    where
764        F: Future<Output = ()> + Send + 'static,
765    {
766        Self {
767            listener,
768            make_service,
769            signal,
770            hyper_builder,
771            shutdown_timeout: None,
772            _marker: PhantomData,
773        }
774    }
775
776    /// Set a timeout for graceful shutdown.
777    ///
778    /// If the timeout expires before all connections complete, a warning is logged
779    /// and the server returns successfully. Note that this does **not** forcibly
780    /// terminate connections - it only stops waiting for them.
781    ///
782    /// # Example
783    ///
784    /// ```rust,ignore
785    /// use std::time::Duration;
786    ///
787    /// serve(listener, app.into_make_service())
788    ///     .with_graceful_shutdown(shutdown_signal())
789    ///     .with_shutdown_timeout(Duration::from_secs(30))
790    ///     .await?;  // Returns Ok(()) even if timeout expires
791    /// ```
792    pub fn with_shutdown_timeout(mut self, timeout: Duration) -> Self {
793        self.shutdown_timeout = Some(timeout);
794        self
795    }
796
797    /// Returns the local address this server is bound to.
798    pub fn local_addr(&self) -> io::Result<L::Addr> {
799        self.listener.local_addr()
800    }
801}
802
// Implement IntoFuture so we can await ServeWithGracefulShutdown directly
804impl<L, M, S, F, B> IntoFuture for ServeWithGracefulShutdown<L, M, S, F, B>
805where
806    L: Listener,
807    L::Addr: Debug,
808    // Body bounds
809    B: HttpBody + Send + 'static,
810    B::Data: Send,
811    B::Error: Into<Box<dyn StdError + Send + Sync>>,
812    // Service bounds
813    S: Service<http::Request<Incoming>, Response = http::Response<B>, Error = Infallible> + Clone + Send + 'static,
814    S::Future: Send,
815    // MakeService bounds
816    M: for<'a> Service<IncomingStream<'a, L>, Error = Infallible, Response = S> + Send + 'static,
817    for<'a> <M as Service<IncomingStream<'a, L>>>::Future: Send,
818    // Shutdown signal
819    F: Future<Output = ()> + Send + 'static,
820{
821    type Output = io::Result<()>;
822    type IntoFuture = Pin<Box<dyn Future<Output = io::Result<()>> + Send>>;
823
824    fn into_future(self) -> Self::IntoFuture {
825        Box::pin(async move {
826            let Self {
827                mut listener,
828                mut make_service,
829                signal,
830                hyper_builder,
831                shutdown_timeout,
832                _marker,
833            } = self;
834
835            // Initialize graceful shutdown
836            let graceful = hyper_util::server::graceful::GracefulShutdown::new();
837            let mut signal = std::pin::pin!(signal);
838
839            accept_loop_with_shutdown!(listener, make_service, hyper_builder, signal, graceful);
840
841            drop(listener);
842
843            tracing::trace!("waiting for in-flight connections to finish");
844
845            // Wait for all in-flight connections (with optional timeout)
846            match shutdown_timeout {
847                Some(timeout) => match tokio::time::timeout(timeout, graceful.shutdown()).await {
848                    Ok(_) => {
849                        tracing::trace!("all in-flight connections completed during graceful shutdown");
850                    }
851                    Err(_) => {
852                        tracing::warn!(
853                            timeout_secs = timeout.as_secs(),
854                            "graceful shutdown timeout expired, some connections may not have completed"
855                        );
856                    }
857                },
858                None => {
859                    graceful.shutdown().await;
860                    tracing::trace!("all in-flight connections completed during graceful shutdown");
861                }
862            }
863
864            Ok(())
865        })
866    }
867}
868
869/// Connection handling function.
870///
871/// Handles connections by using runtime branching on `use_upgrades` and optional
872/// `graceful` shutdown.
873async fn handle_connection<L, M, S, B>(
874    make_service: &mut M,
875    conn_io: <L as Listener>::Io,
876    remote_addr: <L as Listener>::Addr,
877    hyper_builder: Option<&Arc<Builder<TokioExecutor>>>,
878    use_upgrades: bool,
879    graceful: Option<&hyper_util::server::graceful::GracefulShutdown>,
880) where
881    L: Listener,
882    L::Addr: Debug,
883    // Body bounds
884    B: HttpBody + Send + 'static,
885    B::Data: Send,
886    B::Error: Into<Box<dyn StdError + Send + Sync>>,
887    // Service bounds
888    S: Service<http::Request<Incoming>, Response = http::Response<B>, Error = Infallible> + Clone + Send + 'static,
889    S::Future: Send,
890    // MakeService bounds
891    M: for<'a> Service<IncomingStream<'a, L>, Error = Infallible, Response = S> + Send + 'static,
892    for<'a> <M as Service<IncomingStream<'a, L>>>::Future: Send,
893{
894    let watcher = graceful.map(|g| g.watcher());
895    let tokio_io = TokioIo::new(conn_io);
896
897    tracing::trace!("connection {remote_addr:?} accepted");
898
899    make_service
900        .ready()
901        .await
902        .expect("make_service error type is Infallible and cannot fail");
903
904    let tower_service = make_service
905        .call(IncomingStream {
906            io: &tokio_io,
907            remote_addr,
908        })
909        .await
910        .expect("make_service error type is Infallible and cannot fail");
911
912    let hyper_service = TowerToHyperService::new(tower_service);
913
914    // Clone the Arc (cheap - just increments refcount) or create a default builder
915    let builder = hyper_builder
916        .map(Arc::clone)
917        .unwrap_or_else(|| Arc::new(Builder::new(TokioExecutor::new())));
918
919    tokio::spawn(async move {
920        let result = if use_upgrades {
921            // Auto-detect mode - use with_upgrades for HTTP/1 upgrade support
922            let conn = builder.serve_connection_with_upgrades(tokio_io, hyper_service);
923            if let Some(watcher) = watcher {
924                watcher.watch(conn).await
925            } else {
926                conn.await
927            }
928        } else {
929            // Protocol is already decided (http1_only or http2_only) - skip preface reading
930            let conn = builder.serve_connection(tokio_io, hyper_service);
931            if let Some(watcher) = watcher {
932                watcher.watch(conn).await
933            } else {
934                conn.await
935            }
936        };
937
938        if let Err(err) = result {
939            tracing::trace!(error = ?err, "failed to serve connection");
940        }
941    });
942}