dynamo_runtime/
system_status_server.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::config::HealthStatus;
5use crate::logging::make_request_span;
6use crate::metrics::MetricsHierarchy;
7use crate::metrics::prometheus_names::{nats_client, nats_service};
8use crate::traits::DistributedRuntimeProvider;
9use axum::{Router, http::StatusCode, response::IntoResponse, routing::get};
10use serde_json::json;
11use std::collections::HashMap;
12use std::sync::{Arc, OnceLock};
13use std::time::Instant;
14use tokio::{net::TcpListener, task::JoinHandle};
15use tokio_util::sync::CancellationToken;
16use tower_http::trace::TraceLayer;
17
18/// System status server information containing socket address and handle
19#[derive(Debug)]
20pub struct SystemStatusServerInfo {
21    pub socket_addr: std::net::SocketAddr,
22    pub handle: Option<Arc<JoinHandle<()>>>,
23}
24
25impl SystemStatusServerInfo {
26    pub fn new(socket_addr: std::net::SocketAddr, handle: Option<JoinHandle<()>>) -> Self {
27        Self {
28            socket_addr,
29            handle: handle.map(Arc::new),
30        }
31    }
32
33    pub fn address(&self) -> String {
34        self.socket_addr.to_string()
35    }
36
37    pub fn hostname(&self) -> String {
38        self.socket_addr.ip().to_string()
39    }
40
41    pub fn port(&self) -> u16 {
42        self.socket_addr.port()
43    }
44}
45
46impl Clone for SystemStatusServerInfo {
47    fn clone(&self) -> Self {
48        Self {
49            socket_addr: self.socket_addr,
50            handle: self.handle.clone(),
51        }
52    }
53}
54
55/// System status server state containing the distributed runtime reference
56pub struct SystemStatusState {
57    // global drt registry is for printing out the entire Prometheus format output
58    root_drt: Arc<crate::DistributedRuntime>,
59}
60
61impl SystemStatusState {
62    /// Create new system status server state with the provided distributed runtime
63    pub fn new(drt: Arc<crate::DistributedRuntime>) -> anyhow::Result<Self> {
64        Ok(Self { root_drt: drt })
65    }
66
67    /// Get a reference to the distributed runtime
68    pub fn drt(&self) -> &crate::DistributedRuntime {
69        &self.root_drt
70    }
71}
72
73/// Start system status server with metrics support
74pub async fn spawn_system_status_server(
75    host: &str,
76    port: u16,
77    cancel_token: CancellationToken,
78    drt: Arc<crate::DistributedRuntime>,
79) -> anyhow::Result<(std::net::SocketAddr, tokio::task::JoinHandle<()>)> {
80    // Create system status server state with the provided distributed runtime
81    let server_state = Arc::new(SystemStatusState::new(drt)?);
82    let health_path = server_state
83        .drt()
84        .system_health
85        .lock()
86        .health_path()
87        .to_string();
88    let live_path = server_state
89        .drt()
90        .system_health
91        .lock()
92        .live_path()
93        .to_string();
94
95    let app = Router::new()
96        .route(
97            &health_path,
98            get({
99                let state = Arc::clone(&server_state);
100                move || health_handler(state)
101            }),
102        )
103        .route(
104            &live_path,
105            get({
106                let state = Arc::clone(&server_state);
107                move || health_handler(state)
108            }),
109        )
110        .route(
111            "/metrics",
112            get({
113                let state = Arc::clone(&server_state);
114                move || metrics_handler(state)
115            }),
116        )
117        .fallback(|| async {
118            tracing::info!("[fallback handler] called");
119            (StatusCode::NOT_FOUND, "Route not found").into_response()
120        })
121        .layer(TraceLayer::new_for_http().make_span_with(make_request_span));
122
123    let address = format!("{}:{}", host, port);
124    tracing::info!("[spawn_system_status_server] binding to: {}", address);
125
126    let listener = match TcpListener::bind(&address).await {
127        Ok(listener) => {
128            // get the actual address and port, print in debug level
129            let actual_address = listener.local_addr()?;
130            tracing::info!(
131                "[spawn_system_status_server] system status server bound to: {}",
132                actual_address
133            );
134            (listener, actual_address)
135        }
136        Err(e) => {
137            tracing::error!("Failed to bind to address {}: {}", address, e);
138            return Err(anyhow::anyhow!("Failed to bind to address: {}", e));
139        }
140    };
141    let (listener, actual_address) = listener;
142
143    let observer = cancel_token.child_token();
144    // Spawn the server in the background and return the handle
145    let handle = tokio::spawn(async move {
146        if let Err(e) = axum::serve(listener, app)
147            .with_graceful_shutdown(observer.cancelled_owned())
148            .await
149        {
150            tracing::error!("System status server error: {}", e);
151        }
152    });
153
154    Ok((actual_address, handle))
155}
156
157/// Health handler with optional active health checking
158#[tracing::instrument(skip_all, level = "trace")]
159async fn health_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
160    // Get basic health status
161    let system_health = state.drt().system_health.lock();
162    let (healthy, endpoints) = system_health.get_health_status();
163    let uptime = Some(system_health.uptime());
164
165    let healthy_string = if healthy { "ready" } else { "notready" };
166    let status_code = if healthy {
167        StatusCode::OK
168    } else {
169        StatusCode::SERVICE_UNAVAILABLE
170    };
171
172    let response = json!({
173        "status": healthy_string,
174        "uptime": uptime,
175        "endpoints": endpoints,
176    });
177
178    tracing::trace!("Response {}", response.to_string());
179
180    (status_code, response.to_string())
181}
182
183/// Metrics handler with DistributedRuntime uptime
184#[tracing::instrument(skip_all, level = "trace")]
185async fn metrics_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
186    // Update the uptime gauge with current value
187    state.drt().system_health.lock().update_uptime_gauge();
188
189    // Get all metrics from DistributedRuntime
190    // Note: In the new hierarchy-based architecture, metrics are automatically registered
191    // at all parent levels, so DRT's metrics include all metrics from children
192    // (Namespace, Component, Endpoint). The prometheus_expfmt() method also executes
193    // all update callbacks and expfmt callbacks before returning the metrics.
194    let response = match state.drt().metrics().prometheus_expfmt() {
195        Ok(r) => r,
196        Err(e) => {
197            tracing::error!("Failed to get metrics from registry: {}", e);
198            return (
199                StatusCode::INTERNAL_SERVER_ERROR,
200                "Failed to get metrics".to_string(),
201            );
202        }
203    };
204
205    (StatusCode::OK, response)
206}
207
// Regular tests: cargo test system_status_server --lib
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::Duration;

    /// Basic check that an axum HTTP server starts and shuts down on cancellation,
    /// without involving a DistributedRuntime. Run before the more complicated tests.
    #[tokio::test]
    async fn test_http_server_lifecycle() {
        let cancel_token = CancellationToken::new();
        let cancel_token_for_server = cancel_token.clone();

        let app = Router::new().route("/test", get(|| async { (StatusCode::OK, "test") }));

        // Bind before spawning so that a bind failure fails the test directly
        // instead of being hidden inside the background task.
        let listener = TcpListener::bind("127.0.0.1:0")
            .await
            .expect("binding an ephemeral localhost port should succeed");

        // start HTTP server
        let server_handle = tokio::spawn(async move {
            axum::serve(listener, app)
                .with_graceful_shutdown(cancel_token_for_server.cancelled_owned())
                .await
                .expect("HTTP server should exit without an I/O error");
        });

        // cancel token
        cancel_token.cancel();

        // wait for the server to shut down
        let join_result = tokio::time::timeout(Duration::from_secs(5), server_handle)
            .await
            .expect("HTTP server should shut down when cancel token is cancelled");

        // A panic inside the server task surfaces here as a join error.
        join_result.expect("HTTP server task should finish without panicking");
    }
}
244
245// Integration tests: cargo test system_status_server --lib --features integration
246#[cfg(all(test, feature = "integration"))]
247mod integration_tests {
248    use super::*;
249    use crate::distributed::distributed_test_utils::create_test_drt_async;
250    use crate::metrics::MetricsHierarchy;
251    use anyhow::Result;
252    use rstest::rstest;
253    use std::sync::Arc;
254    use tokio::time::Duration;
255
256    #[tokio::test]
257    async fn test_uptime_from_system_health() {
258        // Test that uptime is available from SystemHealth
259        temp_env::async_with_vars([("DYN_SYSTEM_ENABLED", Some("false"))], async {
260            let drt = create_test_drt_async().await;
261
262            // Get uptime from SystemHealth
263            let uptime = drt.system_health.lock().uptime();
264            // Uptime should exist (even if close to zero)
265            assert!(uptime.as_nanos() > 0 || uptime.is_zero());
266
267            // Sleep briefly and check uptime increases
268            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
269            let uptime_after = drt.system_health.lock().uptime();
270            assert!(uptime_after > uptime);
271        })
272        .await;
273    }
274
275    #[tokio::test]
276    async fn test_runtime_metrics_initialization_and_namespace() {
277        // Test that metrics have correct namespace
278        temp_env::async_with_vars([("DYN_SYSTEM_ENABLED", Some("false"))], async {
279            let drt = create_test_drt_async().await;
280            // SystemStatusState is already created in distributed.rs when DYN_SYSTEM_ENABLED=false
281            // so we don't need to create it again here
282
283            // The uptime_seconds metric should already be registered and available
284            let response = drt.metrics().prometheus_expfmt().unwrap();
285            println!("Full metrics response:\n{}", response);
286
287            // Filter out NATS client metrics for comparison
288            let filtered_response: String = response
289                .lines()
290                .filter(|line| {
291                    !line.contains(nats_client::PREFIX) && !line.contains(nats_service::PREFIX)
292                })
293                .collect::<Vec<_>>()
294                .join("\n");
295
296            // Check that uptime_seconds metric is present with correct namespace
297            assert!(
298                filtered_response.contains("# HELP dynamo_component_uptime_seconds"),
299                "Should contain uptime_seconds help text"
300            );
301            assert!(
302                filtered_response.contains("# TYPE dynamo_component_uptime_seconds gauge"),
303                "Should contain uptime_seconds type"
304            );
305            assert!(
306                filtered_response.contains("dynamo_component_uptime_seconds"),
307                "Should contain uptime_seconds metric with correct namespace"
308            );
309        })
310        .await;
311    }
312
313    #[tokio::test]
314    async fn test_uptime_gauge_updates() {
315        // Test that the uptime gauge is properly updated and increases over time
316        temp_env::async_with_vars([("DYN_SYSTEM_ENABLED", Some("false"))], async {
317            let drt = create_test_drt_async().await;
318
319            // Get initial uptime
320            let initial_uptime = drt.system_health.lock().uptime();
321
322            // Update the gauge with initial value
323            drt.system_health.lock().update_uptime_gauge();
324
325            // Sleep for 100ms
326            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
327
328            // Get uptime after sleep
329            let uptime_after_sleep = drt.system_health.lock().uptime();
330
331            // Update the gauge again
332            drt.system_health.lock().update_uptime_gauge();
333
334            // Verify uptime increased by at least 100ms
335            let elapsed = uptime_after_sleep - initial_uptime;
336            assert!(
337                elapsed >= std::time::Duration::from_millis(100),
338                "Uptime should have increased by at least 100ms after sleep, but only increased by {:?}",
339                elapsed
340            );
341        })
342        .await;
343    }
344
345    #[tokio::test]
346    async fn test_http_requests_fail_when_system_disabled() {
347        // Test that system status server is not running when disabled
348        temp_env::async_with_vars([("DYN_SYSTEM_ENABLED", Some("false"))], async {
349            let drt = create_test_drt_async().await;
350
351            // Verify that system status server info is None when disabled
352            let system_info = drt.system_status_server_info();
353            assert!(
354                system_info.is_none(),
355                "System status server should not be running when DYN_SYSTEM_ENABLED=false"
356            );
357
358            println!("✓ System status server correctly disabled when DYN_SYSTEM_ENABLED=false");
359        })
360        .await;
361    }
362
363    /// This test verifies the health and liveness endpoints of the system status server.
364    /// It checks that the endpoints respond with the correct HTTP status codes and bodies
365    /// based on the initial health status and any custom endpoint paths provided via environment variables.
366    /// The test is parameterized using multiple #[case] attributes to cover various scenarios,
367    /// including different initial health states ("ready" and "notready"), default and custom endpoint paths,
368    /// and expected response codes and bodies.
369    #[rstest]
370    #[case("ready", 200, "ready", None, None, 3)]
371    #[case("notready", 503, "notready", None, None, 3)]
372    #[case("ready", 200, "ready", Some("/custom/health"), Some("/custom/live"), 5)]
373    #[case(
374        "notready",
375        503,
376        "notready",
377        Some("/custom/health"),
378        Some("/custom/live"),
379        5
380    )]
381    #[tokio::test]
382    #[cfg(feature = "integration")]
383    async fn test_health_endpoints(
384        #[case] starting_health_status: &'static str,
385        #[case] expected_status: u16,
386        #[case] expected_body: &'static str,
387        #[case] custom_health_path: Option<&'static str>,
388        #[case] custom_live_path: Option<&'static str>,
389        #[case] expected_num_tests: usize,
390    ) {
391        use std::sync::Arc;
392        // use tokio::io::{AsyncReadExt, AsyncWriteExt};
393        // use reqwest for HTTP requests
394
395        // Closure call is needed here to satisfy async_with_vars
396
397        crate::logging::init();
398
399        #[allow(clippy::redundant_closure_call)]
400        temp_env::async_with_vars(
401            [
402                ("DYN_SYSTEM_ENABLED", Some("true")),
403                ("DYN_SYSTEM_PORT", Some("0")),
404                (
405                    "DYN_SYSTEM_STARTING_HEALTH_STATUS",
406                    Some(starting_health_status),
407                ),
408                ("DYN_SYSTEM_HEALTH_PATH", custom_health_path),
409                ("DYN_SYSTEM_LIVE_PATH", custom_live_path),
410            ],
411            (async || {
412                let drt = Arc::new(create_test_drt_async().await);
413
414                // Get system status server info from DRT (instead of manually spawning)
415                let system_info = drt
416                    .system_status_server_info()
417                    .expect("System status server should be started by DRT");
418                let addr = system_info.socket_addr;
419
420                let client = reqwest::Client::new();
421
422                // Prepare test cases
423                let mut test_cases = vec![];
424                match custom_health_path {
425                    None => {
426                        // When using default paths, test the default paths
427                        test_cases.push(("/health", expected_status, expected_body));
428                    }
429                    Some(chp) => {
430                        // When using custom paths, default paths should not exist
431                        test_cases.push(("/health", 404, "Route not found"));
432                        test_cases.push((chp, expected_status, expected_body));
433                    }
434                }
435                match custom_live_path {
436                    None => {
437                        // When using default paths, test the default paths
438                        test_cases.push(("/live", expected_status, expected_body));
439                    }
440                    Some(clp) => {
441                        // When using custom paths, default paths should not exist
442                        test_cases.push(("/live", 404, "Route not found"));
443                        test_cases.push((clp, expected_status, expected_body));
444                    }
445                }
446                test_cases.push(("/someRandomPathNotFoundHere", 404, "Route not found"));
447                assert_eq!(test_cases.len(), expected_num_tests);
448
449                for (path, expect_status, expect_body) in test_cases {
450                    println!("[test] Sending request to {}", path);
451                    let url = format!("http://{}{}", addr, path);
452                    let response = client.get(&url).send().await.unwrap();
453                    let status = response.status();
454                    let body = response.text().await.unwrap();
455                    println!(
456                        "[test] Response for {}: status={}, body={:?}",
457                        path, status, body
458                    );
459                    assert_eq!(
460                        status, expect_status,
461                        "Response: status={}, body={:?}",
462                        status, body
463                    );
464                    assert!(
465                        body.contains(expect_body),
466                        "Response: status={}, body={:?}",
467                        status,
468                        body
469                    );
470                }
471            })(),
472        )
473        .await;
474    }
475
476    #[tokio::test]
477    async fn test_health_endpoint_tracing() -> Result<()> {
478        use std::sync::Arc;
479
480        // Closure call is needed here to satisfy async_with_vars
481
482        #[allow(clippy::redundant_closure_call)]
483        let _ = temp_env::async_with_vars(
484            [
485                ("DYN_SYSTEM_ENABLED", Some("true")),
486                ("DYN_SYSTEM_PORT", Some("0")),
487                ("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("ready")),
488                ("DYN_LOGGING_JSONL", Some("1")),
489                ("DYN_LOG", Some("trace")),
490            ],
491            (async || {
492                // TODO Add proper testing for
493                // trace id and parent id
494
495                crate::logging::init();
496
497                let drt = Arc::new(create_test_drt_async().await);
498
499                // Get system status server info from DRT (instead of manually spawning)
500                let system_info = drt
501                    .system_status_server_info()
502                    .expect("System status server should be started by DRT");
503                let addr = system_info.socket_addr;
504                let client = reqwest::Client::new();
505                for path in [("/health"), ("/live"), ("/someRandomPathNotFoundHere")] {
506                    let traceparent_value =
507                        "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
508                    let tracestate_value = "vendor1=opaqueValue1,vendor2=opaqueValue2";
509                    let mut headers = reqwest::header::HeaderMap::new();
510                    headers.insert(
511                        reqwest::header::HeaderName::from_static("traceparent"),
512                        reqwest::header::HeaderValue::from_str(traceparent_value)?,
513                    );
514                    headers.insert(
515                        reqwest::header::HeaderName::from_static("tracestate"),
516                        reqwest::header::HeaderValue::from_str(tracestate_value)?,
517                    );
518                    let url = format!("http://{}{}", addr, path);
519                    let response = client.get(&url).headers(headers).send().await.unwrap();
520                    let status = response.status();
521                    let body = response.text().await.unwrap();
522                    tracing::info!(body = body, status = status.to_string());
523                }
524
525                Ok::<(), anyhow::Error>(())
526            })(),
527        )
528        .await;
529        Ok(())
530    }
531
    /// Checks that the health route reports "notready" (503) at start-up and then reports
    /// "ready" (200) for most requests once an endpoint named `generate` has been started.
    ///
    /// The environment sets the starting status to `notready` and
    /// `DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS` to the JSON list `["generate"]`.
    #[tokio::test]
    async fn test_health_endpoint_with_changing_health_status() {
        // Test health endpoint starts in not ready status, then becomes ready
        // when endpoints are created (DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS=["generate"])
        const ENDPOINT_NAME: &str = "generate";
        const ENDPOINT_HEALTH_CONFIG: &str = "[\"generate\"]";
        temp_env::async_with_vars(
            [
                ("DYN_SYSTEM_ENABLED", Some("true")),
                ("DYN_SYSTEM_PORT", Some("0")),
                ("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("notready")),
                ("DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS", Some(ENDPOINT_HEALTH_CONFIG)),
            ],
            async {
                let drt = Arc::new(create_test_drt_async().await);

                // Check if system status server was started
                let system_info_opt = drt.system_status_server_info();

                // Ensure system status server was spawned by DRT
                assert!(
                    system_info_opt.is_some(),
                    "System status server was not spawned by DRT. Expected DRT to spawn server when DYN_SYSTEM_ENABLED=true, but system_status_server_info() returned None. Environment: DYN_SYSTEM_ENABLED={:?}, DYN_SYSTEM_PORT={:?}",
                    std::env::var("DYN_SYSTEM_ENABLED"),
                    std::env::var("DYN_SYSTEM_PORT")
                );

                // Get the system status server info from DRT - this should never fail now due to above check
                let system_info = system_info_opt.unwrap();
                let addr = system_info.socket_addr;

                // Initially check health - should be not ready
                let client = reqwest::Client::new();
                let health_url = format!("http://{}/health", addr);

                let response = client.get(&health_url).send().await.unwrap();
                let status = response.status();
                let body = response.text().await.unwrap();

                // Health should be not ready (503) initially
                assert_eq!(status, 503, "Health should be 503 (not ready) initially, got: {}", status);
                assert!(body.contains("\"status\":\"notready\""), "Health should contain status notready");

                // Now create a namespace, component, and endpoint to make the system healthy
                let namespace = drt.namespace("ns1234").unwrap();
                let mut component = namespace.component("comp1234").unwrap();

                // Create a simple test handler
                use crate::pipeline::{async_trait, network::Ingress, AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, SingleIn};
                use crate::protocols::annotated::Annotated;

                // Minimal engine that answers every request with a single annotated string
                struct TestHandler;

                #[async_trait]
                impl AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, Error> for TestHandler {
                    async fn generate(&self, input: SingleIn<String>) -> crate::Result<ManyOut<Annotated<String>>> {
                        let (data, ctx) = input.into_parts();
                        let response = Annotated::from_data(format!("You responded: {}", data));
                        Ok(crate::pipeline::ResponseStream::new(
                            Box::pin(crate::stream::iter(vec![response])),
                            ctx.context()
                        ))
                    }
                }

                // Create the ingress and start the endpoint service
                let ingress = Ingress::for_engine(std::sync::Arc::new(TestHandler)).unwrap();

                // Start the service and endpoint with a health check payload
                // This will automatically register the endpoint for health monitoring
                // The task is spawned in the background and not awaited; the result of
                // starting the endpoint is deliberately ignored.
                tokio::spawn(async move {
                    component.add_stats_service().await.unwrap();
                    let _ = component.endpoint(ENDPOINT_NAME)
                        .endpoint_builder()
                        .handler(ingress)
                        .health_check_payload(serde_json::json!({
                            "test": "health_check"
                        }))
                        .start()
                        .await;
                });

                // Hit health endpoint 200 times to verify consistency
                // NOTE(review): the endpoint starts concurrently with these requests, so the
                // earliest ones presumably still see "notready" - hence the tolerance below.
                let mut success_count = 0;
                let mut failures = Vec::new();

                for i in 1..=200 {
                    let response = client.get(&health_url).send().await.unwrap();
                    let status = response.status();
                    let body = response.text().await.unwrap();

                    if status == 200 && body.contains("\"status\":\"ready\"") {
                        success_count += 1;
                    } else {
                        failures.push((i, status.as_u16(), body.clone()));
                        if failures.len() <= 5 {  // Only log first 5 failures
                            tracing::warn!("Request {}: status={}, body={}", i, status, body);
                        }
                    }
                }

                tracing::info!("Health endpoint test results: {}/200 requests succeeded", success_count);
                if !failures.is_empty() {
                    tracing::warn!("Failed requests: {}", failures.len());
                }

                // Expect at least 150 out of 200 requests to be successful
                assert!(success_count >= 150, "Expected at least 150 out of 200 requests to succeed, but only {} succeeded", success_count);
            },
        )
        .await;
    }
644
645    #[tokio::test]
646    async fn test_spawn_system_status_server_endpoints() {
647        // use reqwest for HTTP requests
648        temp_env::async_with_vars(
649            [
650                ("DYN_SYSTEM_ENABLED", Some("true")),
651                ("DYN_SYSTEM_PORT", Some("0")),
652                ("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("ready")),
653            ],
654            async {
655                let drt = Arc::new(create_test_drt_async().await);
656
657                // Get system status server info from DRT (instead of manually spawning)
658                let system_info = drt
659                    .system_status_server_info()
660                    .expect("System status server should be started by DRT");
661                let addr = system_info.socket_addr;
662                let client = reqwest::Client::new();
663                for (path, expect_200, expect_body) in [
664                    ("/health", true, "ready"),
665                    ("/live", true, "ready"),
666                    ("/someRandomPathNotFoundHere", false, "Route not found"),
667                ] {
668                    println!("[test] Sending request to {}", path);
669                    let url = format!("http://{}{}", addr, path);
670                    let response = client.get(&url).send().await.unwrap();
671                    let status = response.status();
672                    let body = response.text().await.unwrap();
673                    println!(
674                        "[test] Response for {}: status={}, body={:?}",
675                        path, status, body
676                    );
677                    if expect_200 {
678                        assert_eq!(status, 200, "Response: status={}, body={:?}", status, body);
679                    } else {
680                        assert_eq!(status, 404, "Response: status={}, body={:?}", status, body);
681                    }
682                    assert!(
683                        body.contains(expect_body),
684                        "Response: status={}, body={:?}",
685                        status,
686                        body
687                    );
688                }
689                // DRT handles server cleanup automatically
690            },
691        )
692        .await;
693    }
694
695    #[cfg(feature = "integration")]
696    #[tokio::test]
697    async fn test_health_check_with_payload_and_timeout() {
698        // Test the complete health check flow with the new canary-based system:
699        crate::logging::init();
700
701        temp_env::async_with_vars(
702            [
703                ("DYN_SYSTEM_ENABLED", Some("true")),
704                ("DYN_SYSTEM_PORT", Some("0")),
705                ("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("notready")),
706                (
707                    "DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS",
708                    Some("[\"test.endpoint\"]"),
709                ),
710                // Enable health check with short intervals for testing
711                ("DYN_HEALTH_CHECK_ENABLED", Some("true")),
712                ("DYN_CANARY_WAIT_TIME", Some("1")), // Send canary after 1 second of inactivity
713                ("DYN_HEALTH_CHECK_REQUEST_TIMEOUT", Some("1")), // Immediately timeout to mimic unresponsiveness
714                ("RUST_LOG", Some("info")),                      // Enable logging for test
715            ],
716            async {
717                let drt = Arc::new(create_test_drt_async().await);
718
719                // Get system status server info
720                let system_info = drt
721                    .system_status_server_info()
722                    .expect("System status server should be started");
723                let addr = system_info.socket_addr;
724
725                let client = reqwest::Client::new();
726                let health_url = format!("http://{}/health", addr);
727
728                // Register an endpoint with health check payload
729                let endpoint = "test.endpoint";
730                let health_check_payload = serde_json::json!({
731                    "prompt": "health check test",
732                    "_health_check": true
733                });
734
735                // Register the endpoint and its health check payload
736                {
737                    let system_health = drt.system_health.lock();
738                    system_health.register_health_check_target(
739                        endpoint,
740                        crate::component::Instance {
741                            component: "test_component".to_string(),
742                            endpoint: "health".to_string(),
743                            namespace: "test_namespace".to_string(),
744                            instance_id: 1,
745                            transport: crate::component::TransportType::NatsTcp(
746                                endpoint.to_string(),
747                            ),
748                        },
749                        health_check_payload.clone(),
750                    );
751                }
752
753                // Check initial health - should be ready (default state)
754                let response = client.get(&health_url).send().await.unwrap();
755                let status = response.status();
756                let body = response.text().await.unwrap();
757                assert_eq!(status, 503, "Should be unhealthy initially (default state)");
758                assert!(
759                    body.contains("\"status\":\"notready\""),
760                    "Should show notready status initially"
761                );
762
763                // Set endpoint to healthy state
764                drt.system_health
765                    .lock()
766                    .set_endpoint_health_status(endpoint, HealthStatus::Ready);
767
768                // Check health again - should now be healthy
769                let response = client.get(&health_url).send().await.unwrap();
770                let status = response.status();
771                let body = response.text().await.unwrap();
772
773                assert_eq!(status, 200, "Should be healthy due to recent response");
774                assert!(
775                    body.contains("\"status\":\"ready\""),
776                    "Should show ready status after response"
777                );
778
779                // Verify the endpoint status in SystemHealth directly
780                let endpoint_status = drt
781                    .system_health
782                    .lock()
783                    .get_endpoint_health_status(endpoint);
784                assert_eq!(
785                    endpoint_status,
786                    Some(HealthStatus::Ready),
787                    "SystemHealth should show endpoint as Ready after response"
788                );
789            },
790        )
791        .await;
792    }
793}