Skip to main content

hyperi_rustlib/http_server/
server.rs

// Project:   hyperi-rustlib
// File:      src/http_server/server.rs
// Purpose:   HTTP server implementation
// Language:  Rust
//
// License:   BUSL-1.1
// Copyright: (c) 2026 HYPERI PTY LIMITED

//! HTTP server implementation using axum.

use crate::http_server::{HttpServerConfig, HttpServerError, Result};
use axum::{Router, http::StatusCode, response::IntoResponse, routing::get};
use std::future::IntoFuture;
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::net::TcpListener;
#[cfg(not(feature = "shutdown"))]
use tokio::signal;
use tokio::sync::watch;
use tower::limit::{ConcurrencyLimitLayer, GlobalConcurrencyLimitLayer};
use tower_http::timeout::TimeoutLayer;
use tower_http::trace::TraceLayer;

25/// High-performance HTTP server built on axum.
26///
27/// Provides graceful shutdown, health endpoints, and Tower middleware support.
28pub struct HttpServer {
29    config: HttpServerConfig,
30    ready: Arc<AtomicBool>,
31}
32
33impl HttpServer {
34    /// Create a new HTTP server with the given configuration.
35    #[must_use]
36    pub fn new(config: HttpServerConfig) -> Self {
37        let ready = Arc::new(AtomicBool::new(true));
38
39        #[cfg(feature = "health")]
40        {
41            let r = Arc::clone(&ready);
42            crate::health::HealthRegistry::register("http_server", move || {
43                if r.load(Ordering::Relaxed) {
44                    crate::health::HealthStatus::Healthy
45                } else {
46                    crate::health::HealthStatus::Unhealthy
47                }
48            });
49        }
50
51        Self { config, ready }
52    }
53
54    /// Create a new HTTP server bound to the specified address.
55    #[must_use]
56    pub fn bind(address: impl Into<String>) -> Self {
57        Self::new(HttpServerConfig::new(address))
58    }
59
60    /// Set the readiness state for the /health/ready endpoint.
61    pub fn set_ready(&self, ready: bool) {
62        self.ready.store(ready, Ordering::SeqCst);
63    }
64
65    /// Get the current readiness state.
66    #[must_use]
67    pub fn is_ready(&self) -> bool {
68        self.ready.load(Ordering::SeqCst)
69    }
70
71    /// Get a clone of the readiness flag for use in application state.
72    #[must_use]
73    pub fn ready_flag(&self) -> Arc<AtomicBool> {
74        Arc::clone(&self.ready)
75    }
76
77    /// Serve the given router until a shutdown signal is received.
78    ///
79    /// This method will:
80    /// 1. Bind to the configured address
81    /// 2. Optionally add health check endpoints
82    /// 3. Run until SIGTERM or SIGINT is received
83    /// 4. Perform graceful shutdown
84    ///
85    /// # Errors
86    ///
87    /// Returns an error if binding fails or the server encounters an error.
88    pub async fn serve(self, app: Router) -> Result<()> {
89        #[cfg(feature = "shutdown")]
90        {
91            let token = crate::shutdown::install_signal_handler();
92            self.serve_with_shutdown(app, token.cancelled_owned()).await
93        }
94        #[cfg(not(feature = "shutdown"))]
95        {
96            self.serve_with_shutdown(app, shutdown_signal()).await
97        }
98    }
99
100    /// Serve with a custom shutdown signal.
101    ///
102    /// This is useful for testing or when you need custom shutdown logic.
103    ///
104    /// # Errors
105    ///
106    /// Returns an error if binding fails or the server encounters an error.
107    pub async fn serve_with_shutdown<F>(self, app: Router, shutdown: F) -> Result<()>
108    where
109        F: std::future::Future<Output = ()> + Send + 'static,
110    {
111        // Enforce config validity before binding (Codex review 2026-06-03): a
112        // config requesting unsupported in-process TLS must fail loudly here,
113        // not bind cleartext while is_tls_enabled() reports true.
114        self.config.validate().map_err(HttpServerError::TlsConfig)?;
115        let shutdown_timeout = self.config.shutdown_timeout();
116        let app = self.build_router(app);
117
118        let addr: SocketAddr =
119            self.config
120                .bind_address
121                .parse()
122                .map_err(|e| HttpServerError::Bind {
123                    address: self.config.bind_address.clone(),
124                    source: std::io::Error::new(std::io::ErrorKind::InvalidInput, e),
125                })?;
126
127        let listener = TcpListener::bind(addr)
128            .await
129            .map_err(|e| HttpServerError::Bind {
130                address: self.config.bind_address.clone(),
131                source: e,
132            })?;
133
134        #[cfg(feature = "logger")]
135        tracing::info!(address = %addr, "HTTP server listening");
136
137        // Two-phase: unbounded wait for shutdown, then bounded drain.
138        // If drain exceeds shutdown_timeout, drop the serve future
139        // so K8s terminationGracePeriodSeconds isn't blown.
140        //
141        // F12: flip ready -> false BEFORE notifying drain start so
142        // /health/ready returns 503 the moment shutdown is signalled.
143        // K8s endpoint controller catches the 503 and stops routing
144        // before in-flight requests finish draining.
145        let (drain_started_tx, drain_started_rx) = tokio::sync::oneshot::channel();
146        let drain_started_tx = std::sync::Mutex::new(Some(drain_started_tx));
147        let ready_for_signal = Arc::clone(&self.ready);
148        let signal = async move {
149            shutdown.await;
150            ready_for_signal.store(false, Ordering::SeqCst);
151            if let Some(tx) = drain_started_tx.lock().ok().and_then(|mut g| g.take()) {
152                let _ = tx.send(());
153            }
154        };
155        // WithGracefulShutdown is IntoFuture, not Future.
156        let serve = axum::serve(listener, app)
157            .with_graceful_shutdown(signal)
158            .into_future();
159        tokio::pin!(serve);
160
161        tokio::select! {
162            result = &mut serve => result.map_err(HttpServerError::Io)?,
163            () = async {
164                let _ = drain_started_rx.await;
165                tokio::time::sleep(shutdown_timeout).await;
166            } => {
167                #[cfg(feature = "logger")]
168                tracing::warn!(
169                    timeout_ms = u64::try_from(shutdown_timeout.as_millis()).unwrap_or(u64::MAX),
170                    "HTTP server graceful drain timed out -- forcing exit"
171                );
172            }
173        }
174
175        #[cfg(feature = "logger")]
176        tracing::info!("HTTP server shut down gracefully");
177
178        Ok(())
179    }
180
181    /// Serve and return a handle for programmatic shutdown.
182    ///
183    /// Returns a `ShutdownHandle` that can be used to trigger shutdown.
184    ///
185    /// # Errors
186    ///
187    /// Returns an error if binding fails.
188    pub async fn serve_with_handle(self, app: Router) -> Result<(ShutdownHandle, ServerFuture)> {
189        // Same validation gate as serve_with_shutdown (Codex review 2026-06-03).
190        self.config.validate().map_err(HttpServerError::TlsConfig)?;
191        let (tx, rx) = watch::channel(());
192        let handle = ShutdownHandle { sender: tx };
193
194        let shutdown_timeout = self.config.shutdown_timeout();
195        let app = self.build_router(app);
196
197        let addr: SocketAddr =
198            self.config
199                .bind_address
200                .parse()
201                .map_err(|e| HttpServerError::Bind {
202                    address: self.config.bind_address.clone(),
203                    source: std::io::Error::new(std::io::ErrorKind::InvalidInput, e),
204                })?;
205
206        let listener = TcpListener::bind(addr)
207            .await
208            .map_err(|e| HttpServerError::Bind {
209                address: self.config.bind_address.clone(),
210                source: e,
211            })?;
212
213        #[cfg(feature = "logger")]
214        tracing::info!(address = %addr, "HTTP server listening");
215
216        // Two-phase shutdown, matching `serve_with_shutdown`.
217        // F12: flip ready -> false before notifying drain start.
218        let (drain_started_tx, drain_started_rx) = tokio::sync::oneshot::channel();
219        let drain_started_tx = std::sync::Mutex::new(Some(drain_started_tx));
220        let ready_for_signal = Arc::clone(&self.ready);
221        let signal = async move {
222            let _ = rx.clone().changed().await;
223            ready_for_signal.store(false, Ordering::SeqCst);
224            if let Some(tx) = drain_started_tx.lock().ok().and_then(|mut g| g.take()) {
225                let _ = tx.send(());
226            }
227        };
228
229        let future = ServerFuture {
230            inner: Box::pin(async move {
231                let serve = axum::serve(listener, app)
232                    .with_graceful_shutdown(signal)
233                    .into_future();
234                tokio::pin!(serve);
235
236                tokio::select! {
237                    result = &mut serve => result.map_err(HttpServerError::Io)?,
238                    () = async {
239                        let _ = drain_started_rx.await;
240                        tokio::time::sleep(shutdown_timeout).await;
241                    } => {
242                        #[cfg(feature = "logger")]
243                        tracing::warn!(
244                            timeout_ms = u64::try_from(shutdown_timeout.as_millis()).unwrap_or(u64::MAX),
245                            "HTTP server graceful drain timed out -- forcing exit"
246                        );
247                    }
248                }
249
250                Ok(())
251            }),
252        };
253
254        Ok((handle, future))
255    }
256
257    /// Build the final router with optional health endpoints + middleware.
258    ///
259    /// Applies `tower_http::TraceLayer` (per-request `tracing` spans, default
260    /// DEBUG level so probe traffic doesn't flood at INFO) and
261    /// `tower_http::TimeoutLayer` (per-request deadline from
262    /// [`HttpServerConfig::request_timeout`]). The layers wrap both the
263    /// user-supplied router and the framework probe/admin routes.
264    fn build_router(&self, app: Router) -> Router {
265        let mut router = app;
266
267        if self.config.enable_health_endpoints {
268            let ready = Arc::clone(&self.ready);
269            // K8s-standard paths are /healthz and /readyz; the
270            // /health/live and /health/ready aliases stay for
271            // backward compat with existing consumer probes.
272            // Kaz #38: docs claim /readyz, code only had /health/ready.
273            let r1 = Arc::clone(&ready);
274            let r2 = Arc::clone(&ready);
275            router = router
276                .route("/health/live", get(health_live))
277                .route("/health/ready", get(move || health_ready(Arc::clone(&r1))))
278                .route("/healthz", get(health_live))
279                .route("/readyz", get(move || health_ready(Arc::clone(&r2))));
280        }
281
282        #[cfg(all(feature = "health", feature = "serde_json"))]
283        if self.config.enable_health_endpoints {
284            router = router.route("/health/detailed", get(health_detailed));
285        }
286
287        #[cfg(feature = "config")]
288        if self.config.enable_config_endpoint {
289            router = router.route("/config", get(config_dump));
290        }
291
292        router
293            .layer(TraceLayer::new_for_http())
294            .layer(TimeoutLayer::with_status_code(
295                StatusCode::REQUEST_TIMEOUT,
296                self.config.request_timeout(),
297            ))
298            // Cap in-flight requests. Queue fills under load;
299            // upstream should see backpressure via send-timeout.
300            .layer(ConcurrencyLimitLayer::new(self.config.max_connections))
301    }
302}
303
304/// Handle for triggering server shutdown.
305#[derive(Clone)]
306pub struct ShutdownHandle {
307    sender: watch::Sender<()>,
308}
309
310impl ShutdownHandle {
311    /// Trigger graceful shutdown.
312    pub fn shutdown(self) {
313        let _ = self.sender.send(());
314    }
315}
316
317/// Future representing the running server.
318pub struct ServerFuture {
319    inner: std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>>,
320}
321
322impl std::future::Future for ServerFuture {
323    type Output = Result<()>;
324
325    fn poll(
326        mut self: std::pin::Pin<&mut Self>,
327        cx: &mut std::task::Context<'_>,
328    ) -> std::task::Poll<Self::Output> {
329        self.inner.as_mut().poll(cx)
330    }
331}
332
333/// Liveness endpoint handler.
334async fn health_live() -> impl IntoResponse {
335    (StatusCode::OK, "OK")
336}
337
338/// Readiness endpoint handler.
339///
340/// Checks the local ready flag AND (when the `health` feature is enabled)
341/// the global [`HealthRegistry`](crate::health::HealthRegistry). Both must
342/// be true for a 200 response; otherwise 503.
343async fn health_ready(ready: Arc<AtomicBool>) -> impl IntoResponse {
344    let locally_ready = ready.load(Ordering::SeqCst);
345
346    #[cfg(feature = "health")]
347    let registry_ready = crate::health::HealthRegistry::is_ready();
348    #[cfg(not(feature = "health"))]
349    let registry_ready = true;
350
351    if locally_ready && registry_ready {
352        (StatusCode::OK, "OK")
353    } else {
354        (StatusCode::SERVICE_UNAVAILABLE, "NOT READY")
355    }
356}
357
/// Detailed health endpoint: serialises the global health registry as JSON.
///
/// The body is whatever
/// [`HealthRegistry::to_json()`](crate::health::HealthRegistry::to_json)
/// returns, wrapped in [`axum::Json`].
#[cfg(all(feature = "health", feature = "serde_json"))]
async fn health_detailed() -> impl IntoResponse {
    axum::Json(crate::health::HealthRegistry::to_json())
}

/// Config registry dump endpoint handler (redacted).
///
/// Responds `200 OK` with pretty-printed JSON that has three keys:
/// - `effective`: the output of `registry::dump_effective()`;
/// - `defaults`: the output of `registry::dump_defaults()`;
/// - `sections`: a list of `{ "key", "type" }` objects, one per registered
///   section.
#[cfg(feature = "config")]
async fn config_dump() -> impl IntoResponse {
    // Registry reads happen in the same order as before:
    // effective, defaults, sections.
    let effective = crate::config::registry::dump_effective();
    let defaults = crate::config::registry::dump_defaults();
    let section_list: Vec<_> = crate::config::registry::sections()
        .iter()
        .map(|section| {
            serde_json::json!({
                "key": section.key,
                "type": section.type_name,
            })
        })
        .collect();

    let payload = serde_json::json!({
        "effective": effective,
        "defaults": defaults,
        "sections": section_list,
    });
    let rendered = serde_json::to_string_pretty(&payload).unwrap_or_default();

    (
        StatusCode::OK,
        [("content-type", "application/json")],
        rendered,
    )
}

393/// Wait for a shutdown signal (SIGTERM or SIGINT).
394///
395/// Used as fallback when the `shutdown` feature is not enabled.
396#[cfg(not(feature = "shutdown"))]
397async fn shutdown_signal() {
398    let ctrl_c = async {
399        signal::ctrl_c()
400            .await
401            .expect("failed to install Ctrl+C handler");
402    };
403
404    #[cfg(unix)]
405    let terminate = async {
406        signal::unix::signal(signal::unix::SignalKind::terminate())
407            .expect("failed to install signal handler")
408            .recv()
409            .await;
410    };
411
412    #[cfg(not(unix))]
413    let terminate = std::future::pending::<()>();
414
415    tokio::select! {
416        () = ctrl_c => {},
417        () = terminate => {},
418    }
419
420    #[cfg(feature = "logger")]
421    tracing::info!("Shutdown signal received, starting graceful shutdown");
422}
423
#[cfg(test)]
mod tests {
    use super::*;
    use axum::body::Body;
    use axum::http::Request;
    use tower::ServiceExt;

    /// Bind address for tests that start a real listener. Port 0 lets the OS
    /// pick a free port, so these tests cannot collide with each other, with
    /// other test binaries, or with anything else on the host.
    const EPHEMERAL_ADDR: &str = "127.0.0.1:0";

    /// Send a `GET path` through `app` in-process and return the status code.
    async fn get_status(app: Router, path: &str) -> StatusCode {
        app.oneshot(Request::builder().uri(path).body(Body::empty()).unwrap())
            .await
            .unwrap()
            .status()
    }

    #[tokio::test]
    async fn test_health_live() {
        let server = HttpServer::new(HttpServerConfig::default());
        let app = server.build_router(Router::new());

        assert_eq!(get_status(app, "/health/live").await, StatusCode::OK);
    }

    #[tokio::test]
    async fn test_health_ready_when_ready() {
        #[cfg(feature = "health")]
        crate::health::HealthRegistry::reset();

        let server = HttpServer::new(HttpServerConfig::default());
        server.set_ready(true);
        let app = server.build_router(Router::new());

        assert_eq!(get_status(app, "/health/ready").await, StatusCode::OK);
    }

    #[tokio::test]
    async fn test_health_ready_when_not_ready() {
        let server = HttpServer::new(HttpServerConfig::default());
        server.set_ready(false);
        let app = server.build_router(Router::new());

        assert_eq!(
            get_status(app, "/health/ready").await,
            StatusCode::SERVICE_UNAVAILABLE
        );
    }

    #[tokio::test]
    async fn test_server_with_handle() {
        // Exercise the handle API against a real listener.
        let server = HttpServer::new(HttpServerConfig::new(EPHEMERAL_ADDR));
        let app = Router::new().route("/", get(|| async { "Hello" }));

        let (handle, future) = server.serve_with_handle(app).await.unwrap();

        // Shut down immediately, then wait for the server to finish.
        handle.shutdown();
        future.await.unwrap();
    }

    /// Kaz #38: K8s-standard `/healthz` and `/readyz` paths must
    /// be live alongside the legacy `/health/live` and
    /// `/health/ready` aliases.
    #[tokio::test]
    async fn k8s_standard_health_paths_are_mounted() {
        // Start from a clean global registry, as test_health_ready_when_ready
        // does. Otherwise a not-ready check registered by another test could
        // turn /readyz into a 503.
        // NOTE(review): tests share the global registry and run in parallel,
        // so this narrows rather than eliminates that race.
        #[cfg(feature = "health")]
        crate::health::HealthRegistry::reset();

        let server = HttpServer::new(HttpServerConfig::default());
        let app = server.build_router(Router::new());

        for path in ["/healthz", "/readyz"] {
            assert_eq!(
                get_status(app.clone(), path).await,
                StatusCode::OK,
                "path={path}"
            );
        }
    }

    /// Codex F12 regression: shutdown signal flips the readiness
    /// flag so /health/ready returns 503 before the drain window
    /// completes. K8s endpoint controller stops routing on the 503.
    #[tokio::test]
    async fn shutdown_signal_flips_ready_before_drain() {
        let server = HttpServer::new(HttpServerConfig::new(EPHEMERAL_ADDR));
        let ready = server.ready_flag();
        assert!(ready.load(Ordering::SeqCst), "ready starts true");

        let app = Router::new().route(
            "/slow",
            get(|| async {
                tokio::time::sleep(std::time::Duration::from_millis(200)).await;
                "done"
            }),
        );

        let (handle, future) = server.serve_with_handle(app).await.unwrap();
        let server_task = tokio::spawn(future);

        // Trigger shutdown -- handle.shutdown() notifies the watch channel.
        handle.shutdown();

        // Poll with a generous bound instead of one fixed sleep, so a slow CI
        // runner cannot cause a spurious failure.
        let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(2);
        while ready.load(Ordering::SeqCst) {
            assert!(
                tokio::time::Instant::now() < deadline,
                "ready must flip to false post-shutdown",
            );
            tokio::time::sleep(std::time::Duration::from_millis(5)).await;
        }

        server_task
            .await
            .expect("server task panicked")
            .expect("server returned an error");
    }

    #[test]
    fn test_ready_flag() {
        let server = HttpServer::new(HttpServerConfig::default());

        assert!(server.is_ready());
        server.set_ready(false);
        assert!(!server.is_ready());
        server.set_ready(true);
        assert!(server.is_ready());
    }
}