1use crate::config::HealthStatus;
5use crate::logging::make_request_span;
6use crate::metrics::MetricsRegistry;
7use crate::metrics::prometheus_names::{nats_client, nats_service};
8use crate::traits::DistributedRuntimeProvider;
9use axum::{Router, http::StatusCode, response::IntoResponse, routing::get};
10use serde_json::json;
11use std::collections::HashMap;
12use std::sync::{Arc, OnceLock};
13use std::time::Instant;
14use tokio::{net::TcpListener, task::JoinHandle};
15use tokio_util::sync::CancellationToken;
16use tower_http::trace::TraceLayer;
17
/// Address and task handle of a running system status (health/metrics) HTTP server.
#[derive(Debug)]
pub struct SystemStatusServerInfo {
    // Socket address the server is actually bound to (may differ from the
    // requested one when port 0 was used to pick an ephemeral port).
    pub socket_addr: std::net::SocketAddr,
    // Join handle of the spawned server task, wrapped in Arc so this info
    // struct can be cloned; `None` when no task handle was provided.
    pub handle: Option<Arc<JoinHandle<()>>>,
}
24
25impl SystemStatusServerInfo {
26 pub fn new(socket_addr: std::net::SocketAddr, handle: Option<JoinHandle<()>>) -> Self {
27 Self {
28 socket_addr,
29 handle: handle.map(Arc::new),
30 }
31 }
32
33 pub fn address(&self) -> String {
34 self.socket_addr.to_string()
35 }
36
37 pub fn hostname(&self) -> String {
38 self.socket_addr.ip().to_string()
39 }
40
41 pub fn port(&self) -> u16 {
42 self.socket_addr.port()
43 }
44}
45
46impl Clone for SystemStatusServerInfo {
47 fn clone(&self) -> Self {
48 Self {
49 socket_addr: self.socket_addr,
50 handle: self.handle.clone(),
51 }
52 }
53}
54
/// Shared state handed to the status-server HTTP handlers.
pub struct SystemStatusState {
    // Root distributed runtime; source of system health and metrics registries.
    root_drt: Arc<crate::DistributedRuntime>,
}
60
61impl SystemStatusState {
62 pub fn new(drt: Arc<crate::DistributedRuntime>) -> anyhow::Result<Self> {
64 Ok(Self { root_drt: drt })
65 }
66
67 pub fn drt(&self) -> &crate::DistributedRuntime {
69 &self.root_drt
70 }
71}
72
73pub async fn spawn_system_status_server(
75 host: &str,
76 port: u16,
77 cancel_token: CancellationToken,
78 drt: Arc<crate::DistributedRuntime>,
79) -> anyhow::Result<(std::net::SocketAddr, tokio::task::JoinHandle<()>)> {
80 let server_state = Arc::new(SystemStatusState::new(drt)?);
82 let health_path = server_state
83 .drt()
84 .system_health
85 .lock()
86 .unwrap()
87 .health_path()
88 .to_string();
89 let live_path = server_state
90 .drt()
91 .system_health
92 .lock()
93 .unwrap()
94 .live_path()
95 .to_string();
96
97 let app = Router::new()
98 .route(
99 &health_path,
100 get({
101 let state = Arc::clone(&server_state);
102 move || health_handler(state)
103 }),
104 )
105 .route(
106 &live_path,
107 get({
108 let state = Arc::clone(&server_state);
109 move || health_handler(state)
110 }),
111 )
112 .route(
113 "/metrics",
114 get({
115 let state = Arc::clone(&server_state);
116 move || metrics_handler(state)
117 }),
118 )
119 .fallback(|| async {
120 tracing::info!("[fallback handler] called");
121 (StatusCode::NOT_FOUND, "Route not found").into_response()
122 })
123 .layer(TraceLayer::new_for_http().make_span_with(make_request_span));
124
125 let address = format!("{}:{}", host, port);
126 tracing::info!("[spawn_system_status_server] binding to: {}", address);
127
128 let listener = match TcpListener::bind(&address).await {
129 Ok(listener) => {
130 let actual_address = listener.local_addr()?;
132 tracing::info!(
133 "[spawn_system_status_server] system status server bound to: {}",
134 actual_address
135 );
136 (listener, actual_address)
137 }
138 Err(e) => {
139 tracing::error!("Failed to bind to address {}: {}", address, e);
140 return Err(anyhow::anyhow!("Failed to bind to address: {}", e));
141 }
142 };
143 let (listener, actual_address) = listener;
144
145 let observer = cancel_token.child_token();
146 let handle = tokio::spawn(async move {
148 if let Err(e) = axum::serve(listener, app)
149 .with_graceful_shutdown(observer.cancelled_owned())
150 .await
151 {
152 tracing::error!("System status server error: {}", e);
153 }
154 });
155
156 Ok((actual_address, handle))
157}
158
159#[tracing::instrument(skip_all, level = "trace")]
161async fn health_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
162 let system_health = state.drt().system_health.lock().unwrap();
164 let (healthy, endpoints) = system_health.get_health_status();
165 let uptime = Some(system_health.uptime());
166
167 let healthy_string = if healthy { "ready" } else { "notready" };
168 let status_code = if healthy {
169 StatusCode::OK
170 } else {
171 StatusCode::SERVICE_UNAVAILABLE
172 };
173
174 let response = json!({
175 "status": healthy_string,
176 "uptime": uptime,
177 "endpoints": endpoints,
178 });
179
180 tracing::trace!("Response {}", response.to_string());
181
182 (status_code, response.to_string())
183}
184
/// HTTP handler for `/metrics`: renders Prometheus exposition-format text
/// from the root registry plus any per-hierarchy callback output.
#[tracing::instrument(skip_all, level = "trace")]
async fn metrics_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
    // Refresh the uptime gauge so every scrape sees a current value.
    state
        .drt()
        .system_health
        .lock()
        .unwrap()
        .update_uptime_gauge();

    // Snapshot the hierarchy names first so the read lock is not held while
    // executing the update callbacks below (which may take other locks).
    let all_hierarchies: Vec<String> = {
        let registries = state.drt().hierarchy_to_metricsregistry.read().unwrap();
        registries.keys().cloned().collect()
    };

    // Let each hierarchy refresh its metrics before rendering. Callback
    // failures are logged but do not fail the scrape.
    for hierarchy in &all_hierarchies {
        let callback_results = state.drt().execute_prometheus_update_callbacks(hierarchy);
        for result in callback_results {
            if let Err(e) = result {
                tracing::error!(
                    "Error executing metrics callback for hierarchy '{}': {}",
                    hierarchy,
                    e
                );
            }
        }
    }

    // Base exposition text from the root registry; a failure here is fatal
    // for the scrape (500).
    let mut response = match state.drt().prometheus_expfmt() {
        Ok(r) => r,
        Err(e) => {
            tracing::error!("Failed to get metrics from registry: {}", e);
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                "Failed to get metrics".to_string(),
            );
        }
    };

    // Append extra exposition-format text produced per hierarchy.
    for hierarchy in &all_hierarchies {
        let expfmt = {
            // Re-acquire the lock per iteration; the registry set may have
            // changed since the snapshot above, so missing keys are skipped.
            let registries = state.drt().hierarchy_to_metricsregistry.read().unwrap();
            if let Some(entry) = registries.get(hierarchy) {
                entry.execute_prometheus_expfmt_callbacks()
            } else {
                String::new()
            }
        };

        if !expfmt.is_empty() {
            // Keep the output well-formed: segments must be newline-separated.
            if !response.ends_with('\n') {
                response.push('\n');
            }
            response.push_str(&expfmt);
        }
    }

    (StatusCode::OK, response)
}
248
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::Duration;

    /// An axum server must exit promptly once its cancellation token fires.
    #[tokio::test]
    async fn test_http_server_lifecycle() {
        let cancel_token = CancellationToken::new();
        let server_token = cancel_token.clone();

        let app = Router::new().route("/test", get(|| async { (StatusCode::OK, "test") }));

        let server_handle = tokio::spawn(async move {
            let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
            let _ = axum::serve(listener, app)
                .with_graceful_shutdown(server_token.cancelled_owned())
                .await;
        });

        // Cancel immediately; graceful shutdown must still complete quickly.
        cancel_token.cancel();

        let shutdown = tokio::time::timeout(Duration::from_secs(5), server_handle).await;
        assert!(
            shutdown.is_ok(),
            "HTTP server should shut down when cancel token is cancelled"
        );
    }
}
285
286#[cfg(all(test, feature = "integration"))]
288mod integration_tests {
289 use super::*;
290 use crate::distributed::distributed_test_utils::create_test_drt_async;
291 use crate::metrics::MetricsRegistry;
292 use anyhow::Result;
293 use rstest::rstest;
294 use std::sync::Arc;
295 use tokio::time::Duration;
296
297 #[tokio::test]
298 async fn test_uptime_from_system_health() {
299 temp_env::async_with_vars([("DYN_SYSTEM_ENABLED", Some("false"))], async {
301 let drt = create_test_drt_async().await;
302
303 let uptime = drt.system_health.lock().unwrap().uptime();
305 assert!(uptime.as_nanos() > 0 || uptime.is_zero());
307
308 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
310 let uptime_after = drt.system_health.lock().unwrap().uptime();
311 assert!(uptime_after > uptime);
312 })
313 .await;
314 }
315
    /// The root registry must expose the component uptime gauge with the
    /// expected `dynamo_component_` namespace in Prometheus exposition format.
    #[tokio::test]
    async fn test_runtime_metrics_initialization_and_namespace() {
        temp_env::async_with_vars([("DYN_SYSTEM_ENABLED", Some("false"))], async {
            let drt = create_test_drt_async().await;
            let response = drt.prometheus_expfmt().unwrap();
            println!("Full metrics response:\n{}", response);

            // Drop NATS client/service lines so the assertions below only see
            // runtime-owned metrics.
            let filtered_response: String = response
                .lines()
                .filter(|line| {
                    !line.contains(nats_client::PREFIX) && !line.contains(nats_service::PREFIX)
                })
                .collect::<Vec<_>>()
                .join("\n");

            assert!(
                filtered_response.contains("# HELP dynamo_component_uptime_seconds"),
                "Should contain uptime_seconds help text"
            );
            assert!(
                filtered_response.contains("# TYPE dynamo_component_uptime_seconds gauge"),
                "Should contain uptime_seconds type"
            );
            assert!(
                filtered_response.contains("dynamo_component_uptime_seconds"),
                "Should contain uptime_seconds metric with correct namespace"
            );
        })
        .await;
    }
353
    /// `update_uptime_gauge` must be callable repeatedly and the underlying
    /// uptime must advance by at least the slept duration.
    #[tokio::test]
    async fn test_uptime_gauge_updates() {
        temp_env::async_with_vars([("DYN_SYSTEM_ENABLED", Some("false"))], async {
            let drt = create_test_drt_async().await;

            let initial_uptime = drt.system_health.lock().unwrap().uptime();

            // First gauge refresh (before the sleep).
            drt.system_health.lock().unwrap().update_uptime_gauge();

            tokio::time::sleep(std::time::Duration::from_millis(100)).await;

            let uptime_after_sleep = drt.system_health.lock().unwrap().uptime();

            // Second refresh; exercises repeated updates of the same gauge.
            drt.system_health.lock().unwrap().update_uptime_gauge();

            let elapsed = uptime_after_sleep - initial_uptime;
            assert!(
                elapsed >= std::time::Duration::from_millis(100),
                "Uptime should have increased by at least 100ms after sleep, but only increased by {:?}",
                elapsed
            );
        })
        .await;
    }
385
386 #[tokio::test]
387 async fn test_http_requests_fail_when_system_disabled() {
388 temp_env::async_with_vars([("DYN_SYSTEM_ENABLED", Some("false"))], async {
390 let drt = create_test_drt_async().await;
391
392 let system_info = drt.system_status_server_info();
394 assert!(
395 system_info.is_none(),
396 "System status server should not be running when DYN_SYSTEM_ENABLED=false"
397 );
398
399 println!("✓ System status server correctly disabled when DYN_SYSTEM_ENABLED=false");
400 })
401 .await;
402 }
403
    /// Parameterized check of health/live endpoints and the 404 fallback,
    /// with and without custom paths configured via environment variables.
    #[rstest]
    #[case("ready", 200, "ready", None, None, 3)]
    #[case("notready", 503, "notready", None, None, 3)]
    #[case("ready", 200, "ready", Some("/custom/health"), Some("/custom/live"), 5)]
    #[case(
        "notready",
        503,
        "notready",
        Some("/custom/health"),
        Some("/custom/live"),
        5
    )]
    #[tokio::test]
    // NOTE(review): this cfg is redundant — the enclosing module is already
    // gated on `feature = "integration"` — but harmless.
    #[cfg(feature = "integration")]
    async fn test_health_endpoints(
        #[case] starting_health_status: &'static str,
        #[case] expected_status: u16,
        #[case] expected_body: &'static str,
        #[case] custom_health_path: Option<&'static str>,
        #[case] custom_live_path: Option<&'static str>,
        #[case] expected_num_tests: usize,
    ) {
        use std::sync::Arc;
        crate::logging::init();

        #[allow(clippy::redundant_closure_call)]
        temp_env::async_with_vars(
            [
                ("DYN_SYSTEM_ENABLED", Some("true")),
                ("DYN_SYSTEM_PORT", Some("0")),
                (
                    "DYN_SYSTEM_STARTING_HEALTH_STATUS",
                    Some(starting_health_status),
                ),
                ("DYN_SYSTEM_HEALTH_PATH", custom_health_path),
                ("DYN_SYSTEM_LIVE_PATH", custom_live_path),
            ],
            (async || {
                let drt = Arc::new(create_test_drt_async().await);

                let system_info = drt
                    .system_status_server_info()
                    .expect("System status server should be started by DRT");
                let addr = system_info.socket_addr;

                let client = reqwest::Client::new();

                // Build (path, expected status, expected body) cases; when a
                // custom path is configured, the default one must 404.
                let mut test_cases = vec![];
                match custom_health_path {
                    None => {
                        test_cases.push(("/health", expected_status, expected_body));
                    }
                    Some(chp) => {
                        test_cases.push(("/health", 404, "Route not found"));
                        test_cases.push((chp, expected_status, expected_body));
                    }
                }
                match custom_live_path {
                    None => {
                        test_cases.push(("/live", expected_status, expected_body));
                    }
                    Some(clp) => {
                        test_cases.push(("/live", 404, "Route not found"));
                        test_cases.push((clp, expected_status, expected_body));
                    }
                }
                test_cases.push(("/someRandomPathNotFoundHere", 404, "Route not found"));
                // Guard against drift between the case table and this logic.
                assert_eq!(test_cases.len(), expected_num_tests);

                for (path, expect_status, expect_body) in test_cases {
                    println!("[test] Sending request to {}", path);
                    let url = format!("http://{}{}", addr, path);
                    let response = client.get(&url).send().await.unwrap();
                    let status = response.status();
                    let body = response.text().await.unwrap();
                    println!(
                        "[test] Response for {}: status={}, body={:?}",
                        path, status, body
                    );
                    assert_eq!(
                        status, expect_status,
                        "Response: status={}, body={:?}",
                        status, body
                    );
                    assert!(
                        body.contains(expect_body),
                        "Response: status={}, body={:?}",
                        status,
                        body
                    );
                }
            })(),
        )
        .await;
    }
516
    /// Smoke test: W3C trace-context headers (traceparent/tracestate) are
    /// accepted on the health endpoints; responses are logged, not asserted.
    #[tokio::test]
    async fn test_health_endpoint_tracing() -> Result<()> {
        use std::sync::Arc;

        #[allow(clippy::redundant_closure_call)]
        let _ = temp_env::async_with_vars(
            [
                ("DYN_SYSTEM_ENABLED", Some("true")),
                ("DYN_SYSTEM_PORT", Some("0")),
                ("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("ready")),
                ("DYN_LOGGING_JSONL", Some("1")),
                ("DYN_LOG", Some("trace")),
            ],
            (async || {
                // Init inside the env scope so JSONL/trace settings apply.
                crate::logging::init();

                let drt = Arc::new(create_test_drt_async().await);

                let system_info = drt
                    .system_status_server_info()
                    .expect("System status server should be started by DRT");
                let addr = system_info.socket_addr;
                let client = reqwest::Client::new();
                for path in [("/health"), ("/live"), ("/someRandomPathNotFoundHere")] {
                    // Sample values from the W3C Trace Context specification.
                    let traceparent_value =
                        "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
                    let tracestate_value = "vendor1=opaqueValue1,vendor2=opaqueValue2";
                    let mut headers = reqwest::header::HeaderMap::new();
                    headers.insert(
                        reqwest::header::HeaderName::from_static("traceparent"),
                        reqwest::header::HeaderValue::from_str(traceparent_value)?,
                    );
                    headers.insert(
                        reqwest::header::HeaderName::from_static("tracestate"),
                        reqwest::header::HeaderValue::from_str(tracestate_value)?,
                    );
                    let url = format!("http://{}{}", addr, path);
                    let response = client.get(&url).headers(headers).send().await.unwrap();
                    let status = response.status();
                    let body = response.text().await.unwrap();
                    tracing::info!(body = body, status = status.to_string());
                }

                Ok::<(), anyhow::Error>(())
            })(),
        )
        .await;
        Ok(())
    }
572
    /// End-to-end: health starts `notready`, then flips to `ready` once the
    /// monitored endpoint is served and passes its health check.
    #[tokio::test]
    async fn test_health_endpoint_with_changing_health_status() {
        const ENDPOINT_NAME: &str = "generate";
        // JSON list of endpoints whose health drives overall system health.
        const ENDPOINT_HEALTH_CONFIG: &str = "[\"generate\"]";
        temp_env::async_with_vars(
            [
                ("DYN_SYSTEM_ENABLED", Some("true")),
                ("DYN_SYSTEM_PORT", Some("0")),
                ("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("notready")),
                ("DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS", Some(ENDPOINT_HEALTH_CONFIG)),
            ],
            async {
                let drt = Arc::new(create_test_drt_async().await);

                let system_info_opt = drt.system_status_server_info();

                assert!(
                    system_info_opt.is_some(),
                    "System status server was not spawned by DRT. Expected DRT to spawn server when DYN_SYSTEM_ENABLED=true, but system_status_server_info() returned None. Environment: DYN_SYSTEM_ENABLED={:?}, DYN_SYSTEM_PORT={:?}",
                    std::env::var("DYN_SYSTEM_ENABLED"),
                    std::env::var("DYN_SYSTEM_PORT")
                );

                let system_info = system_info_opt.unwrap();
                let addr = system_info.socket_addr;

                let client = reqwest::Client::new();
                let health_url = format!("http://{}/health", addr);

                // Before any endpoint exists the system must report notready.
                let response = client.get(&health_url).send().await.unwrap();
                let status = response.status();
                let body = response.text().await.unwrap();

                assert_eq!(status, 503, "Health should be 503 (not ready) initially, got: {}", status);
                assert!(body.contains("\"status\":\"notready\""), "Health should contain status notready");

                let namespace = drt.namespace("ns1234").unwrap();
                let component = namespace.component("comp1234").unwrap();

                use crate::pipeline::{async_trait, network::Ingress, AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, SingleIn};
                use crate::protocols::annotated::Annotated;

                // Minimal echo engine used as the monitored endpoint handler.
                struct TestHandler;

                #[async_trait]
                impl AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, Error> for TestHandler {
                    async fn generate(&self, input: SingleIn<String>) -> crate::Result<ManyOut<Annotated<String>>> {
                        let (data, ctx) = input.into_parts();
                        let response = Annotated::from_data(format!("You responded: {}", data));
                        Ok(crate::pipeline::ResponseStream::new(
                            Box::pin(crate::stream::iter(vec![response])),
                            ctx.context()
                        ))
                    }
                }

                let ingress = Ingress::for_engine(std::sync::Arc::new(TestHandler)).unwrap();

                // Serve the endpoint in the background; health should flip to
                // ready once it registers and answers its health-check payload.
                tokio::spawn(async move {
                    let _ = component
                        .service_builder()
                        .create()
                        .await
                        .unwrap()
                        .endpoint(ENDPOINT_NAME)
                        .endpoint_builder()
                        .handler(ingress)
                        .health_check_payload(serde_json::json!({
                            "test": "health_check"
                        }))
                        .start()
                        .await;
                });

                // Poll the health endpoint; tolerate early failures while the
                // endpoint spins up.
                let mut success_count = 0;
                let mut failures = Vec::new();

                for i in 1..=200 {
                    let response = client.get(&health_url).send().await.unwrap();
                    let status = response.status();
                    let body = response.text().await.unwrap();

                    if status == 200 && body.contains("\"status\":\"ready\"") {
                        success_count += 1;
                    } else {
                        failures.push((i, status.as_u16(), body.clone()));
                        // Only log the first few failures to keep output readable.
                        if failures.len() <= 5 {
                            tracing::warn!("Request {}: status={}, body={}", i, status, body);
                        }
                    }
                }

                tracing::info!("Health endpoint test results: {}/200 requests succeeded", success_count);
                if !failures.is_empty() {
                    tracing::warn!("Failed requests: {}", failures.len());
                }

                // 75% success threshold tolerates startup latency.
                assert!(success_count >= 150, "Expected at least 150 out of 200 requests to succeed, but only {} succeeded", success_count);
            },
        )
        .await;
    }
689
    /// With the default "ready" starting status, /health and /live return 200
    /// and unknown paths hit the 404 fallback handler.
    #[tokio::test]
    async fn test_spawn_system_status_server_endpoints() {
        temp_env::async_with_vars(
            [
                ("DYN_SYSTEM_ENABLED", Some("true")),
                ("DYN_SYSTEM_PORT", Some("0")),
                ("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("ready")),
            ],
            async {
                let drt = Arc::new(create_test_drt_async().await);

                let system_info = drt
                    .system_status_server_info()
                    .expect("System status server should be started by DRT");
                let addr = system_info.socket_addr;
                let client = reqwest::Client::new();
                for (path, expect_200, expect_body) in [
                    ("/health", true, "ready"),
                    ("/live", true, "ready"),
                    ("/someRandomPathNotFoundHere", false, "Route not found"),
                ] {
                    println!("[test] Sending request to {}", path);
                    let url = format!("http://{}{}", addr, path);
                    let response = client.get(&url).send().await.unwrap();
                    let status = response.status();
                    let body = response.text().await.unwrap();
                    println!(
                        "[test] Response for {}: status={}, body={:?}",
                        path, status, body
                    );
                    if expect_200 {
                        assert_eq!(status, 200, "Response: status={}, body={:?}", status, body);
                    } else {
                        assert_eq!(status, 404, "Response: status={}, body={:?}", status, body);
                    }
                    assert!(
                        body.contains(expect_body),
                        "Response: status={}, body={:?}",
                        status,
                        body
                    );
                }
            },
        )
        .await;
    }
739
    // NOTE(review): this cfg is redundant — the enclosing module is already
    // gated on `feature = "integration"` — but harmless.
    #[cfg(feature = "integration")]
    /// Registering a health-check target and marking its endpoint Ready must
    /// flip overall health from 503/notready to 200/ready, and SystemHealth's
    /// own view must agree with the HTTP response.
    #[tokio::test]
    async fn test_health_check_with_payload_and_timeout() {
        crate::logging::init();

        temp_env::async_with_vars(
            [
                ("DYN_SYSTEM_ENABLED", Some("true")),
                ("DYN_SYSTEM_PORT", Some("0")),
                ("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("notready")),
                (
                    "DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS",
                    Some("[\"test.endpoint\"]"),
                ),
                ("DYN_HEALTH_CHECK_ENABLED", Some("true")),
                // Short canary/timeout windows keep the test fast.
                ("DYN_CANARY_WAIT_TIME", Some("1")),
                ("DYN_HEALTH_CHECK_REQUEST_TIMEOUT", Some("1")),
                ("RUST_LOG", Some("info")),
            ],
            async {
                let drt = Arc::new(create_test_drt_async().await);

                let system_info = drt
                    .system_status_server_info()
                    .expect("System status server should be started");
                let addr = system_info.socket_addr;

                let client = reqwest::Client::new();
                let health_url = format!("http://{}/health", addr);

                let endpoint = "test.endpoint";
                // Payload the health checker would send to the endpoint.
                let health_check_payload = serde_json::json!({
                    "prompt": "health check test",
                    "_health_check": true
                });

                // Register a synthetic instance as the health-check target.
                {
                    let system_health = drt.system_health.lock().unwrap();
                    system_health.register_health_check_target(
                        endpoint,
                        crate::component::Instance {
                            component: "test_component".to_string(),
                            endpoint: "health".to_string(),
                            namespace: "test_namespace".to_string(),
                            instance_id: 1,
                            transport: crate::component::TransportType::NatsTcp(
                                endpoint.to_string(),
                            ),
                        },
                        health_check_payload.clone(),
                    );
                }

                // Initially unhealthy: target registered but not yet Ready.
                let response = client.get(&health_url).send().await.unwrap();
                let status = response.status();
                let body = response.text().await.unwrap();
                assert_eq!(status, 503, "Should be unhealthy initially (default state)");
                assert!(
                    body.contains("\"status\":\"notready\""),
                    "Should show notready status initially"
                );

                // Simulate a successful health-check response.
                drt.system_health
                    .lock()
                    .unwrap()
                    .set_endpoint_health_status(endpoint, HealthStatus::Ready);

                let response = client.get(&health_url).send().await.unwrap();
                let status = response.status();
                let body = response.text().await.unwrap();

                assert_eq!(status, 200, "Should be healthy due to recent response");
                assert!(
                    body.contains("\"status\":\"ready\""),
                    "Should show ready status after response"
                );

                // Internal status must match what HTTP reported.
                let endpoint_status = drt
                    .system_health
                    .lock()
                    .unwrap()
                    .get_endpoint_health_status(endpoint);
                assert_eq!(
                    endpoint_status,
                    Some(HealthStatus::Ready),
                    "SystemHealth should show endpoint as Ready after response"
                );
            },
        )
        .await;
    }
840}