hyperi_rustlib/http_server/
server.rs1use crate::http_server::{HttpServerConfig, HttpServerError, Result};
12use axum::{Router, http::StatusCode, response::IntoResponse, routing::get};
13use std::future::IntoFuture;
14use std::net::SocketAddr;
15use std::sync::Arc;
16use std::sync::atomic::{AtomicBool, Ordering};
17use tokio::net::TcpListener;
18#[cfg(not(feature = "shutdown"))]
19use tokio::signal;
20use tokio::sync::watch;
21use tower::limit::ConcurrencyLimitLayer;
22use tower_http::timeout::TimeoutLayer;
23use tower_http::trace::TraceLayer;
24
25pub struct HttpServer {
29 config: HttpServerConfig,
30 ready: Arc<AtomicBool>,
31}
32
33impl HttpServer {
34 #[must_use]
36 pub fn new(config: HttpServerConfig) -> Self {
37 let ready = Arc::new(AtomicBool::new(true));
38
39 #[cfg(feature = "health")]
40 {
41 let r = Arc::clone(&ready);
42 crate::health::HealthRegistry::register("http_server", move || {
43 if r.load(Ordering::Relaxed) {
44 crate::health::HealthStatus::Healthy
45 } else {
46 crate::health::HealthStatus::Unhealthy
47 }
48 });
49 }
50
51 Self { config, ready }
52 }
53
54 #[must_use]
56 pub fn bind(address: impl Into<String>) -> Self {
57 Self::new(HttpServerConfig::new(address))
58 }
59
60 pub fn set_ready(&self, ready: bool) {
62 self.ready.store(ready, Ordering::SeqCst);
63 }
64
65 #[must_use]
67 pub fn is_ready(&self) -> bool {
68 self.ready.load(Ordering::SeqCst)
69 }
70
71 #[must_use]
73 pub fn ready_flag(&self) -> Arc<AtomicBool> {
74 Arc::clone(&self.ready)
75 }
76
77 pub async fn serve(self, app: Router) -> Result<()> {
89 #[cfg(feature = "shutdown")]
90 {
91 let token = crate::shutdown::install_signal_handler();
92 self.serve_with_shutdown(app, token.cancelled_owned()).await
93 }
94 #[cfg(not(feature = "shutdown"))]
95 {
96 self.serve_with_shutdown(app, shutdown_signal()).await
97 }
98 }
99
100 pub async fn serve_with_shutdown<F>(self, app: Router, shutdown: F) -> Result<()>
108 where
109 F: std::future::Future<Output = ()> + Send + 'static,
110 {
111 self.config.validate().map_err(HttpServerError::TlsConfig)?;
115 let shutdown_timeout = self.config.shutdown_timeout();
116 let app = self.build_router(app);
117
118 let addr: SocketAddr =
119 self.config
120 .bind_address
121 .parse()
122 .map_err(|e| HttpServerError::Bind {
123 address: self.config.bind_address.clone(),
124 source: std::io::Error::new(std::io::ErrorKind::InvalidInput, e),
125 })?;
126
127 let listener = TcpListener::bind(addr)
128 .await
129 .map_err(|e| HttpServerError::Bind {
130 address: self.config.bind_address.clone(),
131 source: e,
132 })?;
133
134 #[cfg(feature = "logger")]
135 tracing::info!(address = %addr, "HTTP server listening");
136
137 let (drain_started_tx, drain_started_rx) = tokio::sync::oneshot::channel();
146 let drain_started_tx = std::sync::Mutex::new(Some(drain_started_tx));
147 let ready_for_signal = Arc::clone(&self.ready);
148 let signal = async move {
149 shutdown.await;
150 ready_for_signal.store(false, Ordering::SeqCst);
151 if let Some(tx) = drain_started_tx.lock().ok().and_then(|mut g| g.take()) {
152 let _ = tx.send(());
153 }
154 };
155 let serve = axum::serve(listener, app)
157 .with_graceful_shutdown(signal)
158 .into_future();
159 tokio::pin!(serve);
160
161 tokio::select! {
162 result = &mut serve => result.map_err(HttpServerError::Io)?,
163 () = async {
164 let _ = drain_started_rx.await;
165 tokio::time::sleep(shutdown_timeout).await;
166 } => {
167 #[cfg(feature = "logger")]
168 tracing::warn!(
169 timeout_ms = u64::try_from(shutdown_timeout.as_millis()).unwrap_or(u64::MAX),
170 "HTTP server graceful drain timed out -- forcing exit"
171 );
172 }
173 }
174
175 #[cfg(feature = "logger")]
176 tracing::info!("HTTP server shut down gracefully");
177
178 Ok(())
179 }
180
181 pub async fn serve_with_handle(self, app: Router) -> Result<(ShutdownHandle, ServerFuture)> {
189 self.config.validate().map_err(HttpServerError::TlsConfig)?;
191 let (tx, rx) = watch::channel(());
192 let handle = ShutdownHandle { sender: tx };
193
194 let shutdown_timeout = self.config.shutdown_timeout();
195 let app = self.build_router(app);
196
197 let addr: SocketAddr =
198 self.config
199 .bind_address
200 .parse()
201 .map_err(|e| HttpServerError::Bind {
202 address: self.config.bind_address.clone(),
203 source: std::io::Error::new(std::io::ErrorKind::InvalidInput, e),
204 })?;
205
206 let listener = TcpListener::bind(addr)
207 .await
208 .map_err(|e| HttpServerError::Bind {
209 address: self.config.bind_address.clone(),
210 source: e,
211 })?;
212
213 #[cfg(feature = "logger")]
214 tracing::info!(address = %addr, "HTTP server listening");
215
216 let (drain_started_tx, drain_started_rx) = tokio::sync::oneshot::channel();
219 let drain_started_tx = std::sync::Mutex::new(Some(drain_started_tx));
220 let ready_for_signal = Arc::clone(&self.ready);
221 let signal = async move {
222 let _ = rx.clone().changed().await;
223 ready_for_signal.store(false, Ordering::SeqCst);
224 if let Some(tx) = drain_started_tx.lock().ok().and_then(|mut g| g.take()) {
225 let _ = tx.send(());
226 }
227 };
228
229 let future = ServerFuture {
230 inner: Box::pin(async move {
231 let serve = axum::serve(listener, app)
232 .with_graceful_shutdown(signal)
233 .into_future();
234 tokio::pin!(serve);
235
236 tokio::select! {
237 result = &mut serve => result.map_err(HttpServerError::Io)?,
238 () = async {
239 let _ = drain_started_rx.await;
240 tokio::time::sleep(shutdown_timeout).await;
241 } => {
242 #[cfg(feature = "logger")]
243 tracing::warn!(
244 timeout_ms = u64::try_from(shutdown_timeout.as_millis()).unwrap_or(u64::MAX),
245 "HTTP server graceful drain timed out -- forcing exit"
246 );
247 }
248 }
249
250 Ok(())
251 }),
252 };
253
254 Ok((handle, future))
255 }
256
257 fn build_router(&self, app: Router) -> Router {
265 let mut router = app;
266
267 if self.config.enable_health_endpoints {
268 let ready = Arc::clone(&self.ready);
269 let r1 = Arc::clone(&ready);
274 let r2 = Arc::clone(&ready);
275 router = router
276 .route("/health/live", get(health_live))
277 .route("/health/ready", get(move || health_ready(Arc::clone(&r1))))
278 .route("/healthz", get(health_live))
279 .route("/readyz", get(move || health_ready(Arc::clone(&r2))));
280 }
281
282 #[cfg(all(feature = "health", feature = "serde_json"))]
283 if self.config.enable_health_endpoints {
284 router = router.route("/health/detailed", get(health_detailed));
285 }
286
287 #[cfg(feature = "config")]
288 if self.config.enable_config_endpoint {
289 router = router.route("/config", get(config_dump));
290 }
291
292 router
293 .layer(TraceLayer::new_for_http())
294 .layer(TimeoutLayer::with_status_code(
295 StatusCode::REQUEST_TIMEOUT,
296 self.config.request_timeout(),
297 ))
298 .layer(ConcurrencyLimitLayer::new(self.config.max_connections))
301 }
302}
303
304#[derive(Clone)]
306pub struct ShutdownHandle {
307 sender: watch::Sender<()>,
308}
309
310impl ShutdownHandle {
311 pub fn shutdown(self) {
313 let _ = self.sender.send(());
314 }
315}
316
317pub struct ServerFuture {
319 inner: std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>>,
320}
321
322impl std::future::Future for ServerFuture {
323 type Output = Result<()>;
324
325 fn poll(
326 mut self: std::pin::Pin<&mut Self>,
327 cx: &mut std::task::Context<'_>,
328 ) -> std::task::Poll<Self::Output> {
329 self.inner.as_mut().poll(cx)
330 }
331}
332
333async fn health_live() -> impl IntoResponse {
335 (StatusCode::OK, "OK")
336}
337
338async fn health_ready(ready: Arc<AtomicBool>) -> impl IntoResponse {
344 let locally_ready = ready.load(Ordering::SeqCst);
345
346 #[cfg(feature = "health")]
347 let registry_ready = crate::health::HealthRegistry::is_ready();
348 #[cfg(not(feature = "health"))]
349 let registry_ready = true;
350
351 if locally_ready && registry_ready {
352 (StatusCode::OK, "OK")
353 } else {
354 (StatusCode::SERVICE_UNAVAILABLE, "NOT READY")
355 }
356}
357
/// Detailed health handler: returns the health registry's JSON report.
#[cfg(all(feature = "health", feature = "serde_json"))]
async fn health_detailed() -> impl IntoResponse {
    axum::Json(crate::health::HealthRegistry::to_json())
}
367
/// Config endpoint handler: dumps the configuration registry as pretty JSON.
///
/// The body contains the `effective` and `defaults` dumps plus a `sections`
/// list holding each section's `key` and `type`. If serialisation fails the
/// body is empty but the status is still `200 OK`.
#[cfg(feature = "config")]
async fn config_dump() -> impl IntoResponse {
    use crate::config::registry;

    let effective = registry::dump_effective();
    let defaults = registry::dump_defaults();
    let sections: Vec<_> = registry::sections()
        .iter()
        .map(|section| {
            serde_json::json!({
                "key": section.key,
                "type": section.type_name,
            })
        })
        .collect();

    let payload = serde_json::json!({
        "effective": effective,
        "defaults": defaults,
        "sections": sections,
    });
    let rendered = serde_json::to_string_pretty(&payload).unwrap_or_default();

    (
        StatusCode::OK,
        [("content-type", "application/json")],
        rendered,
    )
}
392
393#[cfg(not(feature = "shutdown"))]
397async fn shutdown_signal() {
398 let ctrl_c = async {
399 signal::ctrl_c()
400 .await
401 .expect("failed to install Ctrl+C handler");
402 };
403
404 #[cfg(unix)]
405 let terminate = async {
406 signal::unix::signal(signal::unix::SignalKind::terminate())
407 .expect("failed to install signal handler")
408 .recv()
409 .await;
410 };
411
412 #[cfg(not(unix))]
413 let terminate = std::future::pending::<()>();
414
415 tokio::select! {
416 () = ctrl_c => {},
417 () = terminate => {},
418 }
419
420 #[cfg(feature = "logger")]
421 tracing::info!("Shutdown signal received, starting graceful shutdown");
422}
423
#[cfg(test)]
mod tests {
    use super::*;
    use axum::body::Body;
    use axum::http::Request;
    use tower::ServiceExt;

    /// Loopback address with port 0, so the OS picks a free port and tests
    /// cannot collide with each other or with other processes.
    const EPHEMERAL_ADDR: &str = "127.0.0.1:0";

    /// Sends a bodiless GET for `path` through `app` and returns the status.
    async fn get_status(app: Router, path: &str) -> StatusCode {
        let request = Request::builder().uri(path).body(Body::empty()).unwrap();
        app.oneshot(request).await.unwrap().status()
    }

    #[tokio::test]
    async fn test_health_live() {
        let server = HttpServer::new(HttpServerConfig::default());
        let app = server.build_router(Router::new());

        assert_eq!(get_status(app, "/health/live").await, StatusCode::OK);
    }

    #[tokio::test]
    async fn test_health_ready_when_ready() {
        // Readiness also consults the global registry; start from a clean one.
        #[cfg(feature = "health")]
        crate::health::HealthRegistry::reset();

        let server = HttpServer::new(HttpServerConfig::default());
        server.set_ready(true);
        let app = server.build_router(Router::new());

        assert_eq!(get_status(app, "/health/ready").await, StatusCode::OK);
    }

    #[tokio::test]
    async fn test_health_ready_when_not_ready() {
        let server = HttpServer::new(HttpServerConfig::default());
        server.set_ready(false);
        let app = server.build_router(Router::new());

        assert_eq!(
            get_status(app, "/health/ready").await,
            StatusCode::SERVICE_UNAVAILABLE
        );
    }

    #[tokio::test]
    async fn test_server_with_handle() {
        let server = HttpServer::new(HttpServerConfig::new(EPHEMERAL_ADDR));
        let app = Router::new().route("/", get(|| async { "Hello" }));

        let (handle, future) = server.serve_with_handle(app).await.unwrap();

        // Request shutdown before the future is first polled; it must still
        // observe the notification and finish cleanly.
        handle.shutdown();

        future.await.unwrap();
    }

    #[tokio::test]
    async fn k8s_standard_health_paths_are_mounted() {
        // `/readyz` depends on the global registry, like `/health/ready`.
        #[cfg(feature = "health")]
        crate::health::HealthRegistry::reset();

        let server = HttpServer::new(HttpServerConfig::default());
        let app = server.build_router(Router::new());

        for path in ["/healthz", "/readyz"] {
            assert_eq!(
                get_status(app.clone(), path).await,
                StatusCode::OK,
                "path={path}"
            );
        }
    }

    #[tokio::test]
    async fn shutdown_signal_flips_ready_before_drain() {
        let server = HttpServer::new(HttpServerConfig::new(EPHEMERAL_ADDR));
        let ready = server.ready_flag();
        assert!(ready.load(Ordering::SeqCst), "ready starts true");

        let app = Router::new().route(
            "/slow",
            get(|| async {
                tokio::time::sleep(std::time::Duration::from_millis(200)).await;
                "done"
            }),
        );

        let (handle, future) = server.serve_with_handle(app).await.unwrap();
        let server_task = tokio::spawn(future);

        handle.shutdown();

        // Give the spawned server task time to observe the shutdown request.
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        assert!(
            !ready.load(Ordering::SeqCst),
            "ready must flip to false post-shutdown",
        );

        let _ = server_task.await;
    }

    #[test]
    fn test_ready_flag() {
        let server = HttpServer::new(HttpServerConfig::default());

        assert!(server.is_ready());
        server.set_ready(false);
        assert!(!server.is_ready());
        server.set_ready(true);
        assert!(server.is_ready());
    }
}