1use crate::config::HealthStatus;
5use crate::logging::make_request_span;
6use crate::metrics::MetricsHierarchy;
7use crate::metrics::prometheus_names::{nats_client, nats_service};
8use crate::traits::DistributedRuntimeProvider;
9use axum::{Router, http::StatusCode, response::IntoResponse, routing::get};
10use serde_json::json;
11use std::collections::HashMap;
12use std::sync::{Arc, OnceLock};
13use std::time::Instant;
14use tokio::{net::TcpListener, task::JoinHandle};
15use tokio_util::sync::CancellationToken;
16use tower_http::trace::TraceLayer;
17
18#[derive(Debug)]
20pub struct SystemStatusServerInfo {
21 pub socket_addr: std::net::SocketAddr,
22 pub handle: Option<Arc<JoinHandle<()>>>,
23}
24
25impl SystemStatusServerInfo {
26 pub fn new(socket_addr: std::net::SocketAddr, handle: Option<JoinHandle<()>>) -> Self {
27 Self {
28 socket_addr,
29 handle: handle.map(Arc::new),
30 }
31 }
32
33 pub fn address(&self) -> String {
34 self.socket_addr.to_string()
35 }
36
37 pub fn hostname(&self) -> String {
38 self.socket_addr.ip().to_string()
39 }
40
41 pub fn port(&self) -> u16 {
42 self.socket_addr.port()
43 }
44}
45
46impl Clone for SystemStatusServerInfo {
47 fn clone(&self) -> Self {
48 Self {
49 socket_addr: self.socket_addr,
50 handle: self.handle.clone(),
51 }
52 }
53}
54
55pub struct SystemStatusState {
57 root_drt: Arc<crate::DistributedRuntime>,
59 discovery_metadata: Option<Arc<tokio::sync::RwLock<crate::discovery::DiscoveryMetadata>>>,
61}
62
63impl SystemStatusState {
64 pub fn new(
66 drt: Arc<crate::DistributedRuntime>,
67 discovery_metadata: Option<Arc<tokio::sync::RwLock<crate::discovery::DiscoveryMetadata>>>,
68 ) -> anyhow::Result<Self> {
69 Ok(Self {
70 root_drt: drt,
71 discovery_metadata,
72 })
73 }
74
75 pub fn drt(&self) -> &crate::DistributedRuntime {
77 &self.root_drt
78 }
79
80 pub fn discovery_metadata(
82 &self,
83 ) -> Option<&Arc<tokio::sync::RwLock<crate::discovery::DiscoveryMetadata>>> {
84 self.discovery_metadata.as_ref()
85 }
86}
87
88pub async fn spawn_system_status_server(
90 host: &str,
91 port: u16,
92 cancel_token: CancellationToken,
93 drt: Arc<crate::DistributedRuntime>,
94 discovery_metadata: Option<Arc<tokio::sync::RwLock<crate::discovery::DiscoveryMetadata>>>,
95) -> anyhow::Result<(std::net::SocketAddr, tokio::task::JoinHandle<()>)> {
96 let server_state = Arc::new(SystemStatusState::new(drt, discovery_metadata)?);
98 let health_path = server_state
99 .drt()
100 .system_health()
101 .lock()
102 .health_path()
103 .to_string();
104 let live_path = server_state
105 .drt()
106 .system_health()
107 .lock()
108 .live_path()
109 .to_string();
110
111 let app = Router::new()
112 .route(
113 &health_path,
114 get({
115 let state = Arc::clone(&server_state);
116 move || health_handler(state)
117 }),
118 )
119 .route(
120 &live_path,
121 get({
122 let state = Arc::clone(&server_state);
123 move || health_handler(state)
124 }),
125 )
126 .route(
127 "/metrics",
128 get({
129 let state = Arc::clone(&server_state);
130 move || metrics_handler(state)
131 }),
132 )
133 .route(
134 "/metadata",
135 get({
136 let state = Arc::clone(&server_state);
137 move || metadata_handler(state)
138 }),
139 )
140 .fallback(|| async {
141 tracing::info!("[fallback handler] called");
142 (StatusCode::NOT_FOUND, "Route not found").into_response()
143 })
144 .layer(TraceLayer::new_for_http().make_span_with(make_request_span));
145
146 let address = format!("{}:{}", host, port);
147 tracing::info!("[spawn_system_status_server] binding to: {}", address);
148
149 let listener = match TcpListener::bind(&address).await {
150 Ok(listener) => {
151 let actual_address = listener.local_addr()?;
153 tracing::info!(
154 "[spawn_system_status_server] system status server bound to: {}",
155 actual_address
156 );
157 (listener, actual_address)
158 }
159 Err(e) => {
160 tracing::error!("Failed to bind to address {}: {}", address, e);
161 return Err(anyhow::anyhow!("Failed to bind to address: {}", e));
162 }
163 };
164 let (listener, actual_address) = listener;
165
166 let observer = cancel_token.child_token();
167 let handle = tokio::spawn(async move {
169 if let Err(e) = axum::serve(listener, app)
170 .with_graceful_shutdown(observer.cancelled_owned())
171 .await
172 {
173 tracing::error!("System status server error: {}", e);
174 }
175 });
176
177 Ok((actual_address, handle))
178}
179
180#[tracing::instrument(skip_all, level = "trace")]
182async fn health_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
183 let system_health = state.drt().system_health();
185 let system_health_lock = system_health.lock();
186 let (healthy, endpoints) = system_health_lock.get_health_status();
187 let uptime = Some(system_health_lock.uptime());
188 drop(system_health_lock);
189
190 let healthy_string = if healthy { "ready" } else { "notready" };
191 let status_code = if healthy {
192 StatusCode::OK
193 } else {
194 StatusCode::SERVICE_UNAVAILABLE
195 };
196
197 let response = json!({
198 "status": healthy_string,
199 "uptime": uptime,
200 "endpoints": endpoints,
201 });
202
203 tracing::trace!("Response {}", response.to_string());
204
205 (status_code, response.to_string())
206}
207
208#[tracing::instrument(skip_all, level = "trace")]
210async fn metrics_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
211 state.drt().system_health().lock().update_uptime_gauge();
213
214 let response = match state.drt().metrics().prometheus_expfmt() {
220 Ok(r) => r,
221 Err(e) => {
222 tracing::error!("Failed to get metrics from registry: {}", e);
223 return (
224 StatusCode::INTERNAL_SERVER_ERROR,
225 "Failed to get metrics".to_string(),
226 );
227 }
228 };
229
230 (StatusCode::OK, response)
231}
232
233#[tracing::instrument(skip_all, level = "trace")]
235async fn metadata_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
236 let metadata = match state.discovery_metadata() {
238 Some(metadata) => metadata,
239 None => {
240 tracing::debug!("Metadata endpoint called but no discovery metadata available");
241 return (
242 StatusCode::NOT_FOUND,
243 "Discovery metadata not available".to_string(),
244 )
245 .into_response();
246 }
247 };
248
249 let metadata_guard = metadata.read().await;
251
252 match serde_json::to_string(&*metadata_guard) {
254 Ok(json) => {
255 tracing::trace!("Returning metadata: {} bytes", json.len());
256 (StatusCode::OK, json).into_response()
257 }
258 Err(e) => {
259 tracing::error!("Failed to serialize metadata: {}", e);
260 (
261 StatusCode::INTERNAL_SERVER_ERROR,
262 "Failed to serialize metadata".to_string(),
263 )
264 .into_response()
265 }
266 }
267}
268
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::Duration;

    /// An axum server wired to a cancellation token must exit cleanly once the
    /// token is cancelled.
    #[tokio::test]
    async fn test_http_server_lifecycle() {
        let cancel_token = CancellationToken::new();
        let cancel_token_for_server = cancel_token.clone();

        let app = Router::new().route("/test", get(|| async { (StatusCode::OK, "test") }));

        // Bind before spawning so a bind failure fails the test directly (instead
        // of panicking inside the task) and the server is really listening.
        let listener = TcpListener::bind("127.0.0.1:0")
            .await
            .expect("binding an ephemeral loopback port should succeed");

        let server_handle = tokio::spawn(async move {
            axum::serve(listener, app)
                .with_graceful_shutdown(cancel_token_for_server.cancelled_owned())
                .await
        });

        cancel_token.cancel();

        // Check all three layers: no timeout, no task panic, no serve error.
        let join_result = tokio::time::timeout(Duration::from_secs(5), server_handle)
            .await
            .expect("HTTP server should shut down when cancel token is cancelled");
        let serve_result = join_result.expect("server task should not panic");
        serve_result.expect("server should shut down without an I/O error");
    }
}
305
#[cfg(all(test, feature = "integration"))]
mod integration_tests {
    use super::*;
    use crate::distributed::distributed_test_utils::create_test_drt_async;
    use crate::metrics::MetricsHierarchy;
    use anyhow::Result;
    use rstest::rstest;
    use std::sync::Arc;
    use tokio::time::Duration;

    /// Uptime reported by `SystemHealth` must strictly increase across a sleep.
    #[tokio::test]
    async fn test_uptime_from_system_health() {
        temp_env::async_with_vars([("DYN_SYSTEM_PORT", None::<&str>)], async {
            let drt = create_test_drt_async().await;

            let uptime = drt.system_health().lock().uptime();

            tokio::time::sleep(Duration::from_millis(100)).await;
            let uptime_after = drt.system_health().lock().uptime();
            assert!(
                uptime_after > uptime,
                "uptime should increase over time: before={:?}, after={:?}",
                uptime,
                uptime_after
            );
        })
        .await;
    }

    /// The uptime gauge is registered under the `dynamo_component` namespace.
    #[tokio::test]
    async fn test_runtime_metrics_initialization_and_namespace() {
        temp_env::async_with_vars([("DYN_SYSTEM_PORT", None::<&str>)], async {
            let drt = create_test_drt_async().await;
            let response = drt.metrics().prometheus_expfmt().unwrap();
            println!("Full metrics response:\n{}", response);

            // NATS client/service metrics are excluded from the checks below.
            let filtered_response: String = response
                .lines()
                .filter(|line| {
                    !line.contains(nats_client::PREFIX) && !line.contains(nats_service::PREFIX)
                })
                .collect::<Vec<_>>()
                .join("\n");

            assert!(
                filtered_response.contains("# HELP dynamo_component_uptime_seconds"),
                "Should contain uptime_seconds help text"
            );
            assert!(
                filtered_response.contains("# TYPE dynamo_component_uptime_seconds gauge"),
                "Should contain uptime_seconds type"
            );
            // Sample lines (unlike HELP/TYPE comments) start with the metric name.
            assert!(
                filtered_response
                    .lines()
                    .any(|line| line.starts_with("dynamo_component_uptime_seconds")),
                "Should contain uptime_seconds metric with correct namespace"
            );
        })
        .await;
    }

    /// Uptime keeps advancing while the gauge is refreshed.
    #[tokio::test]
    async fn test_uptime_gauge_updates() {
        temp_env::async_with_vars([("DYN_SYSTEM_PORT", None::<&str>)], async {
            let drt = create_test_drt_async().await;

            let initial_uptime = drt.system_health().lock().uptime();

            drt.system_health().lock().update_uptime_gauge();

            tokio::time::sleep(Duration::from_millis(100)).await;

            let uptime_after_sleep = drt.system_health().lock().uptime();

            drt.system_health().lock().update_uptime_gauge();

            let elapsed = uptime_after_sleep - initial_uptime;
            assert!(
                elapsed >= Duration::from_millis(100),
                "Uptime should have increased by at least 100ms after sleep, but only increased by {:?}",
                elapsed
            );
        })
        .await;
    }

    /// Without `DYN_SYSTEM_PORT` no system status server is started.
    #[tokio::test]
    async fn test_http_requests_fail_when_system_disabled() {
        temp_env::async_with_vars([("DYN_SYSTEM_PORT", None::<&str>)], async {
            let drt = create_test_drt_async().await;

            let system_info = drt.system_status_server_info();
            assert!(
                system_info.is_none(),
                "System status server should not be running when disabled"
            );

            println!("✓ System status server correctly disabled when not enabled");
        })
        .await;
    }

    /// Health/live probes honour the starting status and custom probe paths.
    #[rstest]
    #[case("ready", 200, "ready", None, None, 3)]
    #[case("notready", 503, "notready", None, None, 3)]
    #[case("ready", 200, "ready", Some("/custom/health"), Some("/custom/live"), 5)]
    #[case(
        "notready",
        503,
        "notready",
        Some("/custom/health"),
        Some("/custom/live"),
        5
    )]
    #[tokio::test]
    async fn test_health_endpoints(
        #[case] starting_health_status: &'static str,
        #[case] expected_status: u16,
        #[case] expected_body: &'static str,
        #[case] custom_health_path: Option<&'static str>,
        #[case] custom_live_path: Option<&'static str>,
        #[case] expected_num_tests: usize,
    ) {
        crate::logging::init();

        #[allow(clippy::redundant_closure_call)]
        temp_env::async_with_vars(
            [
                ("DYN_SYSTEM_PORT", Some("0")),
                (
                    "DYN_SYSTEM_STARTING_HEALTH_STATUS",
                    Some(starting_health_status),
                ),
                ("DYN_SYSTEM_HEALTH_PATH", custom_health_path),
                ("DYN_SYSTEM_LIVE_PATH", custom_live_path),
            ],
            (async || {
                let drt = Arc::new(create_test_drt_async().await);

                let system_info = drt
                    .system_status_server_info()
                    .expect("System status server should be started by DRT");
                let addr = system_info.socket_addr;

                let client = reqwest::Client::new();

                // With a custom path configured, the default path must 404.
                let mut test_cases = vec![];
                match custom_health_path {
                    None => {
                        test_cases.push(("/health", expected_status, expected_body));
                    }
                    Some(chp) => {
                        test_cases.push(("/health", 404, "Route not found"));
                        test_cases.push((chp, expected_status, expected_body));
                    }
                }
                match custom_live_path {
                    None => {
                        test_cases.push(("/live", expected_status, expected_body));
                    }
                    Some(clp) => {
                        test_cases.push(("/live", 404, "Route not found"));
                        test_cases.push((clp, expected_status, expected_body));
                    }
                }
                test_cases.push(("/someRandomPathNotFoundHere", 404, "Route not found"));
                assert_eq!(test_cases.len(), expected_num_tests);

                for (path, expect_status, expect_body) in test_cases {
                    println!("[test] Sending request to {}", path);
                    let url = format!("http://{}{}", addr, path);
                    let response = client.get(&url).send().await.unwrap();
                    let status = response.status();
                    let body = response.text().await.unwrap();
                    println!(
                        "[test] Response for {}: status={}, body={:?}",
                        path, status, body
                    );
                    assert_eq!(
                        status, expect_status,
                        "Response: status={}, body={:?}",
                        status, body
                    );
                    assert!(
                        body.contains(expect_body),
                        "Response: status={}, body={:?}",
                        status,
                        body
                    );
                }
            })(),
        )
        .await;
    }

    /// Requests carrying W3C trace-context headers are served on every route.
    #[tokio::test]
    async fn test_health_endpoint_tracing() -> Result<()> {
        const TRACEPARENT: &str = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
        const TRACESTATE: &str = "vendor1=opaqueValue1,vendor2=opaqueValue2";

        // Propagate the inner result so header/request failures fail the test
        // instead of being silently discarded.
        #[allow(clippy::redundant_closure_call)]
        temp_env::async_with_vars(
            [
                ("DYN_SYSTEM_PORT", Some("0")),
                ("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("ready")),
                ("DYN_LOGGING_JSONL", Some("1")),
                ("DYN_LOG", Some("trace")),
            ],
            (async || {
                crate::logging::init();

                let drt = Arc::new(create_test_drt_async().await);

                let system_info = drt
                    .system_status_server_info()
                    .expect("System status server should be started by DRT");
                let addr = system_info.socket_addr;
                let client = reqwest::Client::new();
                for path in ["/health", "/live", "/someRandomPathNotFoundHere"] {
                    let mut headers = reqwest::header::HeaderMap::new();
                    headers.insert(
                        reqwest::header::HeaderName::from_static("traceparent"),
                        reqwest::header::HeaderValue::from_str(TRACEPARENT)?,
                    );
                    headers.insert(
                        reqwest::header::HeaderName::from_static("tracestate"),
                        reqwest::header::HeaderValue::from_str(TRACESTATE)?,
                    );
                    let url = format!("http://{}{}", addr, path);
                    let response = client.get(&url).headers(headers).send().await?;
                    let status = response.status();
                    let body = response.text().await?;
                    tracing::info!(body = body, status = status.to_string());
                }

                Ok::<(), anyhow::Error>(())
            })(),
        )
        .await?;
        Ok(())
    }

    /// Health flips from notready to ready once the tracked endpoint starts.
    #[tokio::test]
    async fn test_health_endpoint_with_changing_health_status() {
        const ENDPOINT_NAME: &str = "generate";
        const ENDPOINT_HEALTH_CONFIG: &str = "[\"generate\"]";
        temp_env::async_with_vars(
            [
                ("DYN_SYSTEM_PORT", Some("0")),
                ("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("notready")),
                (
                    "DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS",
                    Some(ENDPOINT_HEALTH_CONFIG),
                ),
            ],
            async {
                use crate::pipeline::{
                    async_trait, network::Ingress, AsyncEngine, AsyncEngineContextProvider,
                    ManyOut, SingleIn,
                };
                use crate::protocols::annotated::Annotated;

                let drt = Arc::new(create_test_drt_async().await);

                let Some(system_info) = drt.system_status_server_info() else {
                    panic!(
                        "System status server was not spawned by DRT. Expected DRT to spawn server when DYN_SYSTEM_PORT is set to a positive value, but system_status_server_info() returned None. Environment: DYN_SYSTEM_PORT={:?}",
                        std::env::var("DYN_SYSTEM_PORT")
                    );
                };
                let addr = system_info.socket_addr;

                let client = reqwest::Client::new();
                let health_url = format!("http://{}/health", addr);

                let response = client.get(&health_url).send().await.unwrap();
                let status = response.status();
                let body = response.text().await.unwrap();

                assert_eq!(
                    status, 503,
                    "Health should be 503 (not ready) initially, got: {}",
                    status
                );
                assert!(
                    body.contains("\"status\":\"notready\""),
                    "Health should contain status notready"
                );

                let namespace = drt.namespace("ns1234").unwrap();
                let mut component = namespace.component("comp1234").unwrap();

                struct TestHandler;

                #[async_trait]
                impl AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, anyhow::Error>
                    for TestHandler
                {
                    async fn generate(
                        &self,
                        input: SingleIn<String>,
                    ) -> anyhow::Result<ManyOut<Annotated<String>>> {
                        let (data, ctx) = input.into_parts();
                        let response = Annotated::from_data(format!("You responded: {}", data));
                        Ok(crate::pipeline::ResponseStream::new(
                            Box::pin(crate::stream::iter(vec![response])),
                            ctx.context(),
                        ))
                    }
                }

                let ingress = Ingress::for_engine(std::sync::Arc::new(TestHandler)).unwrap();

                tokio::spawn(async move {
                    component.add_stats_service().await.unwrap();
                    let _ = component
                        .endpoint(ENDPOINT_NAME)
                        .endpoint_builder()
                        .handler(ingress)
                        .health_check_payload(serde_json::json!({
                            "test": "health_check"
                        }))
                        .start()
                        .await;
                });

                let mut success_count = 0;
                let mut failures = Vec::new();

                for i in 1..=200 {
                    let response = client.get(&health_url).send().await.unwrap();
                    let status = response.status();
                    let body = response.text().await.unwrap();

                    if status == 200 && body.contains("\"status\":\"ready\"") {
                        success_count += 1;
                    } else {
                        failures.push((i, status.as_u16(), body.clone()));
                        // Only log the first few failures to keep output readable.
                        if failures.len() <= 5 {
                            tracing::warn!("Request {}: status={}, body={}", i, status, body);
                        }
                    }
                }

                tracing::info!(
                    "Health endpoint test results: {}/200 requests succeeded",
                    success_count
                );
                if !failures.is_empty() {
                    tracing::warn!("Failed requests: {}", failures.len());
                }

                assert!(
                    success_count >= 150,
                    "Expected at least 150 out of 200 requests to succeed, but only {} succeeded",
                    success_count
                );
            },
        )
        .await;
    }

    /// Default health/live paths answer 200 and unknown paths answer 404.
    #[tokio::test]
    async fn test_spawn_system_status_server_endpoints() {
        temp_env::async_with_vars(
            [
                ("DYN_SYSTEM_PORT", Some("0")),
                ("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("ready")),
            ],
            async {
                let drt = Arc::new(create_test_drt_async().await);

                let system_info = drt
                    .system_status_server_info()
                    .expect("System status server should be started by DRT");
                let addr = system_info.socket_addr;
                let client = reqwest::Client::new();
                for (path, expect_status, expect_body) in [
                    ("/health", 200, "ready"),
                    ("/live", 200, "ready"),
                    ("/someRandomPathNotFoundHere", 404, "Route not found"),
                ] {
                    println!("[test] Sending request to {}", path);
                    let url = format!("http://{}{}", addr, path);
                    let response = client.get(&url).send().await.unwrap();
                    let status = response.status();
                    let body = response.text().await.unwrap();
                    println!(
                        "[test] Response for {}: status={}, body={:?}",
                        path, status, body
                    );
                    assert_eq!(
                        status, expect_status,
                        "Response: status={}, body={:?}",
                        status, body
                    );
                    assert!(
                        body.contains(expect_body),
                        "Response: status={}, body={:?}",
                        status,
                        body
                    );
                }
            },
        )
        .await;
    }

    /// Setting an endpoint's health to Ready flips the overall probe to 200.
    #[tokio::test]
    async fn test_health_check_with_payload_and_timeout() {
        crate::logging::init();

        temp_env::async_with_vars(
            [
                ("DYN_SYSTEM_PORT", Some("0")),
                ("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("notready")),
                (
                    "DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS",
                    Some("[\"test.endpoint\"]"),
                ),
                ("DYN_HEALTH_CHECK_ENABLED", Some("true")),
                ("DYN_CANARY_WAIT_TIME", Some("1")),
                ("DYN_HEALTH_CHECK_REQUEST_TIMEOUT", Some("1")),
                ("RUST_LOG", Some("info")),
            ],
            async {
                let drt = Arc::new(create_test_drt_async().await);

                let system_info = drt
                    .system_status_server_info()
                    .expect("System status server should be started");
                let addr = system_info.socket_addr;

                let client = reqwest::Client::new();
                let health_url = format!("http://{}/health", addr);

                let endpoint = "test.endpoint";
                let health_check_payload = serde_json::json!({
                    "prompt": "health check test",
                    "_health_check": true
                });

                {
                    let system_health = drt.system_health();
                    let system_health_lock = system_health.lock();
                    system_health_lock.register_health_check_target(
                        endpoint,
                        crate::component::Instance {
                            component: "test_component".to_string(),
                            endpoint: "health".to_string(),
                            namespace: "test_namespace".to_string(),
                            instance_id: 1,
                            transport: crate::component::TransportType::Nats(endpoint.to_string()),
                        },
                        health_check_payload.clone(),
                    );
                }

                let response = client.get(&health_url).send().await.unwrap();
                let status = response.status();
                let body = response.text().await.unwrap();
                assert_eq!(status, 503, "Should be unhealthy initially (default state)");
                assert!(
                    body.contains("\"status\":\"notready\""),
                    "Should show notready status initially"
                );

                drt.system_health()
                    .lock()
                    .set_endpoint_health_status(endpoint, HealthStatus::Ready);

                let response = client.get(&health_url).send().await.unwrap();
                let status = response.status();
                let body = response.text().await.unwrap();

                assert_eq!(status, 200, "Should be healthy due to recent response");
                assert!(
                    body.contains("\"status\":\"ready\""),
                    "Should show ready status after response"
                );

                let endpoint_status = drt
                    .system_health()
                    .lock()
                    .get_endpoint_health_status(endpoint);
                assert_eq!(
                    endpoint_status,
                    Some(HealthStatus::Ready),
                    "SystemHealth should show endpoint as Ready after response"
                );
            },
        )
        .await;
    }
}