1use crate::config::HealthStatus;
5use crate::logging::make_request_span;
6use crate::metrics::MetricsHierarchy;
7use crate::metrics::prometheus_names::{nats_client, nats_service};
8use crate::traits::DistributedRuntimeProvider;
9use axum::{Router, http::StatusCode, response::IntoResponse, routing::get};
10use serde_json::json;
11use std::collections::HashMap;
12use std::sync::{Arc, OnceLock};
13use std::time::Instant;
14use tokio::{net::TcpListener, task::JoinHandle};
15use tokio_util::sync::CancellationToken;
16use tower_http::trace::TraceLayer;
17
18#[derive(Debug)]
20pub struct SystemStatusServerInfo {
21 pub socket_addr: std::net::SocketAddr,
22 pub handle: Option<Arc<JoinHandle<()>>>,
23}
24
25impl SystemStatusServerInfo {
26 pub fn new(socket_addr: std::net::SocketAddr, handle: Option<JoinHandle<()>>) -> Self {
27 Self {
28 socket_addr,
29 handle: handle.map(Arc::new),
30 }
31 }
32
33 pub fn address(&self) -> String {
34 self.socket_addr.to_string()
35 }
36
37 pub fn hostname(&self) -> String {
38 self.socket_addr.ip().to_string()
39 }
40
41 pub fn port(&self) -> u16 {
42 self.socket_addr.port()
43 }
44}
45
46impl Clone for SystemStatusServerInfo {
47 fn clone(&self) -> Self {
48 Self {
49 socket_addr: self.socket_addr,
50 handle: self.handle.clone(),
51 }
52 }
53}
54
55pub struct SystemStatusState {
57 root_drt: Arc<crate::DistributedRuntime>,
59}
60
61impl SystemStatusState {
62 pub fn new(drt: Arc<crate::DistributedRuntime>) -> anyhow::Result<Self> {
64 Ok(Self { root_drt: drt })
65 }
66
67 pub fn drt(&self) -> &crate::DistributedRuntime {
69 &self.root_drt
70 }
71}
72
73pub async fn spawn_system_status_server(
75 host: &str,
76 port: u16,
77 cancel_token: CancellationToken,
78 drt: Arc<crate::DistributedRuntime>,
79) -> anyhow::Result<(std::net::SocketAddr, tokio::task::JoinHandle<()>)> {
80 let server_state = Arc::new(SystemStatusState::new(drt)?);
82 let health_path = server_state
83 .drt()
84 .system_health
85 .lock()
86 .health_path()
87 .to_string();
88 let live_path = server_state
89 .drt()
90 .system_health
91 .lock()
92 .live_path()
93 .to_string();
94
95 let app = Router::new()
96 .route(
97 &health_path,
98 get({
99 let state = Arc::clone(&server_state);
100 move || health_handler(state)
101 }),
102 )
103 .route(
104 &live_path,
105 get({
106 let state = Arc::clone(&server_state);
107 move || health_handler(state)
108 }),
109 )
110 .route(
111 "/metrics",
112 get({
113 let state = Arc::clone(&server_state);
114 move || metrics_handler(state)
115 }),
116 )
117 .fallback(|| async {
118 tracing::info!("[fallback handler] called");
119 (StatusCode::NOT_FOUND, "Route not found").into_response()
120 })
121 .layer(TraceLayer::new_for_http().make_span_with(make_request_span));
122
123 let address = format!("{}:{}", host, port);
124 tracing::info!("[spawn_system_status_server] binding to: {}", address);
125
126 let listener = match TcpListener::bind(&address).await {
127 Ok(listener) => {
128 let actual_address = listener.local_addr()?;
130 tracing::info!(
131 "[spawn_system_status_server] system status server bound to: {}",
132 actual_address
133 );
134 (listener, actual_address)
135 }
136 Err(e) => {
137 tracing::error!("Failed to bind to address {}: {}", address, e);
138 return Err(anyhow::anyhow!("Failed to bind to address: {}", e));
139 }
140 };
141 let (listener, actual_address) = listener;
142
143 let observer = cancel_token.child_token();
144 let handle = tokio::spawn(async move {
146 if let Err(e) = axum::serve(listener, app)
147 .with_graceful_shutdown(observer.cancelled_owned())
148 .await
149 {
150 tracing::error!("System status server error: {}", e);
151 }
152 });
153
154 Ok((actual_address, handle))
155}
156
157#[tracing::instrument(skip_all, level = "trace")]
159async fn health_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
160 let system_health = state.drt().system_health.lock();
162 let (healthy, endpoints) = system_health.get_health_status();
163 let uptime = Some(system_health.uptime());
164
165 let healthy_string = if healthy { "ready" } else { "notready" };
166 let status_code = if healthy {
167 StatusCode::OK
168 } else {
169 StatusCode::SERVICE_UNAVAILABLE
170 };
171
172 let response = json!({
173 "status": healthy_string,
174 "uptime": uptime,
175 "endpoints": endpoints,
176 });
177
178 tracing::trace!("Response {}", response.to_string());
179
180 (status_code, response.to_string())
181}
182
183#[tracing::instrument(skip_all, level = "trace")]
185async fn metrics_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
186 state.drt().system_health.lock().update_uptime_gauge();
188
189 let response = match state.drt().metrics().prometheus_expfmt() {
195 Ok(r) => r,
196 Err(e) => {
197 tracing::error!("Failed to get metrics from registry: {}", e);
198 return (
199 StatusCode::INTERNAL_SERVER_ERROR,
200 "Failed to get metrics".to_string(),
201 );
202 }
203 };
204
205 (StatusCode::OK, response)
206}
207
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::Duration;

    /// Verifies that an axum server using graceful shutdown stops once its cancellation
    /// token is cancelled, and that the server task itself finishes cleanly.
    #[tokio::test]
    async fn test_http_server_lifecycle() {
        let cancel_token = CancellationToken::new();
        let cancel_token_for_server = cancel_token.clone();

        let app = Router::new().route("/test", get(|| async { (StatusCode::OK, "test") }));

        let server_handle = tokio::spawn(async move {
            let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
            let _ = axum::serve(listener, app)
                .with_graceful_shutdown(cancel_token_for_server.cancelled_owned())
                .await;
        });

        cancel_token.cancel();

        let join_result = tokio::time::timeout(Duration::from_secs(5), server_handle)
            .await
            .expect("HTTP server should shut down when cancel token is cancelled");

        // The timeout only proves the task ended; a `JoinError` here means it ended by
        // panicking (for example because the bind failed), which must fail the test too.
        join_result.expect("HTTP server task should finish without panicking");
    }
}
244
245#[cfg(all(test, feature = "integration"))]
247mod integration_tests {
248 use super::*;
249 use crate::distributed::distributed_test_utils::create_test_drt_async;
250 use crate::metrics::MetricsHierarchy;
251 use anyhow::Result;
252 use rstest::rstest;
253 use std::sync::Arc;
254 use tokio::time::Duration;
255
256 #[tokio::test]
257 async fn test_uptime_from_system_health() {
258 temp_env::async_with_vars([("DYN_SYSTEM_ENABLED", Some("false"))], async {
260 let drt = create_test_drt_async().await;
261
262 let uptime = drt.system_health.lock().uptime();
264 assert!(uptime.as_nanos() > 0 || uptime.is_zero());
266
267 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
269 let uptime_after = drt.system_health.lock().uptime();
270 assert!(uptime_after > uptime);
271 })
272 .await;
273 }
274
275 #[tokio::test]
276 async fn test_runtime_metrics_initialization_and_namespace() {
277 temp_env::async_with_vars([("DYN_SYSTEM_ENABLED", Some("false"))], async {
279 let drt = create_test_drt_async().await;
280 let response = drt.metrics().prometheus_expfmt().unwrap();
285 println!("Full metrics response:\n{}", response);
286
287 let filtered_response: String = response
289 .lines()
290 .filter(|line| {
291 !line.contains(nats_client::PREFIX) && !line.contains(nats_service::PREFIX)
292 })
293 .collect::<Vec<_>>()
294 .join("\n");
295
296 assert!(
298 filtered_response.contains("# HELP dynamo_component_uptime_seconds"),
299 "Should contain uptime_seconds help text"
300 );
301 assert!(
302 filtered_response.contains("# TYPE dynamo_component_uptime_seconds gauge"),
303 "Should contain uptime_seconds type"
304 );
305 assert!(
306 filtered_response.contains("dynamo_component_uptime_seconds"),
307 "Should contain uptime_seconds metric with correct namespace"
308 );
309 })
310 .await;
311 }
312
313 #[tokio::test]
314 async fn test_uptime_gauge_updates() {
315 temp_env::async_with_vars([("DYN_SYSTEM_ENABLED", Some("false"))], async {
317 let drt = create_test_drt_async().await;
318
319 let initial_uptime = drt.system_health.lock().uptime();
321
322 drt.system_health.lock().update_uptime_gauge();
324
325 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
327
328 let uptime_after_sleep = drt.system_health.lock().uptime();
330
331 drt.system_health.lock().update_uptime_gauge();
333
334 let elapsed = uptime_after_sleep - initial_uptime;
336 assert!(
337 elapsed >= std::time::Duration::from_millis(100),
338 "Uptime should have increased by at least 100ms after sleep, but only increased by {:?}",
339 elapsed
340 );
341 })
342 .await;
343 }
344
345 #[tokio::test]
346 async fn test_http_requests_fail_when_system_disabled() {
347 temp_env::async_with_vars([("DYN_SYSTEM_ENABLED", Some("false"))], async {
349 let drt = create_test_drt_async().await;
350
351 let system_info = drt.system_status_server_info();
353 assert!(
354 system_info.is_none(),
355 "System status server should not be running when DYN_SYSTEM_ENABLED=false"
356 );
357
358 println!("✓ System status server correctly disabled when DYN_SYSTEM_ENABLED=false");
359 })
360 .await;
361 }
362
363 #[rstest]
370 #[case("ready", 200, "ready", None, None, 3)]
371 #[case("notready", 503, "notready", None, None, 3)]
372 #[case("ready", 200, "ready", Some("/custom/health"), Some("/custom/live"), 5)]
373 #[case(
374 "notready",
375 503,
376 "notready",
377 Some("/custom/health"),
378 Some("/custom/live"),
379 5
380 )]
381 #[tokio::test]
382 #[cfg(feature = "integration")]
383 async fn test_health_endpoints(
384 #[case] starting_health_status: &'static str,
385 #[case] expected_status: u16,
386 #[case] expected_body: &'static str,
387 #[case] custom_health_path: Option<&'static str>,
388 #[case] custom_live_path: Option<&'static str>,
389 #[case] expected_num_tests: usize,
390 ) {
391 use std::sync::Arc;
392 crate::logging::init();
398
399 #[allow(clippy::redundant_closure_call)]
400 temp_env::async_with_vars(
401 [
402 ("DYN_SYSTEM_ENABLED", Some("true")),
403 ("DYN_SYSTEM_PORT", Some("0")),
404 (
405 "DYN_SYSTEM_STARTING_HEALTH_STATUS",
406 Some(starting_health_status),
407 ),
408 ("DYN_SYSTEM_HEALTH_PATH", custom_health_path),
409 ("DYN_SYSTEM_LIVE_PATH", custom_live_path),
410 ],
411 (async || {
412 let drt = Arc::new(create_test_drt_async().await);
413
414 let system_info = drt
416 .system_status_server_info()
417 .expect("System status server should be started by DRT");
418 let addr = system_info.socket_addr;
419
420 let client = reqwest::Client::new();
421
422 let mut test_cases = vec![];
424 match custom_health_path {
425 None => {
426 test_cases.push(("/health", expected_status, expected_body));
428 }
429 Some(chp) => {
430 test_cases.push(("/health", 404, "Route not found"));
432 test_cases.push((chp, expected_status, expected_body));
433 }
434 }
435 match custom_live_path {
436 None => {
437 test_cases.push(("/live", expected_status, expected_body));
439 }
440 Some(clp) => {
441 test_cases.push(("/live", 404, "Route not found"));
443 test_cases.push((clp, expected_status, expected_body));
444 }
445 }
446 test_cases.push(("/someRandomPathNotFoundHere", 404, "Route not found"));
447 assert_eq!(test_cases.len(), expected_num_tests);
448
449 for (path, expect_status, expect_body) in test_cases {
450 println!("[test] Sending request to {}", path);
451 let url = format!("http://{}{}", addr, path);
452 let response = client.get(&url).send().await.unwrap();
453 let status = response.status();
454 let body = response.text().await.unwrap();
455 println!(
456 "[test] Response for {}: status={}, body={:?}",
457 path, status, body
458 );
459 assert_eq!(
460 status, expect_status,
461 "Response: status={}, body={:?}",
462 status, body
463 );
464 assert!(
465 body.contains(expect_body),
466 "Response: status={}, body={:?}",
467 status,
468 body
469 );
470 }
471 })(),
472 )
473 .await;
474 }
475
476 #[tokio::test]
477 async fn test_health_endpoint_tracing() -> Result<()> {
478 use std::sync::Arc;
479
480 #[allow(clippy::redundant_closure_call)]
483 let _ = temp_env::async_with_vars(
484 [
485 ("DYN_SYSTEM_ENABLED", Some("true")),
486 ("DYN_SYSTEM_PORT", Some("0")),
487 ("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("ready")),
488 ("DYN_LOGGING_JSONL", Some("1")),
489 ("DYN_LOG", Some("trace")),
490 ],
491 (async || {
492 crate::logging::init();
496
497 let drt = Arc::new(create_test_drt_async().await);
498
499 let system_info = drt
501 .system_status_server_info()
502 .expect("System status server should be started by DRT");
503 let addr = system_info.socket_addr;
504 let client = reqwest::Client::new();
505 for path in [("/health"), ("/live"), ("/someRandomPathNotFoundHere")] {
506 let traceparent_value =
507 "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
508 let tracestate_value = "vendor1=opaqueValue1,vendor2=opaqueValue2";
509 let mut headers = reqwest::header::HeaderMap::new();
510 headers.insert(
511 reqwest::header::HeaderName::from_static("traceparent"),
512 reqwest::header::HeaderValue::from_str(traceparent_value)?,
513 );
514 headers.insert(
515 reqwest::header::HeaderName::from_static("tracestate"),
516 reqwest::header::HeaderValue::from_str(tracestate_value)?,
517 );
518 let url = format!("http://{}{}", addr, path);
519 let response = client.get(&url).headers(headers).send().await.unwrap();
520 let status = response.status();
521 let body = response.text().await.unwrap();
522 tracing::info!(body = body, status = status.to_string());
523 }
524
525 Ok::<(), anyhow::Error>(())
526 })(),
527 )
528 .await;
529 Ok(())
530 }
531
532 #[tokio::test]
533 async fn test_health_endpoint_with_changing_health_status() {
534 const ENDPOINT_NAME: &str = "generate";
537 const ENDPOINT_HEALTH_CONFIG: &str = "[\"generate\"]";
538 temp_env::async_with_vars(
539 [
540 ("DYN_SYSTEM_ENABLED", Some("true")),
541 ("DYN_SYSTEM_PORT", Some("0")),
542 ("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("notready")),
543 ("DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS", Some(ENDPOINT_HEALTH_CONFIG)),
544 ],
545 async {
546 let drt = Arc::new(create_test_drt_async().await);
547
548 let system_info_opt = drt.system_status_server_info();
550
551 assert!(
553 system_info_opt.is_some(),
554 "System status server was not spawned by DRT. Expected DRT to spawn server when DYN_SYSTEM_ENABLED=true, but system_status_server_info() returned None. Environment: DYN_SYSTEM_ENABLED={:?}, DYN_SYSTEM_PORT={:?}",
555 std::env::var("DYN_SYSTEM_ENABLED"),
556 std::env::var("DYN_SYSTEM_PORT")
557 );
558
559 let system_info = system_info_opt.unwrap();
561 let addr = system_info.socket_addr;
562
563 let client = reqwest::Client::new();
565 let health_url = format!("http://{}/health", addr);
566
567 let response = client.get(&health_url).send().await.unwrap();
568 let status = response.status();
569 let body = response.text().await.unwrap();
570
571 assert_eq!(status, 503, "Health should be 503 (not ready) initially, got: {}", status);
573 assert!(body.contains("\"status\":\"notready\""), "Health should contain status notready");
574
575 let namespace = drt.namespace("ns1234").unwrap();
577 let mut component = namespace.component("comp1234").unwrap();
578
579 use crate::pipeline::{async_trait, network::Ingress, AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, SingleIn};
581 use crate::protocols::annotated::Annotated;
582
583 struct TestHandler;
584
585 #[async_trait]
586 impl AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, Error> for TestHandler {
587 async fn generate(&self, input: SingleIn<String>) -> crate::Result<ManyOut<Annotated<String>>> {
588 let (data, ctx) = input.into_parts();
589 let response = Annotated::from_data(format!("You responded: {}", data));
590 Ok(crate::pipeline::ResponseStream::new(
591 Box::pin(crate::stream::iter(vec![response])),
592 ctx.context()
593 ))
594 }
595 }
596
597 let ingress = Ingress::for_engine(std::sync::Arc::new(TestHandler)).unwrap();
599
600 tokio::spawn(async move {
603 component.add_stats_service().await.unwrap();
604 let _ = component.endpoint(ENDPOINT_NAME)
605 .endpoint_builder()
606 .handler(ingress)
607 .health_check_payload(serde_json::json!({
608 "test": "health_check"
609 }))
610 .start()
611 .await;
612 });
613
614 let mut success_count = 0;
616 let mut failures = Vec::new();
617
618 for i in 1..=200 {
619 let response = client.get(&health_url).send().await.unwrap();
620 let status = response.status();
621 let body = response.text().await.unwrap();
622
623 if status == 200 && body.contains("\"status\":\"ready\"") {
624 success_count += 1;
625 } else {
626 failures.push((i, status.as_u16(), body.clone()));
627 if failures.len() <= 5 { tracing::warn!("Request {}: status={}, body={}", i, status, body);
629 }
630 }
631 }
632
633 tracing::info!("Health endpoint test results: {}/200 requests succeeded", success_count);
634 if !failures.is_empty() {
635 tracing::warn!("Failed requests: {}", failures.len());
636 }
637
638 assert!(success_count >= 150, "Expected at least 150 out of 200 requests to succeed, but only {} succeeded", success_count);
640 },
641 )
642 .await;
643 }
644
645 #[tokio::test]
646 async fn test_spawn_system_status_server_endpoints() {
647 temp_env::async_with_vars(
649 [
650 ("DYN_SYSTEM_ENABLED", Some("true")),
651 ("DYN_SYSTEM_PORT", Some("0")),
652 ("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("ready")),
653 ],
654 async {
655 let drt = Arc::new(create_test_drt_async().await);
656
657 let system_info = drt
659 .system_status_server_info()
660 .expect("System status server should be started by DRT");
661 let addr = system_info.socket_addr;
662 let client = reqwest::Client::new();
663 for (path, expect_200, expect_body) in [
664 ("/health", true, "ready"),
665 ("/live", true, "ready"),
666 ("/someRandomPathNotFoundHere", false, "Route not found"),
667 ] {
668 println!("[test] Sending request to {}", path);
669 let url = format!("http://{}{}", addr, path);
670 let response = client.get(&url).send().await.unwrap();
671 let status = response.status();
672 let body = response.text().await.unwrap();
673 println!(
674 "[test] Response for {}: status={}, body={:?}",
675 path, status, body
676 );
677 if expect_200 {
678 assert_eq!(status, 200, "Response: status={}, body={:?}", status, body);
679 } else {
680 assert_eq!(status, 404, "Response: status={}, body={:?}", status, body);
681 }
682 assert!(
683 body.contains(expect_body),
684 "Response: status={}, body={:?}",
685 status,
686 body
687 );
688 }
689 },
691 )
692 .await;
693 }
694
695 #[cfg(feature = "integration")]
696 #[tokio::test]
697 async fn test_health_check_with_payload_and_timeout() {
698 crate::logging::init();
700
701 temp_env::async_with_vars(
702 [
703 ("DYN_SYSTEM_ENABLED", Some("true")),
704 ("DYN_SYSTEM_PORT", Some("0")),
705 ("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("notready")),
706 (
707 "DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS",
708 Some("[\"test.endpoint\"]"),
709 ),
710 ("DYN_HEALTH_CHECK_ENABLED", Some("true")),
712 ("DYN_CANARY_WAIT_TIME", Some("1")), ("DYN_HEALTH_CHECK_REQUEST_TIMEOUT", Some("1")), ("RUST_LOG", Some("info")), ],
716 async {
717 let drt = Arc::new(create_test_drt_async().await);
718
719 let system_info = drt
721 .system_status_server_info()
722 .expect("System status server should be started");
723 let addr = system_info.socket_addr;
724
725 let client = reqwest::Client::new();
726 let health_url = format!("http://{}/health", addr);
727
728 let endpoint = "test.endpoint";
730 let health_check_payload = serde_json::json!({
731 "prompt": "health check test",
732 "_health_check": true
733 });
734
735 {
737 let system_health = drt.system_health.lock();
738 system_health.register_health_check_target(
739 endpoint,
740 crate::component::Instance {
741 component: "test_component".to_string(),
742 endpoint: "health".to_string(),
743 namespace: "test_namespace".to_string(),
744 instance_id: 1,
745 transport: crate::component::TransportType::NatsTcp(
746 endpoint.to_string(),
747 ),
748 },
749 health_check_payload.clone(),
750 );
751 }
752
753 let response = client.get(&health_url).send().await.unwrap();
755 let status = response.status();
756 let body = response.text().await.unwrap();
757 assert_eq!(status, 503, "Should be unhealthy initially (default state)");
758 assert!(
759 body.contains("\"status\":\"notready\""),
760 "Should show notready status initially"
761 );
762
763 drt.system_health
765 .lock()
766 .set_endpoint_health_status(endpoint, HealthStatus::Ready);
767
768 let response = client.get(&health_url).send().await.unwrap();
770 let status = response.status();
771 let body = response.text().await.unwrap();
772
773 assert_eq!(status, 200, "Should be healthy due to recent response");
774 assert!(
775 body.contains("\"status\":\"ready\""),
776 "Should show ready status after response"
777 );
778
779 let endpoint_status = drt
781 .system_health
782 .lock()
783 .get_endpoint_health_status(endpoint);
784 assert_eq!(
785 endpoint_status,
786 Some(HealthStatus::Ready),
787 "SystemHealth should show endpoint as Ready after response"
788 );
789 },
790 )
791 .await;
792 }
793}