Skip to main content

hyperi_rustlib/http_server/
server.rs

1// Project:   hyperi-rustlib
2// File:      src/http_server/server.rs
3// Purpose:   HTTP server implementation
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! HTTP server implementation using axum.
10
11use crate::http_server::{HttpServerConfig, HttpServerError, Result};
12use axum::{Router, http::StatusCode, response::IntoResponse, routing::get};
13use std::future::IntoFuture;
14use std::net::SocketAddr;
15use std::sync::Arc;
16use std::sync::atomic::{AtomicBool, Ordering};
17use tokio::net::TcpListener;
18#[cfg(not(feature = "shutdown"))]
19use tokio::signal;
20use tokio::sync::watch;
21use tower::limit::ConcurrencyLimitLayer;
22use tower_http::timeout::TimeoutLayer;
23use tower_http::trace::TraceLayer;
24
25/// High-performance HTTP server built on axum.
26///
27/// Provides graceful shutdown, health endpoints, and Tower middleware support.
28pub struct HttpServer {
29    config: HttpServerConfig,
30    ready: Arc<AtomicBool>,
31}
32
33impl HttpServer {
34    /// Create a new HTTP server with the given configuration.
35    #[must_use]
36    pub fn new(config: HttpServerConfig) -> Self {
37        let ready = Arc::new(AtomicBool::new(true));
38
39        #[cfg(feature = "health")]
40        {
41            let r = Arc::clone(&ready);
42            crate::health::HealthRegistry::register("http_server", move || {
43                if r.load(Ordering::Relaxed) {
44                    crate::health::HealthStatus::Healthy
45                } else {
46                    crate::health::HealthStatus::Unhealthy
47                }
48            });
49        }
50
51        Self { config, ready }
52    }
53
54    /// Create a new HTTP server bound to the specified address.
55    #[must_use]
56    pub fn bind(address: impl Into<String>) -> Self {
57        Self::new(HttpServerConfig::new(address))
58    }
59
60    /// Set the readiness state for the /health/ready endpoint.
61    pub fn set_ready(&self, ready: bool) {
62        self.ready.store(ready, Ordering::SeqCst);
63    }
64
65    /// Get the current readiness state.
66    #[must_use]
67    pub fn is_ready(&self) -> bool {
68        self.ready.load(Ordering::SeqCst)
69    }
70
71    /// Get a clone of the readiness flag for use in application state.
72    #[must_use]
73    pub fn ready_flag(&self) -> Arc<AtomicBool> {
74        Arc::clone(&self.ready)
75    }
76
77    /// Serve the given router until SIGTERM/SIGINT, then drain gracefully.
78    ///
79    /// # Errors
80    ///
81    /// Returns an error if binding fails or the server encounters an error.
82    pub async fn serve(self, app: Router) -> Result<()> {
83        #[cfg(feature = "shutdown")]
84        {
85            let token = crate::shutdown::install_signal_handler();
86            self.serve_with_shutdown(app, token.cancelled_owned()).await
87        }
88        #[cfg(not(feature = "shutdown"))]
89        {
90            self.serve_with_shutdown(app, shutdown_signal()).await
91        }
92    }
93
94    /// Serve with a custom shutdown signal.
95    ///
96    /// This is useful for testing or when you need custom shutdown logic.
97    ///
98    /// # Errors
99    ///
100    /// Returns an error if binding fails or the server encounters an error.
101    pub async fn serve_with_shutdown<F>(self, app: Router, shutdown: F) -> Result<()>
102    where
103        F: std::future::Future<Output = ()> + Send + 'static,
104    {
105        // Enforce config validity before binding: a
106        // config requesting unsupported in-process TLS must fail loudly here,
107        // not bind cleartext while is_tls_enabled() reports true.
108        self.config.validate().map_err(HttpServerError::TlsConfig)?;
109        let shutdown_timeout = self.config.shutdown_timeout();
110        let app = self.build_router(app);
111
112        let addr: SocketAddr =
113            self.config
114                .bind_address
115                .parse()
116                .map_err(|e| HttpServerError::Bind {
117                    address: self.config.bind_address.clone(),
118                    source: std::io::Error::new(std::io::ErrorKind::InvalidInput, e),
119                })?;
120
121        let listener = TcpListener::bind(addr)
122            .await
123            .map_err(|e| HttpServerError::Bind {
124                address: self.config.bind_address.clone(),
125                source: e,
126            })?;
127
128        #[cfg(feature = "logger")]
129        tracing::info!(address = %addr, "HTTP server listening");
130
131        // Two-phase: unbounded wait for shutdown, then bounded drain.
132        // If drain exceeds shutdown_timeout, drop the serve future
133        // so K8s terminationGracePeriodSeconds isn't blown.
134        //
135        // F12: flip ready -> false BEFORE notifying drain start so
136        // /health/ready returns 503 the moment shutdown is signalled.
137        // K8s endpoint controller catches the 503 and stops routing
138        // before in-flight requests finish draining.
139        let (drain_started_tx, drain_started_rx) = tokio::sync::oneshot::channel();
140        let drain_started_tx = std::sync::Mutex::new(Some(drain_started_tx));
141        let ready_for_signal = Arc::clone(&self.ready);
142        let signal = async move {
143            shutdown.await;
144            ready_for_signal.store(false, Ordering::SeqCst);
145            if let Some(tx) = drain_started_tx.lock().ok().and_then(|mut g| g.take()) {
146                let _ = tx.send(());
147            }
148        };
149        // WithGracefulShutdown is IntoFuture, not Future.
150        let serve = axum::serve(listener, app)
151            .with_graceful_shutdown(signal)
152            .into_future();
153        tokio::pin!(serve);
154
155        tokio::select! {
156            result = &mut serve => result.map_err(HttpServerError::Io)?,
157            () = async {
158                let _ = drain_started_rx.await;
159                tokio::time::sleep(shutdown_timeout).await;
160            } => {
161                #[cfg(feature = "logger")]
162                tracing::warn!(
163                    timeout_ms = u64::try_from(shutdown_timeout.as_millis()).unwrap_or(u64::MAX),
164                    "HTTP server graceful drain timed out -- forcing exit"
165                );
166            }
167        }
168
169        #[cfg(feature = "logger")]
170        tracing::info!("HTTP server shut down gracefully");
171
172        Ok(())
173    }
174
175    /// Serve and return a handle for programmatic shutdown.
176    ///
177    /// Returns a `ShutdownHandle` that can be used to trigger shutdown.
178    ///
179    /// # Errors
180    ///
181    /// Returns an error if binding fails.
182    pub async fn serve_with_handle(self, app: Router) -> Result<(ShutdownHandle, ServerFuture)> {
183        // Same validation gate as serve_with_shutdown.
184        self.config.validate().map_err(HttpServerError::TlsConfig)?;
185        let (tx, rx) = watch::channel(());
186        let handle = ShutdownHandle { sender: tx };
187
188        let shutdown_timeout = self.config.shutdown_timeout();
189        let app = self.build_router(app);
190
191        let addr: SocketAddr =
192            self.config
193                .bind_address
194                .parse()
195                .map_err(|e| HttpServerError::Bind {
196                    address: self.config.bind_address.clone(),
197                    source: std::io::Error::new(std::io::ErrorKind::InvalidInput, e),
198                })?;
199
200        let listener = TcpListener::bind(addr)
201            .await
202            .map_err(|e| HttpServerError::Bind {
203                address: self.config.bind_address.clone(),
204                source: e,
205            })?;
206
207        #[cfg(feature = "logger")]
208        tracing::info!(address = %addr, "HTTP server listening");
209
210        // Two-phase shutdown, matching `serve_with_shutdown`.
211        // F12: flip ready -> false before notifying drain start.
212        let (drain_started_tx, drain_started_rx) = tokio::sync::oneshot::channel();
213        let drain_started_tx = std::sync::Mutex::new(Some(drain_started_tx));
214        let ready_for_signal = Arc::clone(&self.ready);
215        let signal = async move {
216            let _ = rx.clone().changed().await;
217            ready_for_signal.store(false, Ordering::SeqCst);
218            if let Some(tx) = drain_started_tx.lock().ok().and_then(|mut g| g.take()) {
219                let _ = tx.send(());
220            }
221        };
222
223        let future = ServerFuture {
224            inner: Box::pin(async move {
225                let serve = axum::serve(listener, app)
226                    .with_graceful_shutdown(signal)
227                    .into_future();
228                tokio::pin!(serve);
229
230                tokio::select! {
231                    result = &mut serve => result.map_err(HttpServerError::Io)?,
232                    () = async {
233                        let _ = drain_started_rx.await;
234                        tokio::time::sleep(shutdown_timeout).await;
235                    } => {
236                        #[cfg(feature = "logger")]
237                        tracing::warn!(
238                            timeout_ms = u64::try_from(shutdown_timeout.as_millis()).unwrap_or(u64::MAX),
239                            "HTTP server graceful drain timed out -- forcing exit"
240                        );
241                    }
242                }
243
244                Ok(())
245            }),
246        };
247
248        Ok((handle, future))
249    }
250
251    /// Build the final router with optional health endpoints + middleware.
252    ///
253    /// Applies `tower_http::TraceLayer` (per-request `tracing` spans, default
254    /// DEBUG level so probe traffic doesn't flood at INFO) and
255    /// `tower_http::TimeoutLayer` (per-request deadline from
256    /// [`HttpServerConfig::request_timeout`]). The layers wrap both the
257    /// user-supplied router and the framework probe/admin routes.
258    fn build_router(&self, app: Router) -> Router {
259        let mut router = app;
260
261        if self.config.enable_health_endpoints {
262            let ready = Arc::clone(&self.ready);
263            // K8s-standard paths are /healthz and /readyz; the
264            // /health/live and /health/ready aliases stay for
265            // backward compat with existing consumer probes.
266            // Kaz #38: docs claim /readyz, code only had /health/ready.
267            let r1 = Arc::clone(&ready);
268            let r2 = Arc::clone(&ready);
269            router = router
270                .route("/health/live", get(health_live))
271                .route("/health/ready", get(move || health_ready(Arc::clone(&r1))))
272                .route("/healthz", get(health_live))
273                .route("/readyz", get(move || health_ready(Arc::clone(&r2))));
274        }
275
276        #[cfg(all(feature = "health", feature = "serde_json"))]
277        if self.config.enable_health_endpoints {
278            router = router.route("/health/detailed", get(health_detailed));
279        }
280
281        #[cfg(feature = "config")]
282        if self.config.enable_config_endpoint {
283            router = router.route("/config", get(config_dump));
284        }
285
286        // New default (metrics audit): http-server emits ZERO today. Add the
287        // originator scale signals (spec 5b) -- in-flight gauge + shed counter
288        // + requests counter + duration histogram -- via a thin axum middleware.
289        // Route label is OMITTED deliberately (only method + status): a clean
290        // templated route is not reachable from this `from_fn` seam, and the raw
291        // path is a cardinality bomb. The `metrics` crate is not part of the
292        // `http-server` feature, so the whole layer is `metrics`-gated.
293        #[cfg(feature = "metrics")]
294        let router = router.layer(axum::middleware::from_fn(http_server_metrics));
295
296        router
297            .layer(TraceLayer::new_for_http())
298            .layer(TimeoutLayer::with_status_code(
299                StatusCode::REQUEST_TIMEOUT,
300                self.config.request_timeout(),
301            ))
302            // Cap in-flight requests. Queue fills under load;
303            // upstream should see backpressure via send-timeout.
304            .layer(ConcurrencyLimitLayer::new(self.config.max_connections))
305    }
306}
307
/// Axum middleware that records the built-in HTTP-server metrics.
///
/// - `dfe_http_server_inflight_requests` gauge: maintained directly (inc on
///   arrival, dec on completion). This is the primary push-originator scale
///   signal.
/// - `dfe_http_server_requests_total{method,status}` counter: RPS by outcome.
/// - `dfe_http_server_request_duration_seconds` histogram.
/// - `dfe_http_server_shed_total` counter: 503s (self-regulation shedding).
///
/// No route label (bounded cardinality -- see `build_router`). The `method`
/// label is bounded too: extension methods collapse to `OTHER` (see
/// `method_label`). Metrics are named `dfe_` to sit alongside the other
/// transport-family metrics; no MetricsManager namespace is threaded into
/// the HTTP server.
#[cfg(feature = "metrics")]
async fn http_server_metrics(
    req: axum::extract::Request,
    next: axum::middleware::Next,
) -> axum::response::Response {
    // RAII keeps the in-flight gauge balanced even if the request future is
    // DROPPED at the await (timeout / client disconnect / handler panic).
    // A bare inc/dec pair would leak the gauge upward on every such request
    // and poison the exact push-originator scale signal this metric exists for.
    struct InflightGuard;
    impl InflightGuard {
        fn new() -> Self {
            metrics::gauge!("dfe_http_server_inflight_requests").increment(1.0);
            Self
        }
    }
    impl Drop for InflightGuard {
        fn drop(&mut self) {
            metrics::gauge!("dfe_http_server_inflight_requests").decrement(1.0);
        }
    }

    let method = method_label(req.method());
    let start = std::time::Instant::now();
    let _inflight = InflightGuard::new();

    // If this await is cancelled, `_inflight` drops and the gauge is
    // decremented. The completion counters below are correctly skipped,
    // since there is no completed status.
    let response = next.run(req).await;

    let status = response.status();
    metrics::counter!(
        "dfe_http_server_requests_total",
        "method" => method,
        "status" => status.as_u16().to_string()
    )
    .increment(1);
    metrics::histogram!("dfe_http_server_request_duration_seconds")
        .record(start.elapsed().as_secs_f64());
    if status == StatusCode::SERVICE_UNAVAILABLE {
        metrics::counter!("dfe_http_server_shed_total").increment(1);
    }

    response
}

/// Map an HTTP method to a bounded metric label value.
///
/// The nine standard methods (RFC 9110 plus PATCH from RFC 5789) keep their
/// name. Any extension method collapses to `"OTHER"`, because a client can
/// send any token as a method and must not be able to mint unbounded label
/// values. Matching is case-sensitive, as HTTP methods are.
#[cfg(feature = "metrics")]
fn method_label(method: &axum::http::Method) -> &'static str {
    const KNOWN: [&str; 9] = [
        "GET", "HEAD", "POST", "PUT", "DELETE", "CONNECT", "OPTIONS", "TRACE", "PATCH",
    ];
    KNOWN
        .iter()
        .copied()
        .find(|known| *known == method.as_str())
        .unwrap_or("OTHER")
}
364
365/// Handle for triggering server shutdown.
366#[derive(Clone)]
367pub struct ShutdownHandle {
368    sender: watch::Sender<()>,
369}
370
371impl ShutdownHandle {
372    /// Trigger graceful shutdown.
373    pub fn shutdown(self) {
374        let _ = self.sender.send(());
375    }
376}
377
378/// Future representing the running server.
379pub struct ServerFuture {
380    inner: std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>>,
381}
382
383impl std::future::Future for ServerFuture {
384    type Output = Result<()>;
385
386    fn poll(
387        mut self: std::pin::Pin<&mut Self>,
388        cx: &mut std::task::Context<'_>,
389    ) -> std::task::Poll<Self::Output> {
390        self.inner.as_mut().poll(cx)
391    }
392}
393
394/// Liveness endpoint handler.
395async fn health_live() -> impl IntoResponse {
396    (StatusCode::OK, "OK")
397}
398
399/// Readiness endpoint handler.
400///
401/// Checks the local ready flag AND (when the `health` feature is enabled)
402/// the global [`HealthRegistry`](crate::health::HealthRegistry). Both must
403/// be true for a 200 response; otherwise 503.
404async fn health_ready(ready: Arc<AtomicBool>) -> impl IntoResponse {
405    let locally_ready = ready.load(Ordering::SeqCst);
406
407    #[cfg(feature = "health")]
408    let registry_ready = crate::health::HealthRegistry::is_ready();
409    #[cfg(not(feature = "health"))]
410    let registry_ready = true;
411
412    if locally_ready && registry_ready {
413        (StatusCode::OK, "OK")
414    } else {
415        (StatusCode::SERVICE_UNAVAILABLE, "NOT READY")
416    }
417}
418
/// Detailed health handler: serialises the global health registry as JSON.
///
/// The body is exactly what
/// [`HealthRegistry::to_json()`](crate::health::HealthRegistry::to_json)
/// produces: the overall status plus each registered component's state.
#[cfg(all(feature = "health", feature = "serde_json"))]
async fn health_detailed() -> impl IntoResponse {
    axum::Json(crate::health::HealthRegistry::to_json())
}
428
/// Config registry dump handler.
///
/// Responds with pretty-printed JSON containing three keys:
/// - `effective`: the effective configuration dump;
/// - `defaults`: the defaults dump;
/// - `sections`: the list of registered sections, each as `{key, type}`.
///
/// This handler performs no redaction itself; it relies on the registry
/// dump functions for that (NOTE(review): confirm they redact secrets).
/// If serialisation fails, the body is empty.
#[cfg(feature = "config")]
async fn config_dump() -> impl IntoResponse {
    use crate::config::registry;

    let effective = registry::dump_effective();
    let defaults = registry::dump_defaults();
    let sections: Vec<serde_json::Value> = registry::sections()
        .iter()
        .map(|section| {
            serde_json::json!({
                "key": section.key,
                "type": section.type_name,
            })
        })
        .collect();

    let body = serde_json::json!({
        "effective": effective,
        "defaults": defaults,
        "sections": sections,
    });
    let rendered = serde_json::to_string_pretty(&body).unwrap_or_default();

    (
        StatusCode::OK,
        [("content-type", "application/json")],
        rendered,
    )
}
453
454/// Wait for a shutdown signal (SIGTERM or SIGINT).
455///
456/// Used as fallback when the `shutdown` feature is not enabled.
457#[cfg(not(feature = "shutdown"))]
458async fn shutdown_signal() {
459    let ctrl_c = async {
460        signal::ctrl_c()
461            .await
462            .expect("failed to install Ctrl+C handler");
463    };
464
465    #[cfg(unix)]
466    let terminate = async {
467        signal::unix::signal(signal::unix::SignalKind::terminate())
468            .expect("failed to install signal handler")
469            .recv()
470            .await;
471    };
472
473    #[cfg(not(unix))]
474    let terminate = std::future::pending::<()>();
475
476    tokio::select! {
477        () = ctrl_c => {},
478        () = terminate => {},
479    }
480
481    #[cfg(feature = "logger")]
482    tracing::info!("Shutdown signal received, starting graceful shutdown");
483}
484
#[cfg(test)]
mod tests {
    use super::*;
    use axum::body::Body;
    use axum::http::Request;
    use tower::ServiceExt;

    /// Ephemeral port: the OS picks a free port, so tests that really bind
    /// cannot collide with each other or with other services on the host.
    const EPHEMERAL_ADDR: &str = "127.0.0.1:0";

    #[tokio::test]
    async fn test_health_live() {
        let config = HttpServerConfig::default();
        let server = HttpServer::new(config);
        let app = server.build_router(Router::new());

        let response = app
            .oneshot(
                Request::builder()
                    .uri("/health/live")
                    .body(Body::empty())
                    .unwrap(),
            )
            .await
            .unwrap();

        assert_eq!(response.status(), StatusCode::OK);
    }

    #[tokio::test]
    async fn test_health_ready_when_ready() {
        #[cfg(feature = "health")]
        crate::health::HealthRegistry::reset();

        let config = HttpServerConfig::default();
        let server = HttpServer::new(config);
        server.set_ready(true);
        let app = server.build_router(Router::new());

        let response = app
            .oneshot(
                Request::builder()
                    .uri("/health/ready")
                    .body(Body::empty())
                    .unwrap(),
            )
            .await
            .unwrap();

        assert_eq!(response.status(), StatusCode::OK);
    }

    #[tokio::test]
    async fn test_health_ready_when_not_ready() {
        let config = HttpServerConfig::default();
        let server = HttpServer::new(config);
        server.set_ready(false);
        let app = server.build_router(Router::new());

        let response = app
            .oneshot(
                Request::builder()
                    .uri("/health/ready")
                    .body(Body::empty())
                    .unwrap(),
            )
            .await
            .unwrap();

        assert_eq!(response.status(), StatusCode::SERVICE_UNAVAILABLE);
    }

    #[tokio::test]
    async fn test_server_with_handle() {
        // Test the handle API works with an actual server
        let config = HttpServerConfig::new(EPHEMERAL_ADDR);
        let server = HttpServer::new(config);

        let app = Router::new().route("/", get(|| async { "Hello" }));

        // Test the handle API compiles and works
        let (handle, future) = server.serve_with_handle(app).await.unwrap();

        // Shutdown immediately
        handle.shutdown();

        // Wait for server to finish
        future.await.unwrap();
    }

    /// Kaz #38: K8s-standard `/healthz` and `/readyz` paths must
    /// be live alongside the legacy `/health/live` and
    /// `/health/ready` aliases.
    #[tokio::test]
    async fn k8s_standard_health_paths_are_mounted() {
        // The health registry is process-global. Clear components that other
        // tests registered (e.g. a server whose ready flag was set false),
        // as test_health_ready_when_ready does.
        #[cfg(feature = "health")]
        crate::health::HealthRegistry::reset();

        let config = HttpServerConfig::default();
        let server = HttpServer::new(config);
        let app = server.build_router(Router::new());

        for path in &["/healthz", "/readyz"] {
            let response = app
                .clone()
                .oneshot(Request::builder().uri(*path).body(Body::empty()).unwrap())
                .await
                .unwrap();
            assert_eq!(response.status(), StatusCode::OK, "path={path}");
        }
    }

    /// Regression: the shutdown signal flips the readiness flag, so
    /// /health/ready returns 503 before the drain window completes.
    /// The K8s endpoint controller stops routing on the 503.
    #[tokio::test]
    async fn shutdown_signal_flips_ready_before_drain() {
        let config = HttpServerConfig::new(EPHEMERAL_ADDR);
        let server = HttpServer::new(config);
        let ready = server.ready_flag();
        assert!(ready.load(Ordering::SeqCst), "ready starts true");

        let app = Router::new().route(
            "/slow",
            get(|| async {
                tokio::time::sleep(std::time::Duration::from_millis(200)).await;
                "done"
            }),
        );

        let (handle, future) = server.serve_with_handle(app).await.unwrap();
        let server_task = tokio::spawn(future);

        // Trigger shutdown: handle.shutdown() sends on the watch channel
        // (and drops the sender).
        handle.shutdown();

        // Allow the signal future to observe + flip readiness.
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        assert!(
            !ready.load(Ordering::SeqCst),
            "ready must flip to false post-shutdown",
        );

        let _ = server_task.await;
    }

    #[test]
    fn test_ready_flag() {
        let config = HttpServerConfig::default();
        let server = HttpServer::new(config);

        assert!(server.is_ready());
        server.set_ready(false);
        assert!(!server.is_ready());
        server.set_ready(true);
        assert!(server.is_ready());
    }
}