dynamo_runtime/
system_status_server.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::config::HealthStatus;
5use crate::logging::make_request_span;
6use crate::metrics::MetricsRegistry;
7use crate::metrics::prometheus_names::{nats_client, nats_service};
8use crate::traits::DistributedRuntimeProvider;
9use axum::{Router, http::StatusCode, response::IntoResponse, routing::get};
10use serde_json::json;
11use std::collections::HashMap;
12use std::sync::{Arc, OnceLock};
13use std::time::Instant;
14use tokio::{net::TcpListener, task::JoinHandle};
15use tokio_util::sync::CancellationToken;
16use tower_http::trace::TraceLayer;
17
18/// System status server information containing socket address and handle
19#[derive(Debug)]
20pub struct SystemStatusServerInfo {
21    pub socket_addr: std::net::SocketAddr,
22    pub handle: Option<Arc<JoinHandle<()>>>,
23}
24
25impl SystemStatusServerInfo {
26    pub fn new(socket_addr: std::net::SocketAddr, handle: Option<JoinHandle<()>>) -> Self {
27        Self {
28            socket_addr,
29            handle: handle.map(Arc::new),
30        }
31    }
32
33    pub fn address(&self) -> String {
34        self.socket_addr.to_string()
35    }
36
37    pub fn hostname(&self) -> String {
38        self.socket_addr.ip().to_string()
39    }
40
41    pub fn port(&self) -> u16 {
42        self.socket_addr.port()
43    }
44}
45
46impl Clone for SystemStatusServerInfo {
47    fn clone(&self) -> Self {
48        Self {
49            socket_addr: self.socket_addr,
50            handle: self.handle.clone(),
51        }
52    }
53}
54
55/// System status server state containing the distributed runtime reference
56pub struct SystemStatusState {
57    // global drt registry is for printing out the entire Prometheus format output
58    root_drt: Arc<crate::DistributedRuntime>,
59}
60
61impl SystemStatusState {
62    /// Create new system status server state with the provided distributed runtime
63    pub fn new(drt: Arc<crate::DistributedRuntime>) -> anyhow::Result<Self> {
64        Ok(Self { root_drt: drt })
65    }
66
67    /// Get a reference to the distributed runtime
68    pub fn drt(&self) -> &crate::DistributedRuntime {
69        &self.root_drt
70    }
71}
72
73/// Start system status server with metrics support
74pub async fn spawn_system_status_server(
75    host: &str,
76    port: u16,
77    cancel_token: CancellationToken,
78    drt: Arc<crate::DistributedRuntime>,
79) -> anyhow::Result<(std::net::SocketAddr, tokio::task::JoinHandle<()>)> {
80    // Create system status server state with the provided distributed runtime
81    let server_state = Arc::new(SystemStatusState::new(drt)?);
82    let health_path = server_state
83        .drt()
84        .system_health
85        .lock()
86        .unwrap()
87        .health_path()
88        .to_string();
89    let live_path = server_state
90        .drt()
91        .system_health
92        .lock()
93        .unwrap()
94        .live_path()
95        .to_string();
96
97    let app = Router::new()
98        .route(
99            &health_path,
100            get({
101                let state = Arc::clone(&server_state);
102                move || health_handler(state)
103            }),
104        )
105        .route(
106            &live_path,
107            get({
108                let state = Arc::clone(&server_state);
109                move || health_handler(state)
110            }),
111        )
112        .route(
113            "/metrics",
114            get({
115                let state = Arc::clone(&server_state);
116                move || metrics_handler(state)
117            }),
118        )
119        .fallback(|| async {
120            tracing::info!("[fallback handler] called");
121            (StatusCode::NOT_FOUND, "Route not found").into_response()
122        })
123        .layer(TraceLayer::new_for_http().make_span_with(make_request_span));
124
125    let address = format!("{}:{}", host, port);
126    tracing::info!("[spawn_system_status_server] binding to: {}", address);
127
128    let listener = match TcpListener::bind(&address).await {
129        Ok(listener) => {
130            // get the actual address and port, print in debug level
131            let actual_address = listener.local_addr()?;
132            tracing::info!(
133                "[spawn_system_status_server] system status server bound to: {}",
134                actual_address
135            );
136            (listener, actual_address)
137        }
138        Err(e) => {
139            tracing::error!("Failed to bind to address {}: {}", address, e);
140            return Err(anyhow::anyhow!("Failed to bind to address: {}", e));
141        }
142    };
143    let (listener, actual_address) = listener;
144
145    let observer = cancel_token.child_token();
146    // Spawn the server in the background and return the handle
147    let handle = tokio::spawn(async move {
148        if let Err(e) = axum::serve(listener, app)
149            .with_graceful_shutdown(observer.cancelled_owned())
150            .await
151        {
152            tracing::error!("System status server error: {}", e);
153        }
154    });
155
156    Ok((actual_address, handle))
157}
158
159/// Health handler with optional active health checking
160#[tracing::instrument(skip_all, level = "trace")]
161async fn health_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
162    // Get basic health status
163    let system_health = state.drt().system_health.lock().unwrap();
164    let (healthy, endpoints) = system_health.get_health_status();
165    let uptime = Some(system_health.uptime());
166
167    let healthy_string = if healthy { "ready" } else { "notready" };
168    let status_code = if healthy {
169        StatusCode::OK
170    } else {
171        StatusCode::SERVICE_UNAVAILABLE
172    };
173
174    let response = json!({
175        "status": healthy_string,
176        "uptime": uptime,
177        "endpoints": endpoints,
178    });
179
180    tracing::trace!("Response {}", response.to_string());
181
182    (status_code, response.to_string())
183}
184
185/// Metrics handler with DistributedRuntime uptime
186#[tracing::instrument(skip_all, level = "trace")]
187async fn metrics_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
188    // Update the uptime gauge with current value
189    state
190        .drt()
191        .system_health
192        .lock()
193        .unwrap()
194        .update_uptime_gauge();
195
196    // Execute all the callbacks for all registered hierarchies
197    let all_hierarchies: Vec<String> = {
198        let registries = state.drt().hierarchy_to_metricsregistry.read().unwrap();
199        registries.keys().cloned().collect()
200    };
201
202    for hierarchy in &all_hierarchies {
203        let callback_results = state.drt().execute_prometheus_update_callbacks(hierarchy);
204        for result in callback_results {
205            if let Err(e) = result {
206                tracing::error!(
207                    "Error executing metrics callback for hierarchy '{}': {}",
208                    hierarchy,
209                    e
210                );
211            }
212        }
213    }
214
215    // Get all metrics from DistributedRuntime (top-level)
216    let mut response = match state.drt().prometheus_expfmt() {
217        Ok(r) => r,
218        Err(e) => {
219            tracing::error!("Failed to get metrics from registry: {}", e);
220            return (
221                StatusCode::INTERNAL_SERVER_ERROR,
222                "Failed to get metrics".to_string(),
223            );
224        }
225    };
226
227    // Collect and append Prometheus exposition text from all hierarchies
228    for hierarchy in &all_hierarchies {
229        let expfmt = {
230            let registries = state.drt().hierarchy_to_metricsregistry.read().unwrap();
231            if let Some(entry) = registries.get(hierarchy) {
232                entry.execute_prometheus_expfmt_callbacks()
233            } else {
234                String::new()
235            }
236        };
237
238        if !expfmt.is_empty() {
239            if !response.ends_with('\n') {
240                response.push('\n');
241            }
242            response.push_str(&expfmt);
243        }
244    }
245
246    (StatusCode::OK, response)
247}
248
249// Regular tests: cargo test system_status_server --lib
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::Duration;

    /// Basic sanity check that a bare axum server (no `DistributedRuntime`)
    /// shuts down when its cancellation token fires. Run this before the more
    /// involved tests.
    #[tokio::test]
    async fn test_http_server_lifecycle() {
        let cancel_token = CancellationToken::new();
        let cancel_token_for_server = cancel_token.clone();

        // Test basic HTTP server lifecycle without DistributedRuntime
        let app = Router::new().route("/test", get(|| async { (StatusCode::OK, "test") }));

        // Start the HTTP server. The task yields the serve result so that bind
        // or serve failures reach the assertions below instead of being dropped.
        let server_handle = tokio::spawn(async move {
            let listener = TcpListener::bind("127.0.0.1:0").await?;
            axum::serve(listener, app)
                .with_graceful_shutdown(cancel_token_for_server.cancelled_owned())
                .await
        });

        // No need to wait for the server to come up: the cancellation is
        // recorded on the token, so the shutdown future completes even if the
        // server task only starts running after this point.
        cancel_token.cancel();

        // Wait for the server to shut down, then check every layer of the
        // result: timeout, task join (panic) and the serve I/O result.
        let join_result = tokio::time::timeout(Duration::from_secs(5), server_handle)
            .await
            .expect("HTTP server should shut down when cancel token is cancelled");
        let serve_result = join_result.expect("HTTP server task should not panic");
        serve_result.expect("HTTP server should bind and shut down without an I/O error");
    }
}
285
286// Integration tests: cargo test system_status_server --lib --features integration
287#[cfg(all(test, feature = "integration"))]
288mod integration_tests {
289    use super::*;
290    use crate::distributed::distributed_test_utils::create_test_drt_async;
291    use crate::metrics::MetricsRegistry;
292    use anyhow::Result;
293    use rstest::rstest;
294    use std::sync::Arc;
295    use tokio::time::Duration;
296
297    #[tokio::test]
298    async fn test_uptime_from_system_health() {
299        // Test that uptime is available from SystemHealth
300        temp_env::async_with_vars([("DYN_SYSTEM_ENABLED", Some("false"))], async {
301            let drt = create_test_drt_async().await;
302
303            // Get uptime from SystemHealth
304            let uptime = drt.system_health.lock().unwrap().uptime();
305            // Uptime should exist (even if close to zero)
306            assert!(uptime.as_nanos() > 0 || uptime.is_zero());
307
308            // Sleep briefly and check uptime increases
309            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
310            let uptime_after = drt.system_health.lock().unwrap().uptime();
311            assert!(uptime_after > uptime);
312        })
313        .await;
314    }
315
316    #[tokio::test]
317    async fn test_runtime_metrics_initialization_and_namespace() {
318        // Test that metrics have correct namespace
319        temp_env::async_with_vars([("DYN_SYSTEM_ENABLED", Some("false"))], async {
320            let drt = create_test_drt_async().await;
321            // SystemStatusState is already created in distributed.rs when DYN_SYSTEM_ENABLED=false
322            // so we don't need to create it again here
323
324            // The uptime_seconds metric should already be registered and available
325            let response = drt.prometheus_expfmt().unwrap();
326            println!("Full metrics response:\n{}", response);
327
328            // Filter out NATS client metrics for comparison
329            let filtered_response: String = response
330                .lines()
331                .filter(|line| {
332                    !line.contains(nats_client::PREFIX) && !line.contains(nats_service::PREFIX)
333                })
334                .collect::<Vec<_>>()
335                .join("\n");
336
337            // Check that uptime_seconds metric is present with correct namespace
338            assert!(
339                filtered_response.contains("# HELP dynamo_component_uptime_seconds"),
340                "Should contain uptime_seconds help text"
341            );
342            assert!(
343                filtered_response.contains("# TYPE dynamo_component_uptime_seconds gauge"),
344                "Should contain uptime_seconds type"
345            );
346            assert!(
347                filtered_response.contains("dynamo_component_uptime_seconds"),
348                "Should contain uptime_seconds metric with correct namespace"
349            );
350        })
351        .await;
352    }
353
354    #[tokio::test]
355    async fn test_uptime_gauge_updates() {
356        // Test that the uptime gauge is properly updated and increases over time
357        temp_env::async_with_vars([("DYN_SYSTEM_ENABLED", Some("false"))], async {
358            let drt = create_test_drt_async().await;
359
360            // Get initial uptime
361            let initial_uptime = drt.system_health.lock().unwrap().uptime();
362
363            // Update the gauge with initial value
364            drt.system_health.lock().unwrap().update_uptime_gauge();
365
366            // Sleep for 100ms
367            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
368
369            // Get uptime after sleep
370            let uptime_after_sleep = drt.system_health.lock().unwrap().uptime();
371
372            // Update the gauge again
373            drt.system_health.lock().unwrap().update_uptime_gauge();
374
375            // Verify uptime increased by at least 100ms
376            let elapsed = uptime_after_sleep - initial_uptime;
377            assert!(
378                elapsed >= std::time::Duration::from_millis(100),
379                "Uptime should have increased by at least 100ms after sleep, but only increased by {:?}",
380                elapsed
381            );
382        })
383        .await;
384    }
385
386    #[tokio::test]
387    async fn test_http_requests_fail_when_system_disabled() {
388        // Test that system status server is not running when disabled
389        temp_env::async_with_vars([("DYN_SYSTEM_ENABLED", Some("false"))], async {
390            let drt = create_test_drt_async().await;
391
392            // Verify that system status server info is None when disabled
393            let system_info = drt.system_status_server_info();
394            assert!(
395                system_info.is_none(),
396                "System status server should not be running when DYN_SYSTEM_ENABLED=false"
397            );
398
399            println!("✓ System status server correctly disabled when DYN_SYSTEM_ENABLED=false");
400        })
401        .await;
402    }
403
404    /// This test verifies the health and liveness endpoints of the system status server.
405    /// It checks that the endpoints respond with the correct HTTP status codes and bodies
406    /// based on the initial health status and any custom endpoint paths provided via environment variables.
407    /// The test is parameterized using multiple #[case] attributes to cover various scenarios,
408    /// including different initial health states ("ready" and "notready"), default and custom endpoint paths,
409    /// and expected response codes and bodies.
410    #[rstest]
411    #[case("ready", 200, "ready", None, None, 3)]
412    #[case("notready", 503, "notready", None, None, 3)]
413    #[case("ready", 200, "ready", Some("/custom/health"), Some("/custom/live"), 5)]
414    #[case(
415        "notready",
416        503,
417        "notready",
418        Some("/custom/health"),
419        Some("/custom/live"),
420        5
421    )]
422    #[tokio::test]
423    #[cfg(feature = "integration")]
424    async fn test_health_endpoints(
425        #[case] starting_health_status: &'static str,
426        #[case] expected_status: u16,
427        #[case] expected_body: &'static str,
428        #[case] custom_health_path: Option<&'static str>,
429        #[case] custom_live_path: Option<&'static str>,
430        #[case] expected_num_tests: usize,
431    ) {
432        use std::sync::Arc;
433        // use tokio::io::{AsyncReadExt, AsyncWriteExt};
434        // use reqwest for HTTP requests
435
436        // Closure call is needed here to satisfy async_with_vars
437
438        crate::logging::init();
439
440        #[allow(clippy::redundant_closure_call)]
441        temp_env::async_with_vars(
442            [
443                ("DYN_SYSTEM_ENABLED", Some("true")),
444                ("DYN_SYSTEM_PORT", Some("0")),
445                (
446                    "DYN_SYSTEM_STARTING_HEALTH_STATUS",
447                    Some(starting_health_status),
448                ),
449                ("DYN_SYSTEM_HEALTH_PATH", custom_health_path),
450                ("DYN_SYSTEM_LIVE_PATH", custom_live_path),
451            ],
452            (async || {
453                let drt = Arc::new(create_test_drt_async().await);
454
455                // Get system status server info from DRT (instead of manually spawning)
456                let system_info = drt
457                    .system_status_server_info()
458                    .expect("System status server should be started by DRT");
459                let addr = system_info.socket_addr;
460
461                let client = reqwest::Client::new();
462
463                // Prepare test cases
464                let mut test_cases = vec![];
465                match custom_health_path {
466                    None => {
467                        // When using default paths, test the default paths
468                        test_cases.push(("/health", expected_status, expected_body));
469                    }
470                    Some(chp) => {
471                        // When using custom paths, default paths should not exist
472                        test_cases.push(("/health", 404, "Route not found"));
473                        test_cases.push((chp, expected_status, expected_body));
474                    }
475                }
476                match custom_live_path {
477                    None => {
478                        // When using default paths, test the default paths
479                        test_cases.push(("/live", expected_status, expected_body));
480                    }
481                    Some(clp) => {
482                        // When using custom paths, default paths should not exist
483                        test_cases.push(("/live", 404, "Route not found"));
484                        test_cases.push((clp, expected_status, expected_body));
485                    }
486                }
487                test_cases.push(("/someRandomPathNotFoundHere", 404, "Route not found"));
488                assert_eq!(test_cases.len(), expected_num_tests);
489
490                for (path, expect_status, expect_body) in test_cases {
491                    println!("[test] Sending request to {}", path);
492                    let url = format!("http://{}{}", addr, path);
493                    let response = client.get(&url).send().await.unwrap();
494                    let status = response.status();
495                    let body = response.text().await.unwrap();
496                    println!(
497                        "[test] Response for {}: status={}, body={:?}",
498                        path, status, body
499                    );
500                    assert_eq!(
501                        status, expect_status,
502                        "Response: status={}, body={:?}",
503                        status, body
504                    );
505                    assert!(
506                        body.contains(expect_body),
507                        "Response: status={}, body={:?}",
508                        status,
509                        body
510                    );
511                }
512            })(),
513        )
514        .await;
515    }
516
517    #[tokio::test]
518    async fn test_health_endpoint_tracing() -> Result<()> {
519        use std::sync::Arc;
520
521        // Closure call is needed here to satisfy async_with_vars
522
523        #[allow(clippy::redundant_closure_call)]
524        let _ = temp_env::async_with_vars(
525            [
526                ("DYN_SYSTEM_ENABLED", Some("true")),
527                ("DYN_SYSTEM_PORT", Some("0")),
528                ("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("ready")),
529                ("DYN_LOGGING_JSONL", Some("1")),
530                ("DYN_LOG", Some("trace")),
531            ],
532            (async || {
533                // TODO Add proper testing for
534                // trace id and parent id
535
536                crate::logging::init();
537
538                let drt = Arc::new(create_test_drt_async().await);
539
540                // Get system status server info from DRT (instead of manually spawning)
541                let system_info = drt
542                    .system_status_server_info()
543                    .expect("System status server should be started by DRT");
544                let addr = system_info.socket_addr;
545                let client = reqwest::Client::new();
546                for path in [("/health"), ("/live"), ("/someRandomPathNotFoundHere")] {
547                    let traceparent_value =
548                        "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
549                    let tracestate_value = "vendor1=opaqueValue1,vendor2=opaqueValue2";
550                    let mut headers = reqwest::header::HeaderMap::new();
551                    headers.insert(
552                        reqwest::header::HeaderName::from_static("traceparent"),
553                        reqwest::header::HeaderValue::from_str(traceparent_value)?,
554                    );
555                    headers.insert(
556                        reqwest::header::HeaderName::from_static("tracestate"),
557                        reqwest::header::HeaderValue::from_str(tracestate_value)?,
558                    );
559                    let url = format!("http://{}{}", addr, path);
560                    let response = client.get(&url).headers(headers).send().await.unwrap();
561                    let status = response.status();
562                    let body = response.text().await.unwrap();
563                    tracing::info!(body = body, status = status.to_string());
564                }
565
566                Ok::<(), anyhow::Error>(())
567            })(),
568        )
569        .await;
570        Ok(())
571    }
572
573    #[tokio::test]
574    async fn test_health_endpoint_with_changing_health_status() {
575        // Test health endpoint starts in not ready status, then becomes ready
576        // when endpoints are created (DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS=generate)
577        const ENDPOINT_NAME: &str = "generate";
578        const ENDPOINT_HEALTH_CONFIG: &str = "[\"generate\"]";
579        temp_env::async_with_vars(
580            [
581                ("DYN_SYSTEM_ENABLED", Some("true")),
582                ("DYN_SYSTEM_PORT", Some("0")),
583                ("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("notready")),
584                ("DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS", Some(ENDPOINT_HEALTH_CONFIG)),
585            ],
586            async {
587                let drt = Arc::new(create_test_drt_async().await);
588
589                // Check if system status server was started
590                let system_info_opt = drt.system_status_server_info();
591
592                // Ensure system status server was spawned by DRT
593                assert!(
594                    system_info_opt.is_some(),
595                    "System status server was not spawned by DRT. Expected DRT to spawn server when DYN_SYSTEM_ENABLED=true, but system_status_server_info() returned None. Environment: DYN_SYSTEM_ENABLED={:?}, DYN_SYSTEM_PORT={:?}",
596                    std::env::var("DYN_SYSTEM_ENABLED"),
597                    std::env::var("DYN_SYSTEM_PORT")
598                );
599
600                // Get the system status server info from DRT - this should never fail now due to above check
601                let system_info = system_info_opt.unwrap();
602                let addr = system_info.socket_addr;
603
604                // Initially check health - should be not ready
605                let client = reqwest::Client::new();
606                let health_url = format!("http://{}/health", addr);
607
608                let response = client.get(&health_url).send().await.unwrap();
609                let status = response.status();
610                let body = response.text().await.unwrap();
611
612                // Health should be not ready (503) initially
613                assert_eq!(status, 503, "Health should be 503 (not ready) initially, got: {}", status);
614                assert!(body.contains("\"status\":\"notready\""), "Health should contain status notready");
615
616                // Now create a namespace, component, and endpoint to make the system healthy
617                let namespace = drt.namespace("ns1234").unwrap();
618                let component = namespace.component("comp1234").unwrap();
619
620                // Create a simple test handler
621                use crate::pipeline::{async_trait, network::Ingress, AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, SingleIn};
622                use crate::protocols::annotated::Annotated;
623
624                struct TestHandler;
625
626                #[async_trait]
627                impl AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, Error> for TestHandler {
628                    async fn generate(&self, input: SingleIn<String>) -> crate::Result<ManyOut<Annotated<String>>> {
629                        let (data, ctx) = input.into_parts();
630                        let response = Annotated::from_data(format!("You responded: {}", data));
631                        Ok(crate::pipeline::ResponseStream::new(
632                            Box::pin(crate::stream::iter(vec![response])),
633                            ctx.context()
634                        ))
635                    }
636                }
637
638                // Create the ingress and start the endpoint service
639                let ingress = Ingress::for_engine(std::sync::Arc::new(TestHandler)).unwrap();
640
641                // Start the service and endpoint with a health check payload
642                // This will automatically register the endpoint for health monitoring
643                tokio::spawn(async move {
644                    let _ = component
645                        .service_builder()
646                        .create()
647                        .await
648                        .unwrap()
649                        .endpoint(ENDPOINT_NAME)
650                        .endpoint_builder()
651                        .handler(ingress)
652                        .health_check_payload(serde_json::json!({
653                            "test": "health_check"
654                        }))
655                        .start()
656                        .await;
657                });
658
659                // Hit health endpoint 200 times to verify consistency
660                let mut success_count = 0;
661                let mut failures = Vec::new();
662
663                for i in 1..=200 {
664                    let response = client.get(&health_url).send().await.unwrap();
665                    let status = response.status();
666                    let body = response.text().await.unwrap();
667
668                    if status == 200 && body.contains("\"status\":\"ready\"") {
669                        success_count += 1;
670                    } else {
671                        failures.push((i, status.as_u16(), body.clone()));
672                        if failures.len() <= 5 {  // Only log first 5 failures
673                            tracing::warn!("Request {}: status={}, body={}", i, status, body);
674                        }
675                    }
676                }
677
678                tracing::info!("Health endpoint test results: {}/200 requests succeeded", success_count);
679                if !failures.is_empty() {
680                    tracing::warn!("Failed requests: {}", failures.len());
681                }
682
683                // Expect at least 150 out of 200 requests to be successful
684                assert!(success_count >= 150, "Expected at least 150 out of 200 requests to succeed, but only {} succeeded", success_count);
685            },
686        )
687        .await;
688    }
689
690    #[tokio::test]
691    async fn test_spawn_system_status_server_endpoints() {
692        // use reqwest for HTTP requests
693        temp_env::async_with_vars(
694            [
695                ("DYN_SYSTEM_ENABLED", Some("true")),
696                ("DYN_SYSTEM_PORT", Some("0")),
697                ("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("ready")),
698            ],
699            async {
700                let drt = Arc::new(create_test_drt_async().await);
701
702                // Get system status server info from DRT (instead of manually spawning)
703                let system_info = drt
704                    .system_status_server_info()
705                    .expect("System status server should be started by DRT");
706                let addr = system_info.socket_addr;
707                let client = reqwest::Client::new();
708                for (path, expect_200, expect_body) in [
709                    ("/health", true, "ready"),
710                    ("/live", true, "ready"),
711                    ("/someRandomPathNotFoundHere", false, "Route not found"),
712                ] {
713                    println!("[test] Sending request to {}", path);
714                    let url = format!("http://{}{}", addr, path);
715                    let response = client.get(&url).send().await.unwrap();
716                    let status = response.status();
717                    let body = response.text().await.unwrap();
718                    println!(
719                        "[test] Response for {}: status={}, body={:?}",
720                        path, status, body
721                    );
722                    if expect_200 {
723                        assert_eq!(status, 200, "Response: status={}, body={:?}", status, body);
724                    } else {
725                        assert_eq!(status, 404, "Response: status={}, body={:?}", status, body);
726                    }
727                    assert!(
728                        body.contains(expect_body),
729                        "Response: status={}, body={:?}",
730                        status,
731                        body
732                    );
733                }
734                // DRT handles server cleanup automatically
735            },
736        )
737        .await;
738    }
739
740    #[cfg(feature = "integration")]
741    #[tokio::test]
742    async fn test_health_check_with_payload_and_timeout() {
743        // Test the complete health check flow with the new canary-based system:
744        crate::logging::init();
745
746        temp_env::async_with_vars(
747            [
748                ("DYN_SYSTEM_ENABLED", Some("true")),
749                ("DYN_SYSTEM_PORT", Some("0")),
750                ("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("notready")),
751                (
752                    "DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS",
753                    Some("[\"test.endpoint\"]"),
754                ),
755                // Enable health check with short intervals for testing
756                ("DYN_HEALTH_CHECK_ENABLED", Some("true")),
757                ("DYN_CANARY_WAIT_TIME", Some("1")), // Send canary after 1 second of inactivity
758                ("DYN_HEALTH_CHECK_REQUEST_TIMEOUT", Some("1")), // Immediately timeout to mimic unresponsiveness
759                ("RUST_LOG", Some("info")),                      // Enable logging for test
760            ],
761            async {
762                let drt = Arc::new(create_test_drt_async().await);
763
764                // Get system status server info
765                let system_info = drt
766                    .system_status_server_info()
767                    .expect("System status server should be started");
768                let addr = system_info.socket_addr;
769
770                let client = reqwest::Client::new();
771                let health_url = format!("http://{}/health", addr);
772
773                // Register an endpoint with health check payload
774                let endpoint = "test.endpoint";
775                let health_check_payload = serde_json::json!({
776                    "prompt": "health check test",
777                    "_health_check": true
778                });
779
780                // Register the endpoint and its health check payload
781                {
782                    let system_health = drt.system_health.lock().unwrap();
783                    system_health.register_health_check_target(
784                        endpoint,
785                        crate::component::Instance {
786                            component: "test_component".to_string(),
787                            endpoint: "health".to_string(),
788                            namespace: "test_namespace".to_string(),
789                            instance_id: 1,
790                            transport: crate::component::TransportType::NatsTcp(
791                                endpoint.to_string(),
792                            ),
793                        },
794                        health_check_payload.clone(),
795                    );
796                }
797
798                // Check initial health - should be ready (default state)
799                let response = client.get(&health_url).send().await.unwrap();
800                let status = response.status();
801                let body = response.text().await.unwrap();
802                assert_eq!(status, 503, "Should be unhealthy initially (default state)");
803                assert!(
804                    body.contains("\"status\":\"notready\""),
805                    "Should show notready status initially"
806                );
807
808                // Set endpoint to healthy state
809                drt.system_health
810                    .lock()
811                    .unwrap()
812                    .set_endpoint_health_status(endpoint, HealthStatus::Ready);
813
814                // Check health again - should now be healthy
815                let response = client.get(&health_url).send().await.unwrap();
816                let status = response.status();
817                let body = response.text().await.unwrap();
818
819                assert_eq!(status, 200, "Should be healthy due to recent response");
820                assert!(
821                    body.contains("\"status\":\"ready\""),
822                    "Should show ready status after response"
823                );
824
825                // Verify the endpoint status in SystemHealth directly
826                let endpoint_status = drt
827                    .system_health
828                    .lock()
829                    .unwrap()
830                    .get_endpoint_health_status(endpoint);
831                assert_eq!(
832                    endpoint_status,
833                    Some(HealthStatus::Ready),
834                    "SystemHealth should show endpoint as Ready after response"
835                );
836            },
837        )
838        .await;
839    }
840}