Skip to main content

oxihttp_server/
lib.rs

1//! OxiHTTP Server - Pure-Rust HTTP server for the OxiHTTP stack.
2//!
3//! Provides an HTTP/1.1 and HTTP/2 server with routing, middleware pipeline,
4//! and graceful shutdown support.
5//!
6//! # Example
7//!
8//! ```rust,no_run
9//! use oxihttp_server::{Server, Router, response};
10//! use bytes::Bytes;
11//! use http_body_util::Full;
12//!
13//! # async fn example() -> Result<(), oxihttp_core::OxiHttpError> {
14//! let router = Router::new()
15//!     .get("/hello", |_req| async {
16//!         response::text_response("Hello, World!")
17//!     })
18//!     .health("/health");
19//!
20//! Server::bind("127.0.0.1:3000")
21//!     .serve(router)
22//!     .await?;
23//! # Ok(())
24//! # }
25//! ```
26
27#![forbid(unsafe_code)]
28
29#[cfg(feature = "compression")]
30pub mod compression;
31pub mod extractor;
32pub mod middleware;
33pub mod response;
34pub mod router;
35#[cfg(feature = "sse")]
36pub mod sse;
37#[cfg(feature = "static-files")]
38pub mod static_files;
39#[cfg(feature = "tower")]
40pub mod tower_compat;
41#[cfg(feature = "tower")]
42pub mod tower_middleware;
43#[cfg(feature = "websocket")]
44pub mod ws;
45#[cfg(feature = "websocket")]
46pub mod ws_frame;
47
48#[cfg(feature = "compression")]
49pub use compression::{Compression, CompressionAlgorithm, CompressionConfig};
50
51#[cfg(feature = "sse")]
52pub use sse::{SseEvent, SseResponse, SseSender};
53#[cfg(feature = "static-files")]
54pub use static_files::{ServeDir, ServeFile};
55
56#[cfg(feature = "tls")]
57pub mod tls;
58#[cfg(feature = "tls")]
59pub use tls::{PeerCertInfo, TlsConfig};
60
61#[cfg(feature = "h3")]
62pub mod h3;
63
64#[cfg(feature = "tower")]
65pub use tower_compat::RouterMakeService;
66#[cfg(feature = "tower")]
67pub use tower_middleware::{LoggingLayer, RequestIdLayer};
68
69#[cfg(feature = "websocket")]
70pub use ws::{CloseFrame, Message, WebSocket, WebSocketUpgrade};
71
72use bytes::Bytes;
73use http_body_util::Full;
74use hyper::service::service_fn;
75use hyper_util::rt::TokioExecutor;
76use hyper_util::server::conn::auto;
77use std::future::Future;
78use std::net::SocketAddr;
79use std::pin::Pin;
80use std::sync::Arc;
81use tokio::net::TcpListener;
82
83use middleware::MiddlewarePipeline;
84use oxihttp_core::OxiHttpError;
85
86pub use extractor::{FromRequestParts, RequestParts, TypedHeader};
87pub use middleware::{BodyLimitConfig, CorsConfig, RateLimiter, TimeoutConfig};
88pub use router::{Request, Router};
89
90// ---------------------------------------------------------------------------
91// ServerHttp2Settings
92// ---------------------------------------------------------------------------
93
94/// HTTP/2 settings for the server side.
95#[derive(Debug, Clone, Default)]
96pub struct ServerHttp2Settings {
97    /// Initial window size for stream-level flow control (bytes).
98    pub initial_stream_window_size: Option<u32>,
99    /// Initial window size for connection-level flow control (bytes).
100    pub initial_connection_window_size: Option<u32>,
101    /// Enable adaptive flow control.
102    pub adaptive_window: Option<bool>,
103    /// Maximum number of concurrent HTTP/2 streams per connection.
104    pub max_concurrent_streams: Option<u32>,
105    /// Maximum HTTP/2 frame size (bytes).
106    pub max_frame_size: Option<u32>,
107    /// Interval for HTTP/2 PING keep-alive frames.
108    pub keep_alive_interval: Option<std::time::Duration>,
109    /// Timeout for acknowledgement of keep-alive PING.
110    pub keep_alive_timeout: Option<std::time::Duration>,
111}
112
113// ---------------------------------------------------------------------------
114// configure_auto_builder — shared helper for accept loops
115// ---------------------------------------------------------------------------
116
117fn configure_auto_builder(builder: &mut auto::Builder<TokioExecutor>, h2: &ServerHttp2Settings) {
118    let mut h2b = builder.http2();
119    if let Some(sz) = h2.initial_stream_window_size {
120        h2b.initial_stream_window_size(sz);
121    }
122    if let Some(sz) = h2.initial_connection_window_size {
123        h2b.initial_connection_window_size(sz);
124    }
125    if let Some(adaptive) = h2.adaptive_window {
126        h2b.adaptive_window(adaptive);
127    }
128    if let Some(n) = h2.max_concurrent_streams {
129        h2b.max_concurrent_streams(n);
130    }
131    if let Some(sz) = h2.max_frame_size {
132        h2b.max_frame_size(sz);
133    }
134    if let Some(interval) = h2.keep_alive_interval {
135        h2b.keep_alive_interval(interval);
136    }
137    if let Some(timeout) = h2.keep_alive_timeout {
138        h2b.keep_alive_timeout(timeout);
139    }
140}
141
142// ---------------------------------------------------------------------------
143// Server
144// ---------------------------------------------------------------------------
145
146/// An HTTP server that listens on a TCP socket and dispatches requests
147/// through a `Router` with optional middleware.
148pub struct Server;
149
150impl Server {
151    /// Bind to the given address, returning a `ServerBuilder` for further configuration.
152    pub fn bind(addr: &str) -> ServerBuilder {
153        ServerBuilder {
154            addr: addr.to_string(),
155            middleware: MiddlewarePipeline::new(),
156            graceful_shutdown: None,
157            max_connections: None,
158            tcp_nodelay: None,
159            tcp_keepalive: None,
160            http2_settings: None,
161            #[cfg(feature = "tls")]
162            tls: None,
163            alpn_protocols: Vec::new(),
164            #[cfg(feature = "tower")]
165            tower_layers: Vec::new(),
166        }
167    }
168}
169
170/// Builder for configuring and starting an HTTP server.
171pub struct ServerBuilder {
172    addr: String,
173    middleware: MiddlewarePipeline,
174    graceful_shutdown: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
175    max_connections: Option<usize>,
176    /// TCP_NODELAY for accepted connections.
177    tcp_nodelay: Option<bool>,
178    /// TCP keepalive idle time for accepted connections.
179    tcp_keepalive: Option<std::time::Duration>,
180    /// HTTP/2 server-side tuning settings.
181    http2_settings: Option<ServerHttp2Settings>,
182    #[cfg(feature = "tls")]
183    tls: Option<tls::TlsConfig>,
184    /// ALPN protocols advertised in TLS handshakes.
185    ///
186    /// Only used when building TLS config via `with_tls_from_pem` or
187    /// `with_tls_from_der`. Call `with_alpn` BEFORE `with_tls_from_pem` /
188    /// `with_tls_from_der` so the protocols are set at build time.
189    /// Has no effect when a pre-built `TlsConfig` is supplied via `with_tls`.
190    alpn_protocols: Vec<Vec<u8>>,
191    #[cfg(feature = "tower")]
192    tower_layers: Vec<Arc<dyn tower_compat::ErasedLayer>>,
193}
194
195impl ServerBuilder {
196    /// Add CORS middleware with the given configuration.
197    pub fn with_cors(mut self, config: CorsConfig) -> Self {
198        self.middleware = self.middleware.with_cors(config);
199        self
200    }
201
202    /// Add a request body size limit (in bytes).
203    pub fn with_body_limit(mut self, max_bytes: u64) -> Self {
204        self.middleware = self.middleware.with_body_limit(max_bytes);
205        self
206    }
207
208    /// Add rate limiting middleware.
209    pub fn with_rate_limiter(mut self, limiter: RateLimiter) -> Self {
210        self.middleware = self.middleware.with_rate_limiter(limiter);
211        self
212    }
213
214    /// Add a request processing timeout.
215    pub fn with_timeout(mut self, duration: std::time::Duration) -> Self {
216        self.middleware = self.middleware.with_timeout(duration);
217        self
218    }
219
220    /// Add a tower `Layer` to the server's middleware pipeline.
221    ///
222    /// Layers are applied outermost-first: the first `with_layer` call
223    /// produces the layer that sees requests first.
224    ///
225    /// The layer's produced `Service` must satisfy:
226    /// - `Service<http::Request<Incoming>, Response = http::Response<Full<Bytes>>, Error = OxiHttpError>`
227    /// - `Clone + Send + 'static` (future must be `Send + 'static`)
228    #[cfg(feature = "tower")]
229    pub fn with_layer<L>(mut self, layer: L) -> Self
230    where
231        L: tower_layer::Layer<tower_compat::BoxedRouterService> + Send + Sync + Clone + 'static,
232        L::Service: tower_service::Service<
233                http::Request<hyper::body::Incoming>,
234                Response = http::Response<Full<Bytes>>,
235                Error = OxiHttpError,
236            > + Clone
237            + Send
238            + 'static,
239        <L::Service as tower_service::Service<http::Request<hyper::body::Incoming>>>::Future:
240            Send + 'static,
241    {
242        self.tower_layers
243            .push(Arc::new(tower_compat::OwnedLayer(layer)));
244        self
245    }
246
247    /// Set a maximum number of concurrent connections.
248    pub fn with_max_connections(mut self, n: usize) -> Self {
249        self.max_connections = Some(n);
250        self
251    }
252
253    /// Enable or disable `TCP_NODELAY` on accepted connections.
254    pub fn with_tcp_nodelay(mut self, nodelay: bool) -> Self {
255        self.tcp_nodelay = Some(nodelay);
256        self
257    }
258
259    /// Set the TCP keepalive idle time for accepted connections.
260    pub fn with_tcp_keepalive(mut self, duration: std::time::Duration) -> Self {
261        self.tcp_keepalive = Some(duration);
262        self
263    }
264
265    /// Set HTTP/2 server-side connection tuning parameters.
266    pub fn with_http2_settings(mut self, settings: ServerHttp2Settings) -> Self {
267        self.http2_settings = Some(settings);
268        self
269    }
270
271    /// Set a graceful shutdown signal.
272    pub fn with_graceful_shutdown<F>(mut self, signal: F) -> Self
273    where
274        F: Future<Output = ()> + Send + 'static,
275    {
276        self.graceful_shutdown = Some(Box::pin(signal));
277        self
278    }
279
280    /// Convenience: shut down on Ctrl+C.
281    pub fn shutdown_on_ctrl_c(self) -> Self {
282        self.with_graceful_shutdown(async {
283            let _ = tokio::signal::ctrl_c().await;
284        })
285    }
286
287    /// Configure TLS with a pre-built `TlsConfig`.
288    ///
289    /// When using this method, `with_alpn` has no effect — ALPN must be
290    /// configured on the `TlsConfig` directly before passing it here.
291    #[cfg(feature = "tls")]
292    pub fn with_tls(mut self, config: tls::TlsConfig) -> Self {
293        self.tls = Some(config);
294        self
295    }
296
297    /// Configure TLS from PEM-encoded certificate and key bytes.
298    ///
299    /// To customise ALPN protocols, call [`with_alpn`][Self::with_alpn]
300    /// **before** calling this method.
301    #[cfg(feature = "tls")]
302    pub fn with_tls_from_pem(
303        mut self,
304        cert_pem: &[u8],
305        key_pem: &[u8],
306    ) -> Result<Self, OxiHttpError> {
307        self.tls = Some(tls::TlsConfig::from_pem_with_alpn(
308            cert_pem,
309            key_pem,
310            &self.alpn_protocols,
311        )?);
312        Ok(self)
313    }
314
315    /// Configure TLS from DER-encoded certificate chain and private key.
316    ///
317    /// To customise ALPN protocols, call [`with_alpn`][Self::with_alpn]
318    /// **before** calling this method.
319    #[cfg(feature = "tls")]
320    pub fn with_tls_from_der(
321        mut self,
322        certs: Vec<rustls_pki_types::CertificateDer<'static>>,
323        key: rustls_pki_types::PrivateKeyDer<'static>,
324    ) -> Result<Self, OxiHttpError> {
325        self.tls = Some(tls::TlsConfig::from_der_with_alpn(
326            certs,
327            key,
328            &self.alpn_protocols,
329        )?);
330        Ok(self)
331    }
332
333    /// Set the ALPN protocol list advertised during TLS handshakes.
334    ///
335    /// Example: advertise HTTP/2 and HTTP/1.1 in preference order:
336    ///
337    /// ```no_run
338    /// # use oxihttp_server::Server;
339    /// # async fn example() -> Result<(), oxihttp_core::OxiHttpError> {
340    /// Server::bind("127.0.0.1:443")
341    ///     .with_alpn(["h2", "http/1.1"])
342    ///     .with_tls_from_pem(b"...cert...", b"...key...")?
343    ///     .serve(oxihttp_server::Router::new())
344    ///     .await
345    /// # }
346    /// ```
347    ///
348    /// **Ordering**: call `with_alpn` **before** [`with_tls_from_pem`][Self::with_tls_from_pem]
349    /// or [`with_tls_from_der`][Self::with_tls_from_der] — the ALPN list is baked
350    /// into the TLS config at that point. When a pre-built `TlsConfig` is
351    /// supplied via [`with_tls`][Self::with_tls], this method has no effect.
352    #[cfg(feature = "tls")]
353    pub fn with_alpn<P: AsRef<[u8]>>(mut self, protocols: impl IntoIterator<Item = P>) -> Self {
354        self.alpn_protocols = protocols.into_iter().map(|p| p.as_ref().to_vec()).collect();
355        self
356    }
357
358    /// Start the server with the given router.
359    ///
360    /// This function runs until the graceful shutdown signal fires or the
361    /// server encounters a fatal error.
362    pub async fn serve(self, router: Router) -> Result<(), OxiHttpError> {
363        let listener = TcpListener::bind(&self.addr)
364            .await
365            .map_err(|e| OxiHttpError::Io(Arc::new(e)))?;
366        run_server(
367            listener,
368            router,
369            self.middleware,
370            self.max_connections,
371            self.tcp_nodelay,
372            self.tcp_keepalive,
373            self.http2_settings,
374            self.graceful_shutdown,
375            #[cfg(feature = "tls")]
376            self.tls,
377            #[cfg(feature = "tower")]
378            self.tower_layers,
379        )
380        .await
381    }
382
383    /// Start the server and return the bound `SocketAddr` plus a future that
384    /// runs the server. Useful for tests where you need the address before
385    /// the server starts accepting.
386    pub async fn serve_with_addr(
387        self,
388        router: Router,
389    ) -> Result<
390        (
391            SocketAddr,
392            tokio::task::JoinHandle<Result<(), OxiHttpError>>,
393        ),
394        OxiHttpError,
395    > {
396        let listener = TcpListener::bind(&self.addr)
397            .await
398            .map_err(|e| OxiHttpError::Io(Arc::new(e)))?;
399        let addr = listener
400            .local_addr()
401            .map_err(|e| OxiHttpError::Io(Arc::new(e)))?;
402
403        let router = Arc::new(router);
404        let middleware = Arc::new(self.middleware);
405        let connection_semaphore = self
406            .max_connections
407            .map(|n| Arc::new(tokio::sync::Semaphore::new(n)));
408        let graceful_shutdown = self.graceful_shutdown;
409        let tcp_nodelay = self.tcp_nodelay;
410        let tcp_keepalive = self.tcp_keepalive;
411        let http2_settings = self.http2_settings.map(Arc::new);
412
413        #[cfg(feature = "tls")]
414        let tls_acceptor = self.tls.map(|c| Arc::new(c.acceptor));
415
416        #[cfg(feature = "tower")]
417        let tower_layers = self.tower_layers;
418        #[cfg(not(feature = "tower"))]
419        let tower_layers: Vec<()> = Vec::new();
420
421        let handle = tokio::spawn(async move {
422            let accept_handle = tokio::spawn(accept_loop(
423                listener,
424                router,
425                middleware,
426                connection_semaphore,
427                tcp_nodelay,
428                tcp_keepalive,
429                http2_settings,
430                tower_layers,
431                #[cfg(feature = "tls")]
432                tls_acceptor,
433            ));
434
435            if let Some(shutdown) = graceful_shutdown {
436                tokio::select! {
437                    _ = shutdown => {}
438                    result = accept_handle => {
439                        if let Err(e) = result {
440                            return Err(OxiHttpError::Server(format!(
441                                "accept loop panicked: {e}"
442                            )));
443                        }
444                    }
445                }
446            } else {
447                accept_handle
448                    .await
449                    .map_err(|e| OxiHttpError::Server(format!("accept loop panicked: {e}")))?;
450            }
451
452            Ok(())
453        });
454
455        Ok((addr, handle))
456    }
457}
458
459/// A server that has already bound its TCP port but has not yet started accepting connections.
460///
461/// Created by [`ServerBuilder::listen`].  Use [`BoundServer::local_addr`] to obtain the
462/// actual bound address (useful when binding port 0), then call [`BoundServer::serve`]
463/// to start accepting connections.
464pub struct BoundServer {
465    listener: TcpListener,
466    addr: SocketAddr,
467    inner: ServerBuilder,
468}
469
470impl BoundServer {
471    /// Return the local socket address the server is bound to.
472    pub fn local_addr(&self) -> SocketAddr {
473        self.addr
474    }
475
476    /// Start accepting connections (same semantics as [`ServerBuilder::serve`]).
477    pub async fn serve(self, router: Router) -> Result<(), OxiHttpError> {
478        run_server(
479            self.listener,
480            router,
481            self.inner.middleware,
482            self.inner.max_connections,
483            self.inner.tcp_nodelay,
484            self.inner.tcp_keepalive,
485            self.inner.http2_settings,
486            self.inner.graceful_shutdown,
487            #[cfg(feature = "tls")]
488            self.inner.tls,
489            #[cfg(feature = "tower")]
490            self.inner.tower_layers,
491        )
492        .await
493    }
494}
495
496impl ServerBuilder {
497    /// Bind the TCP port eagerly and return a [`BoundServer`] handle.
498    ///
499    /// Useful when you need the actual bound address (e.g. when binding port 0)
500    /// before the server starts accepting connections.
501    ///
502    /// ```rust,no_run
503    /// # use oxihttp_server::{Server, Router};
504    /// # async fn example() -> Result<(), oxihttp_core::OxiHttpError> {
505    /// let bound = Server::bind("127.0.0.1:0").listen().await?;
506    /// let port = bound.local_addr().port();
507    /// println!("Listening on port {port}");
508    /// bound.serve(Router::new()).await?;
509    /// # Ok(())
510    /// # }
511    /// ```
512    pub async fn listen(self) -> Result<BoundServer, OxiHttpError> {
513        let listener = TcpListener::bind(&self.addr)
514            .await
515            .map_err(|e| OxiHttpError::Io(Arc::new(e)))?;
516        let addr = listener
517            .local_addr()
518            .map_err(|e| OxiHttpError::Io(Arc::new(e)))?;
519        Ok(BoundServer {
520            listener,
521            addr,
522            inner: self,
523        })
524    }
525}
526
527impl std::fmt::Debug for ServerBuilder {
528    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
529        let mut s = f.debug_struct("ServerBuilder");
530        s.field("addr", &self.addr)
531            .field("middleware", &self.middleware)
532            .field("max_connections", &self.max_connections)
533            .field("tcp_nodelay", &self.tcp_nodelay)
534            .field("tcp_keepalive", &self.tcp_keepalive)
535            .field("http2_settings", &self.http2_settings)
536            .field("alpn_protocols_count", &self.alpn_protocols.len());
537        #[cfg(feature = "tls")]
538        s.field("tls", &self.tls);
539        #[cfg(feature = "tower")]
540        s.field("tower_layers", &self.tower_layers.len());
541        s.finish()
542    }
543}
544
545// ---------------------------------------------------------------------------
546// run_server — shared implementation used by ServerBuilder::serve and
547// BoundServer::serve
548// ---------------------------------------------------------------------------
549
550#[allow(clippy::too_many_arguments)]
551async fn run_server(
552    listener: TcpListener,
553    router: Router,
554    middleware: MiddlewarePipeline,
555    max_connections: Option<usize>,
556    tcp_nodelay: Option<bool>,
557    tcp_keepalive: Option<std::time::Duration>,
558    http2_settings: Option<ServerHttp2Settings>,
559    graceful_shutdown: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
560    #[cfg(feature = "tls")] tls: Option<tls::TlsConfig>,
561    #[cfg(feature = "tower")] tower_layers: Vec<Arc<dyn tower_compat::ErasedLayer>>,
562) -> Result<(), OxiHttpError> {
563    let router = Arc::new(router);
564    let middleware = Arc::new(middleware);
565    let connection_semaphore = max_connections.map(|n| Arc::new(tokio::sync::Semaphore::new(n)));
566    let http2_settings = http2_settings.map(Arc::new);
567
568    #[cfg(feature = "tls")]
569    let tls_acceptor = tls.map(|c| Arc::new(c.acceptor));
570
571    #[cfg(feature = "tower")]
572    let tower_layers_val = tower_layers;
573    #[cfg(not(feature = "tower"))]
574    let tower_layers_val: Vec<()> = Vec::new();
575
576    // Spawn the accept loop.
577    let accept_handle = tokio::spawn(accept_loop(
578        listener,
579        router,
580        middleware,
581        connection_semaphore,
582        tcp_nodelay,
583        tcp_keepalive,
584        http2_settings,
585        tower_layers_val,
586        #[cfg(feature = "tls")]
587        tls_acceptor,
588    ));
589
590    // Wait for shutdown signal or accept loop completion.
591    if let Some(shutdown) = graceful_shutdown {
592        tokio::select! {
593            _ = shutdown => {
594                // Shutdown signal received — accept_handle will be dropped.
595            }
596            result = accept_handle => {
597                if let Err(e) = result {
598                    return Err(OxiHttpError::Server(format!("accept loop panicked: {e}")));
599                }
600            }
601        }
602    } else {
603        accept_handle
604            .await
605            .map_err(|e| OxiHttpError::Server(format!("accept loop panicked: {e}")))?;
606    }
607
608    Ok(())
609}
610
611// ---------------------------------------------------------------------------
612// accept_loop — no tower feature
613// ---------------------------------------------------------------------------
614
615#[cfg(not(feature = "tower"))]
616#[allow(clippy::too_many_arguments)]
617async fn accept_loop(
618    listener: TcpListener,
619    router: Arc<Router>,
620    middleware: Arc<MiddlewarePipeline>,
621    semaphore: Option<Arc<tokio::sync::Semaphore>>,
622    tcp_nodelay: Option<bool>,
623    tcp_keepalive: Option<std::time::Duration>,
624    http2_settings: Option<Arc<ServerHttp2Settings>>,
625    _tower_layers: Vec<()>,
626    #[cfg(feature = "tls")] tls_acceptor: Option<Arc<tokio_rustls::TlsAcceptor>>,
627) {
628    loop {
629        let accept_result = listener.accept().await;
630        let (stream, _remote_addr) = match accept_result {
631            Ok(conn) => conn,
632            Err(_) => continue,
633        };
634
635        // Apply TCP socket options immediately after accept.
636        if let Some(nodelay) = tcp_nodelay {
637            let _ = stream.set_nodelay(nodelay);
638        }
639        if let Some(ka_dur) = tcp_keepalive {
640            let ka = socket2::TcpKeepalive::new().with_time(ka_dur);
641            let _ = socket2::SockRef::from(&stream).set_tcp_keepalive(&ka);
642        }
643
644        let router = Arc::clone(&router);
645        let middleware = Arc::clone(&middleware);
646        let permit = if let Some(ref sem) = semaphore {
647            match sem.clone().try_acquire_owned() {
648                Ok(p) => Some(p),
649                Err(_) => continue,
650            }
651        } else {
652            None
653        };
654
655        #[cfg(feature = "tls")]
656        let tls = tls_acceptor.clone();
657        let h2_cfg = http2_settings.clone();
658
659        tokio::spawn(async move {
660            #[cfg(feature = "tls")]
661            if let Some(acceptor) = tls {
662                if let Ok(tls_stream) = acceptor.accept(stream).await {
663                    // Extract full ConnectionInfo via TlsConnectionExt BEFORE consuming
664                    // the stream. This populates typed version, cipher_suite, sni, ALPN,
665                    // and peer certificates in a single call.
666                    use oxitls::TlsConnectionExt as _;
667                    let conn_info = tls_stream.tls_connection_info();
668                    let (_, server_conn) = tls_stream.get_ref();
669                    let peer_info: Arc<tls::PeerCertInfo> = Arc::new(tls::PeerCertInfo {
670                        peer_certificates: server_conn
671                            .peer_certificates()
672                            .map(|certs| certs.iter().map(|c| c.clone().into_owned()).collect())
673                            .unwrap_or_default(),
674                        alpn_protocol: conn_info.alpn_protocol.clone(),
675                        // Keep original rustls Debug format for backward compatibility.
676                        protocol_version: server_conn.protocol_version().map(|v| format!("{v:?}")),
677                        version: conn_info.version,
678                        cipher_suite: conn_info.cipher_suite,
679                        sni: conn_info.sni.clone(),
680                    });
681
682                    let svc = service_fn(move |mut req: hyper::Request<hyper::body::Incoming>| {
683                        let router = Arc::clone(&router);
684                        let middleware = Arc::clone(&middleware);
685                        let pi = peer_info.clone();
686                        async move {
687                            req.extensions_mut().insert(pi);
688                            dispatch_with_middleware(router, middleware, req).await
689                        }
690                    });
691                    let mut builder = auto::Builder::new(TokioExecutor::new());
692                    if let Some(ref h2) = h2_cfg {
693                        configure_auto_builder(&mut builder, h2);
694                    }
695                    let io = hyper_util::rt::TokioIo::new(tls_stream);
696                    let _ = builder.serve_connection_with_upgrades(io, svc).await;
697                }
698                drop(permit);
699                return;
700            }
701
702            // Plain-HTTP path (only reached when TLS is not configured or the
703            // `tls` feature is disabled).
704            let mut builder = auto::Builder::new(TokioExecutor::new());
705            if let Some(ref h2) = h2_cfg {
706                configure_auto_builder(&mut builder, h2);
707            }
708            let svc = service_fn(move |req: hyper::Request<hyper::body::Incoming>| {
709                let router = Arc::clone(&router);
710                let middleware = Arc::clone(&middleware);
711                async move { dispatch_with_middleware(router, middleware, req).await }
712            });
713            let io = hyper_util::rt::TokioIo::new(stream);
714            let _ = builder.serve_connection_with_upgrades(io, svc).await;
715            drop(permit);
716        });
717    }
718}
719
720// ---------------------------------------------------------------------------
721// accept_loop — with tower feature
722// ---------------------------------------------------------------------------
723
724#[cfg(feature = "tower")]
725#[allow(clippy::too_many_arguments)]
726async fn accept_loop(
727    listener: TcpListener,
728    router: Arc<Router>,
729    middleware: Arc<MiddlewarePipeline>,
730    semaphore: Option<Arc<tokio::sync::Semaphore>>,
731    tcp_nodelay: Option<bool>,
732    tcp_keepalive: Option<std::time::Duration>,
733    http2_settings: Option<Arc<ServerHttp2Settings>>,
734    tower_layers: Vec<Arc<dyn tower_compat::ErasedLayer>>,
735    #[cfg(feature = "tls")] tls_acceptor: Option<Arc<tokio_rustls::TlsAcceptor>>,
736) {
737    use tower_service::Service as _;
738
739    // Build the layered service once; each connection gets a clone.
740    let layered_svc = tower_compat::build_layered_service(Arc::clone(&router), &tower_layers);
741
742    loop {
743        let accept_result = listener.accept().await;
744        let (stream, _remote_addr) = match accept_result {
745            Ok(conn) => conn,
746            Err(_) => continue,
747        };
748
749        // Apply TCP socket options immediately after accept.
750        if let Some(nodelay) = tcp_nodelay {
751            let _ = stream.set_nodelay(nodelay);
752        }
753        if let Some(ka_dur) = tcp_keepalive {
754            let ka = socket2::TcpKeepalive::new().with_time(ka_dur);
755            let _ = socket2::SockRef::from(&stream).set_tcp_keepalive(&ka);
756        }
757
758        let middleware = Arc::clone(&middleware);
759        let permit = if let Some(ref sem) = semaphore {
760            match sem.clone().try_acquire_owned() {
761                Ok(p) => Some(p),
762                Err(_) => continue,
763            }
764        } else {
765            None
766        };
767
768        #[cfg(feature = "tls")]
769        let tls = tls_acceptor.clone();
770        let h2_cfg = http2_settings.clone();
771
772        // Clone the layered service for this connection.
773        let conn_svc = layered_svc.clone();
774
775        tokio::spawn(async move {
776            let mut builder = auto::Builder::new(TokioExecutor::new());
777            if let Some(ref h2) = h2_cfg {
778                configure_auto_builder(&mut builder, h2);
779            }
780
781            #[cfg(feature = "tls")]
782            if let Some(acceptor) = tls {
783                if let Ok(tls_stream) = acceptor.accept(stream).await {
784                    // Extract full ConnectionInfo via TlsConnectionExt BEFORE consuming
785                    // the stream. This populates typed version, cipher_suite, sni, ALPN,
786                    // and peer certificates in a single call.
787                    use oxitls::TlsConnectionExt as _;
788                    let conn_info = tls_stream.tls_connection_info();
789                    let (_, server_conn) = tls_stream.get_ref();
790                    let peer_info: Arc<tls::PeerCertInfo> = Arc::new(tls::PeerCertInfo {
791                        peer_certificates: server_conn
792                            .peer_certificates()
793                            .map(|certs| certs.iter().map(|c| c.clone().into_owned()).collect())
794                            .unwrap_or_default(),
795                        alpn_protocol: conn_info.alpn_protocol.clone(),
796                        // Keep original rustls Debug format for backward compatibility.
797                        protocol_version: server_conn.protocol_version().map(|v| format!("{v:?}")),
798                        version: conn_info.version,
799                        cipher_suite: conn_info.cipher_suite,
800                        sni: conn_info.sni.clone(),
801                    });
802
803                    let svc_tls =
804                        service_fn(move |mut req: hyper::Request<hyper::body::Incoming>| {
805                            let middleware = Arc::clone(&middleware);
806                            // Clone the service again so the closure is `FnMut` (hyper
807                            // may call it multiple times for keep-alive connections).
808                            let mut svc = conn_svc.clone();
809                            let peer_info = Arc::clone(&peer_info);
810                            async move {
811                                req.extensions_mut().insert(peer_info);
812
813                                // Pre-handler middleware (CORS, rate limit, body limit)
814                                if let Some(result) = middleware.pre_handle(&req).await {
815                                    return result.map_err(|e| OxiHttpError::Server(e.to_string()));
816                                }
817
818                                let origin = req
819                                    .headers()
820                                    .get(http::header::ORIGIN)
821                                    .and_then(|v| v.to_str().ok())
822                                    .map(|s| s.to_string());
823
824                                let handler_result =
825                                    if let Some(ref timeout_config) = middleware.timeout {
826                                        match tokio::time::timeout(
827                                            timeout_config.duration,
828                                            svc.call(req),
829                                        )
830                                        .await
831                                        {
832                                            Ok(result) => result,
833                                            Err(_) => middleware::TimeoutConfig::timeout_response(),
834                                        }
835                                    } else {
836                                        svc.call(req).await
837                                    };
838
839                                match handler_result {
840                                    Ok(mut resp) => {
841                                        middleware.post_handle(&mut resp, origin.as_deref());
842                                        Ok(resp)
843                                    }
844                                    Err(e) => {
845                                        let mut resp = hyper::Response::builder()
846                                            .status(http::StatusCode::INTERNAL_SERVER_ERROR)
847                                            .body(Full::new(Bytes::from(e.to_string())))
848                                            .map_err(|e2| OxiHttpError::Server(e2.to_string()))?;
849                                        middleware.post_handle(&mut resp, origin.as_deref());
850                                        Ok(resp)
851                                    }
852                                }
853                            }
854                        });
855
856                    let io = hyper_util::rt::TokioIo::new(tls_stream);
857                    let _ = builder.serve_connection_with_upgrades(io, svc_tls).await;
858                }
859                drop(permit);
860                return;
861            }
862
863            let svc = service_fn(move |req: hyper::Request<hyper::body::Incoming>| {
864                let middleware = Arc::clone(&middleware);
865                // Clone the service again so the closure is `FnMut` (hyper
866                // may call it multiple times for keep-alive connections).
867                let mut svc = conn_svc.clone();
868                async move {
869                    // Pre-handler middleware (CORS, rate limit, body limit)
870                    if let Some(result) = middleware.pre_handle(&req).await {
871                        return result.map_err(|e| OxiHttpError::Server(e.to_string()));
872                    }
873
874                    let origin = req
875                        .headers()
876                        .get(http::header::ORIGIN)
877                        .and_then(|v| v.to_str().ok())
878                        .map(|s| s.to_string());
879
880                    // Route through the tower layer stack (includes timeout
881                    // wrapping when configured, plus LoggingLayer / RequestIdLayer
882                    // / user layers).
883                    let handler_result = if let Some(ref timeout_config) = middleware.timeout {
884                        match tokio::time::timeout(timeout_config.duration, svc.call(req)).await {
885                            Ok(result) => result,
886                            Err(_) => middleware::TimeoutConfig::timeout_response(),
887                        }
888                    } else {
889                        svc.call(req).await
890                    };
891
892                    match handler_result {
893                        Ok(mut resp) => {
894                            middleware.post_handle(&mut resp, origin.as_deref());
895                            Ok(resp)
896                        }
897                        Err(e) => {
898                            let mut resp = hyper::Response::builder()
899                                .status(http::StatusCode::INTERNAL_SERVER_ERROR)
900                                .body(Full::new(Bytes::from(e.to_string())))
901                                .map_err(|e2| OxiHttpError::Server(e2.to_string()))?;
902                            middleware.post_handle(&mut resp, origin.as_deref());
903                            Ok(resp)
904                        }
905                    }
906                }
907            });
908
909            let io = hyper_util::rt::TokioIo::new(stream);
910            let _ = builder.serve_connection_with_upgrades(io, svc).await;
911            drop(permit);
912        });
913    }
914}
915
916// ---------------------------------------------------------------------------
917// Shared dispatch helper (used by the non-tower accept_loop)
918// ---------------------------------------------------------------------------
919
920#[cfg(not(feature = "tower"))]
921async fn dispatch_with_middleware(
922    router: Arc<Router>,
923    middleware: Arc<MiddlewarePipeline>,
924    req: hyper::Request<hyper::body::Incoming>,
925) -> Result<hyper::Response<Full<Bytes>>, OxiHttpError> {
926    // Pre-handler middleware
927    if let Some(result) = middleware.pre_handle(&req).await {
928        return result.map_err(|e| OxiHttpError::Server(e.to_string()));
929    }
930
931    let origin = req
932        .headers()
933        .get(http::header::ORIGIN)
934        .and_then(|v| v.to_str().ok())
935        .map(|s| s.to_string());
936
937    // Handle timeout if configured
938    let handler_result = if let Some(ref timeout_config) = middleware.timeout {
939        match tokio::time::timeout(timeout_config.duration, router.dispatch(req)).await {
940            Ok(result) => result,
941            Err(_) => middleware::TimeoutConfig::timeout_response(),
942        }
943    } else {
944        router.dispatch(req).await
945    };
946
947    match handler_result {
948        Ok(mut resp) => {
949            middleware.post_handle(&mut resp, origin.as_deref());
950            Ok(resp)
951        }
952        Err(e) => {
953            let mut resp = hyper::Response::builder()
954                .status(http::StatusCode::INTERNAL_SERVER_ERROR)
955                .body(Full::new(Bytes::from(e.to_string())))
956                .map_err(|e2| OxiHttpError::Server(e2.to_string()))?;
957            middleware.post_handle(&mut resp, origin.as_deref());
958            Ok(resp)
959        }
960    }
961}