1use crate::config::HealthStatus;
5use crate::logging::make_request_span;
6use crate::metrics::MetricsRegistry;
7use crate::metrics::prometheus_names::{nats_client, nats_service};
8use crate::traits::DistributedRuntimeProvider;
9use axum::{Router, http::StatusCode, response::IntoResponse, routing::get};
10use serde_json::json;
11use std::collections::HashMap;
12use std::sync::{Arc, OnceLock};
13use std::time::Instant;
14use tokio::{net::TcpListener, task::JoinHandle};
15use tokio_util::sync::CancellationToken;
16use tower_http::trace::TraceLayer;
17
18#[derive(Debug)]
20pub struct SystemStatusServerInfo {
21 pub socket_addr: std::net::SocketAddr,
22 pub handle: Option<Arc<JoinHandle<()>>>,
23}
24
25impl SystemStatusServerInfo {
26 pub fn new(socket_addr: std::net::SocketAddr, handle: Option<JoinHandle<()>>) -> Self {
27 Self {
28 socket_addr,
29 handle: handle.map(Arc::new),
30 }
31 }
32
33 pub fn address(&self) -> String {
34 self.socket_addr.to_string()
35 }
36
37 pub fn hostname(&self) -> String {
38 self.socket_addr.ip().to_string()
39 }
40
41 pub fn port(&self) -> u16 {
42 self.socket_addr.port()
43 }
44}
45
46impl Clone for SystemStatusServerInfo {
47 fn clone(&self) -> Self {
48 Self {
49 socket_addr: self.socket_addr,
50 handle: self.handle.clone(),
51 }
52 }
53}
54
55pub struct SystemStatusState {
57 root_drt: Arc<crate::DistributedRuntime>,
59}
60
61impl SystemStatusState {
62 pub fn new(drt: Arc<crate::DistributedRuntime>) -> anyhow::Result<Self> {
64 Ok(Self { root_drt: drt })
65 }
66
67 pub fn drt(&self) -> &crate::DistributedRuntime {
69 &self.root_drt
70 }
71}
72
73pub async fn spawn_system_status_server(
75 host: &str,
76 port: u16,
77 cancel_token: CancellationToken,
78 drt: Arc<crate::DistributedRuntime>,
79) -> anyhow::Result<(std::net::SocketAddr, tokio::task::JoinHandle<()>)> {
80 let server_state = Arc::new(SystemStatusState::new(drt)?);
82 let health_path = server_state
83 .drt()
84 .system_health
85 .lock()
86 .unwrap()
87 .health_path()
88 .to_string();
89 let live_path = server_state
90 .drt()
91 .system_health
92 .lock()
93 .unwrap()
94 .live_path()
95 .to_string();
96
97 let app = Router::new()
98 .route(
99 &health_path,
100 get({
101 let state = Arc::clone(&server_state);
102 move || health_handler(state)
103 }),
104 )
105 .route(
106 &live_path,
107 get({
108 let state = Arc::clone(&server_state);
109 move || health_handler(state)
110 }),
111 )
112 .route(
113 "/metrics",
114 get({
115 let state = Arc::clone(&server_state);
116 move || metrics_handler(state)
117 }),
118 )
119 .fallback(|| async {
120 tracing::info!("[fallback handler] called");
121 (StatusCode::NOT_FOUND, "Route not found").into_response()
122 })
123 .layer(TraceLayer::new_for_http().make_span_with(make_request_span));
124
125 let address = format!("{}:{}", host, port);
126 tracing::info!("[spawn_system_status_server] binding to: {}", address);
127
128 let listener = match TcpListener::bind(&address).await {
129 Ok(listener) => {
130 let actual_address = listener.local_addr()?;
132 tracing::info!(
133 "[spawn_system_status_server] system status server bound to: {}",
134 actual_address
135 );
136 (listener, actual_address)
137 }
138 Err(e) => {
139 tracing::error!("Failed to bind to address {}: {}", address, e);
140 return Err(anyhow::anyhow!("Failed to bind to address: {}", e));
141 }
142 };
143 let (listener, actual_address) = listener;
144
145 let observer = cancel_token.child_token();
146 let handle = tokio::spawn(async move {
148 if let Err(e) = axum::serve(listener, app)
149 .with_graceful_shutdown(observer.cancelled_owned())
150 .await
151 {
152 tracing::error!("System status server error: {}", e);
153 }
154 });
155
156 Ok((actual_address, handle))
157}
158
159#[tracing::instrument(skip_all, level = "trace")]
161async fn health_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
162 let system_health = state.drt().system_health.lock().unwrap();
164 let (healthy, endpoints) = system_health.get_health_status();
165 let uptime = Some(system_health.uptime());
166
167 let healthy_string = if healthy { "ready" } else { "notready" };
168 let status_code = if healthy {
169 StatusCode::OK
170 } else {
171 StatusCode::SERVICE_UNAVAILABLE
172 };
173
174 let response = json!({
175 "status": healthy_string,
176 "uptime": uptime,
177 "endpoints": endpoints,
178 });
179
180 tracing::trace!("Response {}", response.to_string());
181
182 (status_code, response.to_string())
183}
184
185#[tracing::instrument(skip_all, level = "trace")]
187async fn metrics_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
188 state
190 .drt()
191 .system_health
192 .lock()
193 .unwrap()
194 .update_uptime_gauge();
195
196 assert!(state.drt().basename() == "");
198 let callback_results = state
199 .drt()
200 .execute_metrics_callbacks(&state.drt().hierarchy());
201 for result in callback_results {
202 if let Err(e) = result {
203 tracing::error!("Error executing metrics callback: {}", e);
204 }
205 }
206
207 match state.drt().prometheus_metrics_fmt() {
209 Ok(response) => (StatusCode::OK, response),
210 Err(e) => {
211 tracing::error!("Failed to get metrics from registry: {}", e);
212 (
213 StatusCode::INTERNAL_SERVER_ERROR,
214 "Failed to get metrics".to_string(),
215 )
216 }
217 }
218}
219
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::Duration;

    /// The serve loop must exit promptly, and cleanly, once its cancellation token fires.
    #[tokio::test]
    async fn test_http_server_lifecycle() {
        let cancel_token = CancellationToken::new();
        let cancel_token_for_server = cancel_token.clone();

        let app = Router::new().route("/test", get(|| async { (StatusCode::OK, "test") }));

        // Bind up front so a bind failure fails the test here instead of panicking inside the
        // spawned task (which a bare timeout check would not notice).
        let listener = TcpListener::bind("127.0.0.1:0")
            .await
            .expect("failed to bind test listener");

        let server_handle = tokio::spawn(async move {
            axum::serve(listener, app)
                .with_graceful_shutdown(cancel_token_for_server.cancelled_owned())
                .await
        });

        cancel_token.cancel();

        let join_result = tokio::time::timeout(Duration::from_secs(5), server_handle)
            .await
            .expect("HTTP server should shut down when cancel token is cancelled");
        let serve_result = join_result.expect("HTTP server task should not panic");
        serve_result.expect("HTTP server should shut down without an I/O error");
    }
}
256
#[cfg(all(test, feature = "integration"))]
mod integration_tests {
    // End-to-end tests that start a real DistributedRuntime (and, where enabled, the system
    // status server) and talk to it over HTTP.
    use super::*;
    use crate::distributed::distributed_test_utils::create_test_drt_async;
    use crate::metrics::MetricsRegistry;
    use anyhow::Result;
    use rstest::rstest;
    use std::sync::Arc;
    use tokio::time::Duration;

    #[tokio::test]
    async fn test_uptime_from_system_health() {
        temp_env::async_with_vars([("DYN_SYSTEM_ENABLED", Some("false"))], async {
            let drt = create_test_drt_async().await;

            let uptime = drt.system_health.lock().unwrap().uptime();

            tokio::time::sleep(Duration::from_millis(100)).await;

            let uptime_after = drt.system_health.lock().unwrap().uptime();
            assert!(
                uptime_after > uptime,
                "Uptime should increase over time: before={:?}, after={:?}",
                uptime,
                uptime_after
            );
        })
        .await;
    }

    #[tokio::test]
    async fn test_runtime_metrics_initialization_and_namespace() {
        temp_env::async_with_vars([("DYN_SYSTEM_ENABLED", Some("false"))], async {
            let drt = create_test_drt_async().await;
            let response = drt.prometheus_metrics_fmt().unwrap();
            println!("Full metrics response:\n{}", response);

            // NATS client/service metrics depend on the environment; ignore them here.
            let filtered_response: String = response
                .lines()
                .filter(|line| {
                    !line.contains(nats_client::PREFIX) && !line.contains(nats_service::PREFIX)
                })
                .collect::<Vec<_>>()
                .join("\n");

            assert!(
                filtered_response.contains("# HELP dynamo_component_uptime_seconds"),
                "Should contain uptime_seconds help text"
            );
            assert!(
                filtered_response.contains("# TYPE dynamo_component_uptime_seconds gauge"),
                "Should contain uptime_seconds type"
            );
            // A sample line (not a `#` comment) proves the gauge is actually exported.
            assert!(
                filtered_response
                    .lines()
                    .any(|line| line.starts_with("dynamo_component_uptime_seconds")),
                "Should contain uptime_seconds metric with correct namespace"
            );
        })
        .await;
    }

    #[tokio::test]
    async fn test_uptime_gauge_updates() {
        temp_env::async_with_vars([("DYN_SYSTEM_ENABLED", Some("false"))], async {
            let drt = create_test_drt_async().await;

            let initial_uptime = drt.system_health.lock().unwrap().uptime();
            drt.system_health.lock().unwrap().update_uptime_gauge();

            tokio::time::sleep(Duration::from_millis(100)).await;

            let uptime_after_sleep = drt.system_health.lock().unwrap().uptime();
            drt.system_health.lock().unwrap().update_uptime_gauge();

            let elapsed = uptime_after_sleep - initial_uptime;
            assert!(
                elapsed >= Duration::from_millis(100),
                "Uptime should have increased by at least 100ms after sleep, but only increased by {:?}",
                elapsed
            );
        })
        .await;
    }

    #[tokio::test]
    async fn test_http_requests_fail_when_system_disabled() {
        temp_env::async_with_vars([("DYN_SYSTEM_ENABLED", Some("false"))], async {
            let drt = create_test_drt_async().await;

            let system_info = drt.system_status_server_info();
            assert!(
                system_info.is_none(),
                "System status server should not be running when DYN_SYSTEM_ENABLED=false"
            );

            println!("✓ System status server correctly disabled when DYN_SYSTEM_ENABLED=false");
        })
        .await;
    }

    #[rstest]
    #[case("ready", 200, "ready", None, None, 3)]
    #[case("notready", 503, "notready", None, None, 3)]
    #[case("ready", 200, "ready", Some("/custom/health"), Some("/custom/live"), 5)]
    #[case(
        "notready",
        503,
        "notready",
        Some("/custom/health"),
        Some("/custom/live"),
        5
    )]
    #[tokio::test]
    async fn test_health_endpoints(
        #[case] starting_health_status: &'static str,
        #[case] expected_status: u16,
        #[case] expected_body: &'static str,
        #[case] custom_health_path: Option<&'static str>,
        #[case] custom_live_path: Option<&'static str>,
        #[case] expected_num_tests: usize,
    ) {
        crate::logging::init();

        temp_env::async_with_vars(
            [
                ("DYN_SYSTEM_ENABLED", Some("true")),
                ("DYN_SYSTEM_PORT", Some("0")),
                (
                    "DYN_SYSTEM_STARTING_HEALTH_STATUS",
                    Some(starting_health_status),
                ),
                ("DYN_SYSTEM_HEALTH_PATH", custom_health_path),
                ("DYN_SYSTEM_LIVE_PATH", custom_live_path),
            ],
            async {
                let drt = Arc::new(create_test_drt_async().await);

                let system_info = drt
                    .system_status_server_info()
                    .expect("System status server should be started by DRT");
                let addr = system_info.socket_addr;

                let client = reqwest::Client::new();

                // When a custom path is configured, the default path must no longer respond.
                let mut test_cases = vec![];
                match custom_health_path {
                    None => {
                        test_cases.push(("/health", expected_status, expected_body));
                    }
                    Some(chp) => {
                        test_cases.push(("/health", 404, "Route not found"));
                        test_cases.push((chp, expected_status, expected_body));
                    }
                }
                match custom_live_path {
                    None => {
                        test_cases.push(("/live", expected_status, expected_body));
                    }
                    Some(clp) => {
                        test_cases.push(("/live", 404, "Route not found"));
                        test_cases.push((clp, expected_status, expected_body));
                    }
                }
                test_cases.push(("/someRandomPathNotFoundHere", 404, "Route not found"));
                assert_eq!(test_cases.len(), expected_num_tests);

                for (path, expect_status, expect_body) in test_cases {
                    println!("[test] Sending request to {}", path);
                    let url = format!("http://{}{}", addr, path);
                    let response = client.get(&url).send().await.unwrap();
                    let status = response.status();
                    let body = response.text().await.unwrap();
                    println!(
                        "[test] Response for {}: status={}, body={:?}",
                        path, status, body
                    );
                    assert_eq!(
                        status, expect_status,
                        "Response: status={}, body={:?}",
                        status, body
                    );
                    assert!(
                        body.contains(expect_body),
                        "Response: status={}, body={:?}",
                        status,
                        body
                    );
                }
            },
        )
        .await;
    }

    #[tokio::test]
    async fn test_health_endpoint_tracing() -> Result<()> {
        const TRACEPARENT: &str = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
        const TRACESTATE: &str = "vendor1=opaqueValue1,vendor2=opaqueValue2";

        // The inner result is returned (not discarded) so request failures fail the test.
        temp_env::async_with_vars(
            [
                ("DYN_SYSTEM_ENABLED", Some("true")),
                ("DYN_SYSTEM_PORT", Some("0")),
                ("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("ready")),
                ("DYN_LOGGING_JSONL", Some("1")),
                ("DYN_LOG", Some("trace")),
            ],
            async {
                crate::logging::init();

                let drt = Arc::new(create_test_drt_async().await);

                let system_info = drt
                    .system_status_server_info()
                    .expect("System status server should be started by DRT");
                let addr = system_info.socket_addr;
                let client = reqwest::Client::new();

                // W3C trace-context headers, identical for every request.
                let mut headers = reqwest::header::HeaderMap::new();
                headers.insert(
                    reqwest::header::HeaderName::from_static("traceparent"),
                    reqwest::header::HeaderValue::from_static(TRACEPARENT),
                );
                headers.insert(
                    reqwest::header::HeaderName::from_static("tracestate"),
                    reqwest::header::HeaderValue::from_static(TRACESTATE),
                );

                for (path, expected_status) in [
                    ("/health", 200_u16),
                    ("/live", 200),
                    ("/someRandomPathNotFoundHere", 404),
                ] {
                    let url = format!("http://{}{}", addr, path);
                    let response = client.get(&url).headers(headers.clone()).send().await?;
                    let status = response.status();
                    let body = response.text().await?;
                    tracing::info!(body = body, status = status.to_string());
                    assert_eq!(
                        status, expected_status,
                        "Response for {}: body={:?}",
                        path, body
                    );
                }

                Ok::<(), anyhow::Error>(())
            },
        )
        .await
    }

    #[tokio::test]
    async fn test_health_endpoint_with_changing_health_status() {
        const ENDPOINT_NAME: &str = "generate";
        const ENDPOINT_HEALTH_CONFIG: &str = "[\"generate\"]";
        temp_env::async_with_vars(
            [
                ("DYN_SYSTEM_ENABLED", Some("true")),
                ("DYN_SYSTEM_PORT", Some("0")),
                ("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("notready")),
                (
                    "DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS",
                    Some(ENDPOINT_HEALTH_CONFIG),
                ),
            ],
            async {
                use crate::pipeline::{
                    AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, SingleIn, async_trait,
                    network::Ingress,
                };
                use crate::protocols::annotated::Annotated;

                /// Minimal engine that echoes its input back as a single streamed response.
                struct TestHandler;

                #[async_trait]
                impl AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, Error> for TestHandler {
                    async fn generate(
                        &self,
                        input: SingleIn<String>,
                    ) -> crate::Result<ManyOut<Annotated<String>>> {
                        let (data, ctx) = input.into_parts();
                        let response = Annotated::from_data(format!("You responded: {}", data));
                        Ok(crate::pipeline::ResponseStream::new(
                            Box::pin(crate::stream::iter(vec![response])),
                            ctx.context(),
                        ))
                    }
                }

                let drt = Arc::new(create_test_drt_async().await);

                let system_info = drt.system_status_server_info().unwrap_or_else(|| {
                    panic!(
                        "System status server was not spawned by DRT. Expected DRT to spawn server when DYN_SYSTEM_ENABLED=true, but system_status_server_info() returned None. Environment: DYN_SYSTEM_ENABLED={:?}, DYN_SYSTEM_PORT={:?}",
                        std::env::var("DYN_SYSTEM_ENABLED"),
                        std::env::var("DYN_SYSTEM_PORT")
                    )
                });
                let addr = system_info.socket_addr;

                let client = reqwest::Client::new();
                let health_url = format!("http://{}/health", addr);

                let response = client.get(&health_url).send().await.unwrap();
                let status = response.status();
                let body = response.text().await.unwrap();

                assert_eq!(
                    status, 503,
                    "Health should be 503 (not ready) initially, got: {}",
                    status
                );
                assert!(
                    body.contains("\"status\":\"notready\""),
                    "Health should contain status notready"
                );

                let namespace = drt.namespace("ns1234").unwrap();
                let component = namespace.component("comp1234").unwrap();

                let ingress = Ingress::for_engine(std::sync::Arc::new(TestHandler)).unwrap();

                // Serve the endpoint in the background; it should flip the health status to ready.
                tokio::spawn(async move {
                    let _ = component
                        .service_builder()
                        .create()
                        .await
                        .unwrap()
                        .endpoint(ENDPOINT_NAME)
                        .endpoint_builder()
                        .handler(ingress)
                        .health_check_payload(serde_json::json!({
                            "test": "health_check"
                        }))
                        .start()
                        .await;
                });

                let mut success_count = 0;
                let mut failures = Vec::new();

                for i in 1..=200 {
                    let response = client.get(&health_url).send().await.unwrap();
                    let status = response.status();
                    let body = response.text().await.unwrap();

                    if status == 200 && body.contains("\"status\":\"ready\"") {
                        success_count += 1;
                    } else {
                        failures.push((i, status.as_u16(), body.clone()));
                        if failures.len() <= 5 {
                            tracing::warn!("Request {}: status={}, body={}", i, status, body);
                        }
                    }
                }

                tracing::info!(
                    "Health endpoint test results: {}/200 requests succeeded",
                    success_count
                );
                if !failures.is_empty() {
                    tracing::warn!("Failed requests: {}", failures.len());
                }

                assert!(
                    success_count >= 150,
                    "Expected at least 150 out of 200 requests to succeed, but only {} succeeded",
                    success_count
                );
            },
        )
        .await;
    }

    #[tokio::test]
    async fn test_spawn_system_status_server_endpoints() {
        temp_env::async_with_vars(
            [
                ("DYN_SYSTEM_ENABLED", Some("true")),
                ("DYN_SYSTEM_PORT", Some("0")),
                ("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("ready")),
            ],
            async {
                let drt = Arc::new(create_test_drt_async().await);

                let system_info = drt
                    .system_status_server_info()
                    .expect("System status server should be started by DRT");
                let addr = system_info.socket_addr;
                let client = reqwest::Client::new();
                for (path, expected_status, expect_body) in [
                    ("/health", 200_u16, "ready"),
                    ("/live", 200, "ready"),
                    ("/someRandomPathNotFoundHere", 404, "Route not found"),
                ] {
                    println!("[test] Sending request to {}", path);
                    let url = format!("http://{}{}", addr, path);
                    let response = client.get(&url).send().await.unwrap();
                    let status = response.status();
                    let body = response.text().await.unwrap();
                    println!(
                        "[test] Response for {}: status={}, body={:?}",
                        path, status, body
                    );
                    assert_eq!(
                        status, expected_status,
                        "Response: status={}, body={:?}",
                        status, body
                    );
                    assert!(
                        body.contains(expect_body),
                        "Response: status={}, body={:?}",
                        status,
                        body
                    );
                }
            },
        )
        .await;
    }

    #[tokio::test]
    async fn test_health_check_with_payload_and_timeout() {
        crate::logging::init();

        temp_env::async_with_vars(
            [
                ("DYN_SYSTEM_ENABLED", Some("true")),
                ("DYN_SYSTEM_PORT", Some("0")),
                ("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("notready")),
                (
                    "DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS",
                    Some("[\"test.endpoint\"]"),
                ),
                ("DYN_HEALTH_CHECK_ENABLED", Some("true")),
                ("DYN_CANARY_WAIT_TIME", Some("1")),
                ("DYN_HEALTH_CHECK_REQUEST_TIMEOUT", Some("1")),
                ("RUST_LOG", Some("info")),
            ],
            async {
                let drt = Arc::new(create_test_drt_async().await);

                let system_info = drt
                    .system_status_server_info()
                    .expect("System status server should be started");
                let addr = system_info.socket_addr;

                let client = reqwest::Client::new();
                let health_url = format!("http://{}/health", addr);

                let endpoint = "test.endpoint";
                let health_check_payload = serde_json::json!({
                    "prompt": "health check test",
                    "_health_check": true
                });

                {
                    let system_health = drt.system_health.lock().unwrap();
                    system_health.register_health_check_target(
                        endpoint,
                        crate::component::Instance {
                            component: "test_component".to_string(),
                            endpoint: "health".to_string(),
                            namespace: "test_namespace".to_string(),
                            instance_id: 1,
                            transport: crate::component::TransportType::NatsTcp(
                                endpoint.to_string(),
                            ),
                        },
                        health_check_payload.clone(),
                    );
                }

                let response = client.get(&health_url).send().await.unwrap();
                let status = response.status();
                let body = response.text().await.unwrap();
                assert_eq!(status, 503, "Should be unhealthy initially (default state)");
                assert!(
                    body.contains("\"status\":\"notready\""),
                    "Should show notready status initially"
                );

                drt.system_health
                    .lock()
                    .unwrap()
                    .set_endpoint_health_status(endpoint, HealthStatus::Ready);

                let response = client.get(&health_url).send().await.unwrap();
                let status = response.status();
                let body = response.text().await.unwrap();

                assert_eq!(status, 200, "Should be healthy due to recent response");
                assert!(
                    body.contains("\"status\":\"ready\""),
                    "Should show ready status after response"
                );

                let endpoint_status = drt
                    .system_health
                    .lock()
                    .unwrap()
                    .get_endpoint_health_status(endpoint);
                assert_eq!(
                    endpoint_status,
                    Some(HealthStatus::Ready),
                    "SystemHealth should show endpoint as Ready after response"
                );
            },
        )
        .await;
    }
}