hyperi_rustlib/http_server/
server.rs1use crate::http_server::{HttpServerConfig, HttpServerError, Result};
12use axum::{Router, http::StatusCode, response::IntoResponse, routing::get};
13use std::future::IntoFuture;
14use std::net::SocketAddr;
15use std::sync::Arc;
16use std::sync::atomic::{AtomicBool, Ordering};
17use tokio::net::TcpListener;
18#[cfg(not(feature = "shutdown"))]
19use tokio::signal;
20use tokio::sync::watch;
21use tower::limit::ConcurrencyLimitLayer;
22use tower_http::timeout::TimeoutLayer;
23use tower_http::trace::TraceLayer;
24
25pub struct HttpServer {
29 config: HttpServerConfig,
30 ready: Arc<AtomicBool>,
31}
32
33impl HttpServer {
34 #[must_use]
36 pub fn new(config: HttpServerConfig) -> Self {
37 let ready = Arc::new(AtomicBool::new(true));
38
39 #[cfg(feature = "health")]
40 {
41 let r = Arc::clone(&ready);
42 crate::health::HealthRegistry::register("http_server", move || {
43 if r.load(Ordering::Relaxed) {
44 crate::health::HealthStatus::Healthy
45 } else {
46 crate::health::HealthStatus::Unhealthy
47 }
48 });
49 }
50
51 Self { config, ready }
52 }
53
54 #[must_use]
56 pub fn bind(address: impl Into<String>) -> Self {
57 Self::new(HttpServerConfig::new(address))
58 }
59
60 pub fn set_ready(&self, ready: bool) {
62 self.ready.store(ready, Ordering::SeqCst);
63 }
64
65 #[must_use]
67 pub fn is_ready(&self) -> bool {
68 self.ready.load(Ordering::SeqCst)
69 }
70
71 #[must_use]
73 pub fn ready_flag(&self) -> Arc<AtomicBool> {
74 Arc::clone(&self.ready)
75 }
76
77 pub async fn serve(self, app: Router) -> Result<()> {
89 #[cfg(feature = "shutdown")]
90 {
91 let token = crate::shutdown::install_signal_handler();
92 self.serve_with_shutdown(app, token.cancelled_owned()).await
93 }
94 #[cfg(not(feature = "shutdown"))]
95 {
96 self.serve_with_shutdown(app, shutdown_signal()).await
97 }
98 }
99
100 pub async fn serve_with_shutdown<F>(self, app: Router, shutdown: F) -> Result<()>
108 where
109 F: std::future::Future<Output = ()> + Send + 'static,
110 {
111 let shutdown_timeout = self.config.shutdown_timeout();
112 let app = self.build_router(app);
113
114 let addr: SocketAddr =
115 self.config
116 .bind_address
117 .parse()
118 .map_err(|e| HttpServerError::Bind {
119 address: self.config.bind_address.clone(),
120 source: std::io::Error::new(std::io::ErrorKind::InvalidInput, e),
121 })?;
122
123 let listener = TcpListener::bind(addr)
124 .await
125 .map_err(|e| HttpServerError::Bind {
126 address: self.config.bind_address.clone(),
127 source: e,
128 })?;
129
130 #[cfg(feature = "logger")]
131 tracing::info!(address = %addr, "HTTP server listening");
132
133 let (drain_started_tx, drain_started_rx) = tokio::sync::oneshot::channel();
142 let drain_started_tx = std::sync::Mutex::new(Some(drain_started_tx));
143 let ready_for_signal = Arc::clone(&self.ready);
144 let signal = async move {
145 shutdown.await;
146 ready_for_signal.store(false, Ordering::SeqCst);
147 if let Some(tx) = drain_started_tx.lock().ok().and_then(|mut g| g.take()) {
148 let _ = tx.send(());
149 }
150 };
151 let serve = axum::serve(listener, app)
153 .with_graceful_shutdown(signal)
154 .into_future();
155 tokio::pin!(serve);
156
157 tokio::select! {
158 result = &mut serve => result.map_err(HttpServerError::Io)?,
159 () = async {
160 let _ = drain_started_rx.await;
161 tokio::time::sleep(shutdown_timeout).await;
162 } => {
163 #[cfg(feature = "logger")]
164 tracing::warn!(
165 timeout_ms = u64::try_from(shutdown_timeout.as_millis()).unwrap_or(u64::MAX),
166 "HTTP server graceful drain timed out -- forcing exit"
167 );
168 }
169 }
170
171 #[cfg(feature = "logger")]
172 tracing::info!("HTTP server shut down gracefully");
173
174 Ok(())
175 }
176
177 pub async fn serve_with_handle(self, app: Router) -> Result<(ShutdownHandle, ServerFuture)> {
185 let (tx, rx) = watch::channel(());
186 let handle = ShutdownHandle { sender: tx };
187
188 let shutdown_timeout = self.config.shutdown_timeout();
189 let app = self.build_router(app);
190
191 let addr: SocketAddr =
192 self.config
193 .bind_address
194 .parse()
195 .map_err(|e| HttpServerError::Bind {
196 address: self.config.bind_address.clone(),
197 source: std::io::Error::new(std::io::ErrorKind::InvalidInput, e),
198 })?;
199
200 let listener = TcpListener::bind(addr)
201 .await
202 .map_err(|e| HttpServerError::Bind {
203 address: self.config.bind_address.clone(),
204 source: e,
205 })?;
206
207 #[cfg(feature = "logger")]
208 tracing::info!(address = %addr, "HTTP server listening");
209
210 let (drain_started_tx, drain_started_rx) = tokio::sync::oneshot::channel();
213 let drain_started_tx = std::sync::Mutex::new(Some(drain_started_tx));
214 let ready_for_signal = Arc::clone(&self.ready);
215 let signal = async move {
216 let _ = rx.clone().changed().await;
217 ready_for_signal.store(false, Ordering::SeqCst);
218 if let Some(tx) = drain_started_tx.lock().ok().and_then(|mut g| g.take()) {
219 let _ = tx.send(());
220 }
221 };
222
223 let future = ServerFuture {
224 inner: Box::pin(async move {
225 let serve = axum::serve(listener, app)
226 .with_graceful_shutdown(signal)
227 .into_future();
228 tokio::pin!(serve);
229
230 tokio::select! {
231 result = &mut serve => result.map_err(HttpServerError::Io)?,
232 () = async {
233 let _ = drain_started_rx.await;
234 tokio::time::sleep(shutdown_timeout).await;
235 } => {
236 #[cfg(feature = "logger")]
237 tracing::warn!(
238 timeout_ms = u64::try_from(shutdown_timeout.as_millis()).unwrap_or(u64::MAX),
239 "HTTP server graceful drain timed out -- forcing exit"
240 );
241 }
242 }
243
244 Ok(())
245 }),
246 };
247
248 Ok((handle, future))
249 }
250
251 fn build_router(&self, app: Router) -> Router {
259 let mut router = app;
260
261 if self.config.enable_health_endpoints {
262 let ready = Arc::clone(&self.ready);
263 let r1 = Arc::clone(&ready);
268 let r2 = Arc::clone(&ready);
269 router = router
270 .route("/health/live", get(health_live))
271 .route("/health/ready", get(move || health_ready(Arc::clone(&r1))))
272 .route("/healthz", get(health_live))
273 .route("/readyz", get(move || health_ready(Arc::clone(&r2))));
274 }
275
276 #[cfg(all(feature = "health", feature = "serde_json"))]
277 if self.config.enable_health_endpoints {
278 router = router.route("/health/detailed", get(health_detailed));
279 }
280
281 #[cfg(feature = "config")]
282 if self.config.enable_config_endpoint {
283 router = router.route("/config", get(config_dump));
284 }
285
286 router
287 .layer(TraceLayer::new_for_http())
288 .layer(TimeoutLayer::with_status_code(
289 StatusCode::REQUEST_TIMEOUT,
290 self.config.request_timeout(),
291 ))
292 .layer(ConcurrencyLimitLayer::new(self.config.max_connections))
295 }
296}
297
298#[derive(Clone)]
300pub struct ShutdownHandle {
301 sender: watch::Sender<()>,
302}
303
304impl ShutdownHandle {
305 pub fn shutdown(self) {
307 let _ = self.sender.send(());
308 }
309}
310
311pub struct ServerFuture {
313 inner: std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>>,
314}
315
316impl std::future::Future for ServerFuture {
317 type Output = Result<()>;
318
319 fn poll(
320 mut self: std::pin::Pin<&mut Self>,
321 cx: &mut std::task::Context<'_>,
322 ) -> std::task::Poll<Self::Output> {
323 self.inner.as_mut().poll(cx)
324 }
325}
326
327async fn health_live() -> impl IntoResponse {
329 (StatusCode::OK, "OK")
330}
331
332async fn health_ready(ready: Arc<AtomicBool>) -> impl IntoResponse {
338 let locally_ready = ready.load(Ordering::SeqCst);
339
340 #[cfg(feature = "health")]
341 let registry_ready = crate::health::HealthRegistry::is_ready();
342 #[cfg(not(feature = "health"))]
343 let registry_ready = true;
344
345 if locally_ready && registry_ready {
346 (StatusCode::OK, "OK")
347 } else {
348 (StatusCode::SERVICE_UNAVAILABLE, "NOT READY")
349 }
350}
351
352#[cfg(all(feature = "health", feature = "serde_json"))]
357async fn health_detailed() -> impl IntoResponse {
358 let json = crate::health::HealthRegistry::to_json();
359 axum::Json(json)
360}
361
362#[cfg(feature = "config")]
364async fn config_dump() -> impl IntoResponse {
365 let effective = crate::config::registry::dump_effective();
366 let defaults = crate::config::registry::dump_defaults();
367
368 let body = serde_json::json!({
369 "effective": effective,
370 "defaults": defaults,
371 "sections": crate::config::registry::sections()
372 .iter()
373 .map(|s| serde_json::json!({
374 "key": s.key,
375 "type": s.type_name,
376 }))
377 .collect::<Vec<_>>(),
378 });
379
380 (
381 StatusCode::OK,
382 [("content-type", "application/json")],
383 serde_json::to_string_pretty(&body).unwrap_or_default(),
384 )
385}
386
387#[cfg(not(feature = "shutdown"))]
391async fn shutdown_signal() {
392 let ctrl_c = async {
393 signal::ctrl_c()
394 .await
395 .expect("failed to install Ctrl+C handler");
396 };
397
398 #[cfg(unix)]
399 let terminate = async {
400 signal::unix::signal(signal::unix::SignalKind::terminate())
401 .expect("failed to install signal handler")
402 .recv()
403 .await;
404 };
405
406 #[cfg(not(unix))]
407 let terminate = std::future::pending::<()>();
408
409 tokio::select! {
410 () = ctrl_c => {},
411 () = terminate => {},
412 }
413
414 #[cfg(feature = "logger")]
415 tracing::info!("Shutdown signal received, starting graceful shutdown");
416}
417
418#[cfg(test)]
419mod tests {
420 use super::*;
421 use axum::body::Body;
422 use axum::http::Request;
423 use tower::ServiceExt;
424
425 #[tokio::test]
426 async fn test_health_live() {
427 let config = HttpServerConfig::default();
428 let server = HttpServer::new(config);
429 let app = server.build_router(Router::new());
430
431 let response = app
432 .oneshot(
433 Request::builder()
434 .uri("/health/live")
435 .body(Body::empty())
436 .unwrap(),
437 )
438 .await
439 .unwrap();
440
441 assert_eq!(response.status(), StatusCode::OK);
442 }
443
444 #[tokio::test]
445 async fn test_health_ready_when_ready() {
446 #[cfg(feature = "health")]
447 crate::health::HealthRegistry::reset();
448
449 let config = HttpServerConfig::default();
450 let server = HttpServer::new(config);
451 server.set_ready(true);
452 let app = server.build_router(Router::new());
453
454 let response = app
455 .oneshot(
456 Request::builder()
457 .uri("/health/ready")
458 .body(Body::empty())
459 .unwrap(),
460 )
461 .await
462 .unwrap();
463
464 assert_eq!(response.status(), StatusCode::OK);
465 }
466
467 #[tokio::test]
468 async fn test_health_ready_when_not_ready() {
469 let config = HttpServerConfig::default();
470 let server = HttpServer::new(config);
471 server.set_ready(false);
472 let app = server.build_router(Router::new());
473
474 let response = app
475 .oneshot(
476 Request::builder()
477 .uri("/health/ready")
478 .body(Body::empty())
479 .unwrap(),
480 )
481 .await
482 .unwrap();
483
484 assert_eq!(response.status(), StatusCode::SERVICE_UNAVAILABLE);
485 }
486
487 #[tokio::test]
488 async fn test_server_with_handle() {
489 let config = HttpServerConfig::new("127.0.0.1:18080");
491 let server = HttpServer::new(config);
492
493 let app = Router::new().route("/", get(|| async { "Hello" }));
494
495 let (handle, future) = server.serve_with_handle(app).await.unwrap();
497
498 handle.shutdown();
500
501 future.await.unwrap();
503 }
504
505 #[tokio::test]
509 async fn k8s_standard_health_paths_are_mounted() {
510 let config = HttpServerConfig::default();
511 let server = HttpServer::new(config);
512 let app = server.build_router(Router::new());
513
514 for path in &["/healthz", "/readyz"] {
515 let response = app
516 .clone()
517 .oneshot(Request::builder().uri(*path).body(Body::empty()).unwrap())
518 .await
519 .unwrap();
520 assert_eq!(response.status(), StatusCode::OK, "path={path}");
521 }
522 }
523
524 #[tokio::test]
528 async fn shutdown_signal_flips_ready_before_drain() {
529 let config = HttpServerConfig::new("127.0.0.1:18081");
530 let server = HttpServer::new(config);
531 let ready = server.ready_flag();
532 assert!(ready.load(Ordering::SeqCst), "ready starts true");
533
534 let app = Router::new().route(
535 "/slow",
536 get(|| async {
537 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
538 "done"
539 }),
540 );
541
542 let (handle, future) = server.serve_with_handle(app).await.unwrap();
543 let server_task = tokio::spawn(future);
544
545 handle.shutdown();
547
548 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
550 assert!(
551 !ready.load(Ordering::SeqCst),
552 "ready must flip to false post-shutdown",
553 );
554
555 let _ = server_task.await;
556 }
557
558 #[test]
559 fn test_ready_flag() {
560 let config = HttpServerConfig::default();
561 let server = HttpServer::new(config);
562
563 assert!(server.is_ready());
564 server.set_ready(false);
565 assert!(!server.is_ready());
566 server.set_ready(true);
567 assert!(server.is_ready());
568 }
569}