hyperi_rustlib/http_server/
server.rs1use crate::http_server::{HttpServerConfig, HttpServerError, Result};
12use axum::{Router, http::StatusCode, response::IntoResponse, routing::get};
13use std::future::IntoFuture;
14use std::net::SocketAddr;
15use std::sync::Arc;
16use std::sync::atomic::{AtomicBool, Ordering};
17use tokio::net::TcpListener;
18#[cfg(not(feature = "shutdown"))]
19use tokio::signal;
20use tokio::sync::watch;
21use tower::limit::ConcurrencyLimitLayer;
22use tower_http::timeout::TimeoutLayer;
23use tower_http::trace::TraceLayer;
24
25pub struct HttpServer {
29 config: HttpServerConfig,
30 ready: Arc<AtomicBool>,
31}
32
33impl HttpServer {
34 #[must_use]
36 pub fn new(config: HttpServerConfig) -> Self {
37 let ready = Arc::new(AtomicBool::new(true));
38
39 #[cfg(feature = "health")]
40 {
41 let r = Arc::clone(&ready);
42 crate::health::HealthRegistry::register("http_server", move || {
43 if r.load(Ordering::Relaxed) {
44 crate::health::HealthStatus::Healthy
45 } else {
46 crate::health::HealthStatus::Unhealthy
47 }
48 });
49 }
50
51 Self { config, ready }
52 }
53
54 #[must_use]
56 pub fn bind(address: impl Into<String>) -> Self {
57 Self::new(HttpServerConfig::new(address))
58 }
59
60 pub fn set_ready(&self, ready: bool) {
62 self.ready.store(ready, Ordering::SeqCst);
63 }
64
65 #[must_use]
67 pub fn is_ready(&self) -> bool {
68 self.ready.load(Ordering::SeqCst)
69 }
70
71 #[must_use]
73 pub fn ready_flag(&self) -> Arc<AtomicBool> {
74 Arc::clone(&self.ready)
75 }
76
77 pub async fn serve(self, app: Router) -> Result<()> {
83 #[cfg(feature = "shutdown")]
84 {
85 let token = crate::shutdown::install_signal_handler();
86 self.serve_with_shutdown(app, token.cancelled_owned()).await
87 }
88 #[cfg(not(feature = "shutdown"))]
89 {
90 self.serve_with_shutdown(app, shutdown_signal()).await
91 }
92 }
93
94 pub async fn serve_with_shutdown<F>(self, app: Router, shutdown: F) -> Result<()>
102 where
103 F: std::future::Future<Output = ()> + Send + 'static,
104 {
105 self.config.validate().map_err(HttpServerError::TlsConfig)?;
109 let shutdown_timeout = self.config.shutdown_timeout();
110 let app = self.build_router(app);
111
112 let addr: SocketAddr =
113 self.config
114 .bind_address
115 .parse()
116 .map_err(|e| HttpServerError::Bind {
117 address: self.config.bind_address.clone(),
118 source: std::io::Error::new(std::io::ErrorKind::InvalidInput, e),
119 })?;
120
121 let listener = TcpListener::bind(addr)
122 .await
123 .map_err(|e| HttpServerError::Bind {
124 address: self.config.bind_address.clone(),
125 source: e,
126 })?;
127
128 #[cfg(feature = "logger")]
129 tracing::info!(address = %addr, "HTTP server listening");
130
131 let (drain_started_tx, drain_started_rx) = tokio::sync::oneshot::channel();
140 let drain_started_tx = std::sync::Mutex::new(Some(drain_started_tx));
141 let ready_for_signal = Arc::clone(&self.ready);
142 let signal = async move {
143 shutdown.await;
144 ready_for_signal.store(false, Ordering::SeqCst);
145 if let Some(tx) = drain_started_tx.lock().ok().and_then(|mut g| g.take()) {
146 let _ = tx.send(());
147 }
148 };
149 let serve = axum::serve(listener, app)
151 .with_graceful_shutdown(signal)
152 .into_future();
153 tokio::pin!(serve);
154
155 tokio::select! {
156 result = &mut serve => result.map_err(HttpServerError::Io)?,
157 () = async {
158 let _ = drain_started_rx.await;
159 tokio::time::sleep(shutdown_timeout).await;
160 } => {
161 #[cfg(feature = "logger")]
162 tracing::warn!(
163 timeout_ms = u64::try_from(shutdown_timeout.as_millis()).unwrap_or(u64::MAX),
164 "HTTP server graceful drain timed out -- forcing exit"
165 );
166 }
167 }
168
169 #[cfg(feature = "logger")]
170 tracing::info!("HTTP server shut down gracefully");
171
172 Ok(())
173 }
174
175 pub async fn serve_with_handle(self, app: Router) -> Result<(ShutdownHandle, ServerFuture)> {
183 self.config.validate().map_err(HttpServerError::TlsConfig)?;
185 let (tx, rx) = watch::channel(());
186 let handle = ShutdownHandle { sender: tx };
187
188 let shutdown_timeout = self.config.shutdown_timeout();
189 let app = self.build_router(app);
190
191 let addr: SocketAddr =
192 self.config
193 .bind_address
194 .parse()
195 .map_err(|e| HttpServerError::Bind {
196 address: self.config.bind_address.clone(),
197 source: std::io::Error::new(std::io::ErrorKind::InvalidInput, e),
198 })?;
199
200 let listener = TcpListener::bind(addr)
201 .await
202 .map_err(|e| HttpServerError::Bind {
203 address: self.config.bind_address.clone(),
204 source: e,
205 })?;
206
207 #[cfg(feature = "logger")]
208 tracing::info!(address = %addr, "HTTP server listening");
209
210 let (drain_started_tx, drain_started_rx) = tokio::sync::oneshot::channel();
213 let drain_started_tx = std::sync::Mutex::new(Some(drain_started_tx));
214 let ready_for_signal = Arc::clone(&self.ready);
215 let signal = async move {
216 let _ = rx.clone().changed().await;
217 ready_for_signal.store(false, Ordering::SeqCst);
218 if let Some(tx) = drain_started_tx.lock().ok().and_then(|mut g| g.take()) {
219 let _ = tx.send(());
220 }
221 };
222
223 let future = ServerFuture {
224 inner: Box::pin(async move {
225 let serve = axum::serve(listener, app)
226 .with_graceful_shutdown(signal)
227 .into_future();
228 tokio::pin!(serve);
229
230 tokio::select! {
231 result = &mut serve => result.map_err(HttpServerError::Io)?,
232 () = async {
233 let _ = drain_started_rx.await;
234 tokio::time::sleep(shutdown_timeout).await;
235 } => {
236 #[cfg(feature = "logger")]
237 tracing::warn!(
238 timeout_ms = u64::try_from(shutdown_timeout.as_millis()).unwrap_or(u64::MAX),
239 "HTTP server graceful drain timed out -- forcing exit"
240 );
241 }
242 }
243
244 Ok(())
245 }),
246 };
247
248 Ok((handle, future))
249 }
250
251 fn build_router(&self, app: Router) -> Router {
259 let mut router = app;
260
261 if self.config.enable_health_endpoints {
262 let ready = Arc::clone(&self.ready);
263 let r1 = Arc::clone(&ready);
268 let r2 = Arc::clone(&ready);
269 router = router
270 .route("/health/live", get(health_live))
271 .route("/health/ready", get(move || health_ready(Arc::clone(&r1))))
272 .route("/healthz", get(health_live))
273 .route("/readyz", get(move || health_ready(Arc::clone(&r2))));
274 }
275
276 #[cfg(all(feature = "health", feature = "serde_json"))]
277 if self.config.enable_health_endpoints {
278 router = router.route("/health/detailed", get(health_detailed));
279 }
280
281 #[cfg(feature = "config")]
282 if self.config.enable_config_endpoint {
283 router = router.route("/config", get(config_dump));
284 }
285
286 #[cfg(feature = "metrics")]
294 let router = router.layer(axum::middleware::from_fn(http_server_metrics));
295
296 router
297 .layer(TraceLayer::new_for_http())
298 .layer(TimeoutLayer::with_status_code(
299 StatusCode::REQUEST_TIMEOUT,
300 self.config.request_timeout(),
301 ))
302 .layer(ConcurrencyLimitLayer::new(self.config.max_connections))
305 }
306}
307
/// Middleware that records per-request metrics around the inner handler.
///
/// Tracks an in-flight gauge, a request counter labelled by method and status,
/// a request-duration histogram, and a counter of `503` responses.
#[cfg(feature = "metrics")]
async fn http_server_metrics(
    req: axum::extract::Request,
    next: axum::middleware::Next,
) -> axum::response::Response {
    /// Lowers the in-flight gauge when dropped, including when this future is
    /// dropped before the inner handler finishes.
    struct InflightGuard;

    impl Drop for InflightGuard {
        fn drop(&mut self) {
            metrics::gauge!("dfe_http_server_inflight_requests").decrement(1.0);
        }
    }

    let method_label = String::from(req.method().as_str());
    let started_at = std::time::Instant::now();

    metrics::gauge!("dfe_http_server_inflight_requests").increment(1.0);
    let _inflight = InflightGuard;

    let response = next.run(req).await;

    let status = response.status();
    let status_label = status.as_u16().to_string();
    metrics::counter!(
        "dfe_http_server_requests_total",
        "method" => method_label,
        "status" => status_label
    )
    .increment(1);

    let elapsed_secs = started_at.elapsed().as_secs_f64();
    metrics::histogram!("dfe_http_server_request_duration_seconds").record(elapsed_secs);

    let was_unavailable = status == StatusCode::SERVICE_UNAVAILABLE;
    if was_unavailable {
        metrics::counter!("dfe_http_server_shed_total").increment(1);
    }

    response
}
364
365#[derive(Clone)]
367pub struct ShutdownHandle {
368 sender: watch::Sender<()>,
369}
370
371impl ShutdownHandle {
372 pub fn shutdown(self) {
374 let _ = self.sender.send(());
375 }
376}
377
378pub struct ServerFuture {
380 inner: std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>>,
381}
382
383impl std::future::Future for ServerFuture {
384 type Output = Result<()>;
385
386 fn poll(
387 mut self: std::pin::Pin<&mut Self>,
388 cx: &mut std::task::Context<'_>,
389 ) -> std::task::Poll<Self::Output> {
390 self.inner.as_mut().poll(cx)
391 }
392}
393
394async fn health_live() -> impl IntoResponse {
396 (StatusCode::OK, "OK")
397}
398
399async fn health_ready(ready: Arc<AtomicBool>) -> impl IntoResponse {
405 let locally_ready = ready.load(Ordering::SeqCst);
406
407 #[cfg(feature = "health")]
408 let registry_ready = crate::health::HealthRegistry::is_ready();
409 #[cfg(not(feature = "health"))]
410 let registry_ready = true;
411
412 if locally_ready && registry_ready {
413 (StatusCode::OK, "OK")
414 } else {
415 (StatusCode::SERVICE_UNAVAILABLE, "NOT READY")
416 }
417}
418
/// Detailed health handler: returns the health registry's JSON view as the
/// response body.
#[cfg(all(feature = "health", feature = "serde_json"))]
async fn health_detailed() -> impl IntoResponse {
    axum::Json(crate::health::HealthRegistry::to_json())
}
428
/// Configuration dump handler.
///
/// Responds with a pretty-printed JSON object holding three keys:
/// `effective` and `defaults` (as returned by the config registry) and
/// `sections`, a list of `{ "key", "type" }` entries, one per registered
/// section.
///
/// If the document cannot be rendered the handler answers
/// `500 Internal Server Error` with a plain-text message, rather than a
/// `200 OK` with an empty body labelled as JSON.
#[cfg(feature = "config")]
async fn config_dump() -> impl IntoResponse {
    let effective = crate::config::registry::dump_effective();
    let defaults = crate::config::registry::dump_defaults();

    let body = serde_json::json!({
        "effective": effective,
        "defaults": defaults,
        "sections": crate::config::registry::sections()
            .iter()
            .map(|s| serde_json::json!({
                "key": s.key,
                "type": s.type_name,
            }))
            .collect::<Vec<_>>(),
    });

    match serde_json::to_string_pretty(&body) {
        Ok(rendered) => (
            StatusCode::OK,
            [("content-type", "application/json")],
            rendered,
        ),
        Err(err) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            [("content-type", "text/plain; charset=utf-8")],
            format!("failed to serialize configuration: {err}"),
        ),
    }
}
453
454#[cfg(not(feature = "shutdown"))]
458async fn shutdown_signal() {
459 let ctrl_c = async {
460 signal::ctrl_c()
461 .await
462 .expect("failed to install Ctrl+C handler");
463 };
464
465 #[cfg(unix)]
466 let terminate = async {
467 signal::unix::signal(signal::unix::SignalKind::terminate())
468 .expect("failed to install signal handler")
469 .recv()
470 .await;
471 };
472
473 #[cfg(not(unix))]
474 let terminate = std::future::pending::<()>();
475
476 tokio::select! {
477 () = ctrl_c => {},
478 () = terminate => {},
479 }
480
481 #[cfg(feature = "logger")]
482 tracing::info!("Shutdown signal received, starting graceful shutdown");
483}
484
#[cfg(test)]
mod tests {
    use super::*;
    use axum::body::Body;
    use axum::http::Request;
    use tower::ServiceExt;

    /// Sends a bodiless GET for `path` through `app` and returns the status.
    async fn get_status(app: Router, path: &str) -> StatusCode {
        let request = Request::builder().uri(path).body(Body::empty()).unwrap();
        app.oneshot(request).await.unwrap().status()
    }

    /// The liveness route answers 200 on a default-configured server.
    #[tokio::test]
    async fn test_health_live() {
        let server = HttpServer::new(HttpServerConfig::default());
        let app = server.build_router(Router::new());

        assert_eq!(get_status(app, "/health/live").await, StatusCode::OK);
    }

    /// The readiness route answers 200 while the flag is set.
    #[tokio::test]
    async fn test_health_ready_when_ready() {
        #[cfg(feature = "health")]
        crate::health::HealthRegistry::reset();

        let server = HttpServer::new(HttpServerConfig::default());
        server.set_ready(true);
        let app = server.build_router(Router::new());

        assert_eq!(get_status(app, "/health/ready").await, StatusCode::OK);
    }

    /// The readiness route answers 503 once the flag is cleared.
    #[tokio::test]
    async fn test_health_ready_when_not_ready() {
        let server = HttpServer::new(HttpServerConfig::default());
        server.set_ready(false);
        let app = server.build_router(Router::new());

        assert_eq!(
            get_status(app, "/health/ready").await,
            StatusCode::SERVICE_UNAVAILABLE
        );
    }

    /// A server started with a handle stops cleanly when the handle fires.
    #[tokio::test]
    async fn test_server_with_handle() {
        // Port 0 lets the OS choose a free port, so the test cannot collide
        // with another process or a parallel test run.
        let server = HttpServer::new(HttpServerConfig::new("127.0.0.1:0"));

        let app = Router::new().route("/", get(|| async { "Hello" }));

        let (handle, future) = server.serve_with_handle(app).await.unwrap();

        handle.shutdown();

        future.await.unwrap();
    }

    /// The short Kubernetes-style probe paths are mounted and answer 200.
    #[tokio::test]
    async fn k8s_standard_health_paths_are_mounted() {
        // NOTE(review): mirrors `test_health_ready_when_ready`, presumably so
        // that `/readyz` is not affected by checks left behind by other
        // tests -- confirm against `HealthRegistry`.
        #[cfg(feature = "health")]
        crate::health::HealthRegistry::reset();

        let server = HttpServer::new(HttpServerConfig::default());
        let app = server.build_router(Router::new());

        for path in ["/healthz", "/readyz"] {
            let status = get_status(app.clone(), path).await;
            assert_eq!(status, StatusCode::OK, "path={path}");
        }
    }

    /// Triggering shutdown clears the readiness flag.
    #[tokio::test]
    async fn shutdown_signal_flips_ready_before_drain() {
        // Port 0: see `test_server_with_handle`.
        let server = HttpServer::new(HttpServerConfig::new("127.0.0.1:0"));
        let ready = server.ready_flag();
        assert!(ready.load(Ordering::SeqCst), "ready starts true");

        let app = Router::new().route(
            "/slow",
            get(|| async {
                tokio::time::sleep(std::time::Duration::from_millis(200)).await;
                "done"
            }),
        );

        let (handle, future) = server.serve_with_handle(app).await.unwrap();
        let server_task = tokio::spawn(future);

        handle.shutdown();

        // Give the spawned server task time to observe the signal.
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        assert!(
            !ready.load(Ordering::SeqCst),
            "ready must flip to false post-shutdown",
        );

        let _ = server_task.await;
    }

    /// `set_ready` and `is_ready` round-trip the flag.
    #[test]
    fn test_ready_flag() {
        let server = HttpServer::new(HttpServerConfig::default());

        assert!(server.is_ready());
        server.set_ready(false);
        assert!(!server.is_ready());
        server.set_ready(true);
        assert!(server.is_ready());
    }
}