dynamo_runtime/
system_status_server.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::config::HealthStatus;
5use crate::logging::make_request_span;
6use crate::metrics::MetricsRegistry;
7use crate::metrics::prometheus_names::{nats_client, nats_service};
8use crate::traits::DistributedRuntimeProvider;
9use axum::{Router, http::StatusCode, response::IntoResponse, routing::get};
10use serde_json::json;
11use std::collections::HashMap;
12use std::sync::{Arc, OnceLock};
13use std::time::Instant;
14use tokio::{net::TcpListener, task::JoinHandle};
15use tokio_util::sync::CancellationToken;
16use tower_http::trace::TraceLayer;
17
18/// System status server information containing socket address and handle
19#[derive(Debug)]
20pub struct SystemStatusServerInfo {
21    pub socket_addr: std::net::SocketAddr,
22    pub handle: Option<Arc<JoinHandle<()>>>,
23}
24
25impl SystemStatusServerInfo {
26    pub fn new(socket_addr: std::net::SocketAddr, handle: Option<JoinHandle<()>>) -> Self {
27        Self {
28            socket_addr,
29            handle: handle.map(Arc::new),
30        }
31    }
32
33    pub fn address(&self) -> String {
34        self.socket_addr.to_string()
35    }
36
37    pub fn hostname(&self) -> String {
38        self.socket_addr.ip().to_string()
39    }
40
41    pub fn port(&self) -> u16 {
42        self.socket_addr.port()
43    }
44}
45
46impl Clone for SystemStatusServerInfo {
47    fn clone(&self) -> Self {
48        Self {
49            socket_addr: self.socket_addr,
50            handle: self.handle.clone(),
51        }
52    }
53}
54
55/// System status server state containing the distributed runtime reference
56pub struct SystemStatusState {
57    // global drt registry is for printing out the entire Prometheus format output
58    root_drt: Arc<crate::DistributedRuntime>,
59}
60
61impl SystemStatusState {
62    /// Create new system status server state with the provided distributed runtime
63    pub fn new(drt: Arc<crate::DistributedRuntime>) -> anyhow::Result<Self> {
64        Ok(Self { root_drt: drt })
65    }
66
67    /// Get a reference to the distributed runtime
68    pub fn drt(&self) -> &crate::DistributedRuntime {
69        &self.root_drt
70    }
71}
72
73/// Start system status server with metrics support
74pub async fn spawn_system_status_server(
75    host: &str,
76    port: u16,
77    cancel_token: CancellationToken,
78    drt: Arc<crate::DistributedRuntime>,
79) -> anyhow::Result<(std::net::SocketAddr, tokio::task::JoinHandle<()>)> {
80    // Create system status server state with the provided distributed runtime
81    let server_state = Arc::new(SystemStatusState::new(drt)?);
82    let health_path = server_state
83        .drt()
84        .system_health
85        .lock()
86        .unwrap()
87        .health_path()
88        .to_string();
89    let live_path = server_state
90        .drt()
91        .system_health
92        .lock()
93        .unwrap()
94        .live_path()
95        .to_string();
96
97    let app = Router::new()
98        .route(
99            &health_path,
100            get({
101                let state = Arc::clone(&server_state);
102                move || health_handler(state)
103            }),
104        )
105        .route(
106            &live_path,
107            get({
108                let state = Arc::clone(&server_state);
109                move || health_handler(state)
110            }),
111        )
112        .route(
113            "/metrics",
114            get({
115                let state = Arc::clone(&server_state);
116                move || metrics_handler(state)
117            }),
118        )
119        .fallback(|| async {
120            tracing::info!("[fallback handler] called");
121            (StatusCode::NOT_FOUND, "Route not found").into_response()
122        })
123        .layer(TraceLayer::new_for_http().make_span_with(make_request_span));
124
125    let address = format!("{}:{}", host, port);
126    tracing::info!("[spawn_system_status_server] binding to: {}", address);
127
128    let listener = match TcpListener::bind(&address).await {
129        Ok(listener) => {
130            // get the actual address and port, print in debug level
131            let actual_address = listener.local_addr()?;
132            tracing::info!(
133                "[spawn_system_status_server] system status server bound to: {}",
134                actual_address
135            );
136            (listener, actual_address)
137        }
138        Err(e) => {
139            tracing::error!("Failed to bind to address {}: {}", address, e);
140            return Err(anyhow::anyhow!("Failed to bind to address: {}", e));
141        }
142    };
143    let (listener, actual_address) = listener;
144
145    let observer = cancel_token.child_token();
146    // Spawn the server in the background and return the handle
147    let handle = tokio::spawn(async move {
148        if let Err(e) = axum::serve(listener, app)
149            .with_graceful_shutdown(observer.cancelled_owned())
150            .await
151        {
152            tracing::error!("System status server error: {}", e);
153        }
154    });
155
156    Ok((actual_address, handle))
157}
158
159/// Health handler with optional active health checking
160#[tracing::instrument(skip_all, level = "trace")]
161async fn health_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
162    // Get basic health status
163    let system_health = state.drt().system_health.lock().unwrap();
164    let (healthy, endpoints) = system_health.get_health_status();
165    let uptime = Some(system_health.uptime());
166
167    let healthy_string = if healthy { "ready" } else { "notready" };
168    let status_code = if healthy {
169        StatusCode::OK
170    } else {
171        StatusCode::SERVICE_UNAVAILABLE
172    };
173
174    let response = json!({
175        "status": healthy_string,
176        "uptime": uptime,
177        "endpoints": endpoints,
178    });
179
180    tracing::trace!("Response {}", response.to_string());
181
182    (status_code, response.to_string())
183}
184
185/// Metrics handler with DistributedRuntime uptime
186#[tracing::instrument(skip_all, level = "trace")]
187async fn metrics_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
188    // Update the uptime gauge with current value
189    state
190        .drt()
191        .system_health
192        .lock()
193        .unwrap()
194        .update_uptime_gauge();
195
196    // Execute all the callbacks starting at the DistributedRuntime level
197    assert!(state.drt().basename() == "");
198    let callback_results = state
199        .drt()
200        .execute_metrics_callbacks(&state.drt().hierarchy());
201    for result in callback_results {
202        if let Err(e) = result {
203            tracing::error!("Error executing metrics callback: {}", e);
204        }
205    }
206
207    // Get all metrics from DistributedRuntime (top-level)
208    match state.drt().prometheus_metrics_fmt() {
209        Ok(response) => (StatusCode::OK, response),
210        Err(e) => {
211            tracing::error!("Failed to get metrics from registry: {}", e);
212            (
213                StatusCode::INTERNAL_SERVER_ERROR,
214                "Failed to get metrics".to_string(),
215            )
216        }
217    }
218}
219
// Regular tests: cargo test system_status_server --lib
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::Duration;

    // Smoke test: a bare axum server (no DistributedRuntime involved) must stop
    // once its cancellation token fires. Run this before the heavier tests.
    #[tokio::test]
    async fn test_http_server_lifecycle() {
        let shutdown = CancellationToken::new();
        let server_shutdown = shutdown.clone();

        // Minimal router with a single route.
        let router = Router::new().route("/test", get(|| async { (StatusCode::OK, "test") }));

        // Bind and serve on a background task.
        let server_task = tokio::spawn(async move {
            let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
            let _ = axum::serve(listener, router)
                .with_graceful_shutdown(server_shutdown.cancelled_owned())
                .await;
        });

        // Request shutdown right away; the test does not wait for the bind.
        shutdown.cancel();

        // The task must finish within the timeout.
        let joined = tokio::time::timeout(Duration::from_secs(5), server_task).await;
        assert!(
            joined.is_ok(),
            "HTTP server should shut down when cancel token is cancelled"
        );
    }
}
256
257// Integration tests: cargo test system_status_server --lib --features integration
258#[cfg(all(test, feature = "integration"))]
259mod integration_tests {
260    use super::*;
261    use crate::distributed::distributed_test_utils::create_test_drt_async;
262    use crate::metrics::MetricsRegistry;
263    use anyhow::Result;
264    use rstest::rstest;
265    use std::sync::Arc;
266    use tokio::time::Duration;
267
268    #[tokio::test]
269    async fn test_uptime_from_system_health() {
270        // Test that uptime is available from SystemHealth
271        temp_env::async_with_vars([("DYN_SYSTEM_ENABLED", Some("false"))], async {
272            let drt = create_test_drt_async().await;
273
274            // Get uptime from SystemHealth
275            let uptime = drt.system_health.lock().unwrap().uptime();
276            // Uptime should exist (even if close to zero)
277            assert!(uptime.as_nanos() > 0 || uptime.is_zero());
278
279            // Sleep briefly and check uptime increases
280            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
281            let uptime_after = drt.system_health.lock().unwrap().uptime();
282            assert!(uptime_after > uptime);
283        })
284        .await;
285    }
286
287    #[tokio::test]
288    async fn test_runtime_metrics_initialization_and_namespace() {
289        // Test that metrics have correct namespace
290        temp_env::async_with_vars([("DYN_SYSTEM_ENABLED", Some("false"))], async {
291            let drt = create_test_drt_async().await;
292            // SystemStatusState is already created in distributed.rs when DYN_SYSTEM_ENABLED=false
293            // so we don't need to create it again here
294
295            // The uptime_seconds metric should already be registered and available
296            let response = drt.prometheus_metrics_fmt().unwrap();
297            println!("Full metrics response:\n{}", response);
298
299            // Filter out NATS client metrics for comparison
300            let filtered_response: String = response
301                .lines()
302                .filter(|line| {
303                    !line.contains(nats_client::PREFIX) && !line.contains(nats_service::PREFIX)
304                })
305                .collect::<Vec<_>>()
306                .join("\n");
307
308            // Check that uptime_seconds metric is present with correct namespace
309            assert!(
310                filtered_response.contains("# HELP dynamo_component_uptime_seconds"),
311                "Should contain uptime_seconds help text"
312            );
313            assert!(
314                filtered_response.contains("# TYPE dynamo_component_uptime_seconds gauge"),
315                "Should contain uptime_seconds type"
316            );
317            assert!(
318                filtered_response.contains("dynamo_component_uptime_seconds"),
319                "Should contain uptime_seconds metric with correct namespace"
320            );
321        })
322        .await;
323    }
324
325    #[tokio::test]
326    async fn test_uptime_gauge_updates() {
327        // Test that the uptime gauge is properly updated and increases over time
328        temp_env::async_with_vars([("DYN_SYSTEM_ENABLED", Some("false"))], async {
329            let drt = create_test_drt_async().await;
330
331            // Get initial uptime
332            let initial_uptime = drt.system_health.lock().unwrap().uptime();
333
334            // Update the gauge with initial value
335            drt.system_health.lock().unwrap().update_uptime_gauge();
336
337            // Sleep for 100ms
338            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
339
340            // Get uptime after sleep
341            let uptime_after_sleep = drt.system_health.lock().unwrap().uptime();
342
343            // Update the gauge again
344            drt.system_health.lock().unwrap().update_uptime_gauge();
345
346            // Verify uptime increased by at least 100ms
347            let elapsed = uptime_after_sleep - initial_uptime;
348            assert!(
349                elapsed >= std::time::Duration::from_millis(100),
350                "Uptime should have increased by at least 100ms after sleep, but only increased by {:?}",
351                elapsed
352            );
353        })
354        .await;
355    }
356
357    #[tokio::test]
358    async fn test_http_requests_fail_when_system_disabled() {
359        // Test that system status server is not running when disabled
360        temp_env::async_with_vars([("DYN_SYSTEM_ENABLED", Some("false"))], async {
361            let drt = create_test_drt_async().await;
362
363            // Verify that system status server info is None when disabled
364            let system_info = drt.system_status_server_info();
365            assert!(
366                system_info.is_none(),
367                "System status server should not be running when DYN_SYSTEM_ENABLED=false"
368            );
369
370            println!("✓ System status server correctly disabled when DYN_SYSTEM_ENABLED=false");
371        })
372        .await;
373    }
374
375    /// This test verifies the health and liveness endpoints of the system status server.
376    /// It checks that the endpoints respond with the correct HTTP status codes and bodies
377    /// based on the initial health status and any custom endpoint paths provided via environment variables.
378    /// The test is parameterized using multiple #[case] attributes to cover various scenarios,
379    /// including different initial health states ("ready" and "notready"), default and custom endpoint paths,
380    /// and expected response codes and bodies.
381    #[rstest]
382    #[case("ready", 200, "ready", None, None, 3)]
383    #[case("notready", 503, "notready", None, None, 3)]
384    #[case("ready", 200, "ready", Some("/custom/health"), Some("/custom/live"), 5)]
385    #[case(
386        "notready",
387        503,
388        "notready",
389        Some("/custom/health"),
390        Some("/custom/live"),
391        5
392    )]
393    #[tokio::test]
394    #[cfg(feature = "integration")]
395    async fn test_health_endpoints(
396        #[case] starting_health_status: &'static str,
397        #[case] expected_status: u16,
398        #[case] expected_body: &'static str,
399        #[case] custom_health_path: Option<&'static str>,
400        #[case] custom_live_path: Option<&'static str>,
401        #[case] expected_num_tests: usize,
402    ) {
403        use std::sync::Arc;
404        // use tokio::io::{AsyncReadExt, AsyncWriteExt};
405        // use reqwest for HTTP requests
406
407        // Closure call is needed here to satisfy async_with_vars
408
409        crate::logging::init();
410
411        #[allow(clippy::redundant_closure_call)]
412        temp_env::async_with_vars(
413            [
414                ("DYN_SYSTEM_ENABLED", Some("true")),
415                ("DYN_SYSTEM_PORT", Some("0")),
416                (
417                    "DYN_SYSTEM_STARTING_HEALTH_STATUS",
418                    Some(starting_health_status),
419                ),
420                ("DYN_SYSTEM_HEALTH_PATH", custom_health_path),
421                ("DYN_SYSTEM_LIVE_PATH", custom_live_path),
422            ],
423            (async || {
424                let drt = Arc::new(create_test_drt_async().await);
425
426                // Get system status server info from DRT (instead of manually spawning)
427                let system_info = drt
428                    .system_status_server_info()
429                    .expect("System status server should be started by DRT");
430                let addr = system_info.socket_addr;
431
432                let client = reqwest::Client::new();
433
434                // Prepare test cases
435                let mut test_cases = vec![];
436                match custom_health_path {
437                    None => {
438                        // When using default paths, test the default paths
439                        test_cases.push(("/health", expected_status, expected_body));
440                    }
441                    Some(chp) => {
442                        // When using custom paths, default paths should not exist
443                        test_cases.push(("/health", 404, "Route not found"));
444                        test_cases.push((chp, expected_status, expected_body));
445                    }
446                }
447                match custom_live_path {
448                    None => {
449                        // When using default paths, test the default paths
450                        test_cases.push(("/live", expected_status, expected_body));
451                    }
452                    Some(clp) => {
453                        // When using custom paths, default paths should not exist
454                        test_cases.push(("/live", 404, "Route not found"));
455                        test_cases.push((clp, expected_status, expected_body));
456                    }
457                }
458                test_cases.push(("/someRandomPathNotFoundHere", 404, "Route not found"));
459                assert_eq!(test_cases.len(), expected_num_tests);
460
461                for (path, expect_status, expect_body) in test_cases {
462                    println!("[test] Sending request to {}", path);
463                    let url = format!("http://{}{}", addr, path);
464                    let response = client.get(&url).send().await.unwrap();
465                    let status = response.status();
466                    let body = response.text().await.unwrap();
467                    println!(
468                        "[test] Response for {}: status={}, body={:?}",
469                        path, status, body
470                    );
471                    assert_eq!(
472                        status, expect_status,
473                        "Response: status={}, body={:?}",
474                        status, body
475                    );
476                    assert!(
477                        body.contains(expect_body),
478                        "Response: status={}, body={:?}",
479                        status,
480                        body
481                    );
482                }
483            })(),
484        )
485        .await;
486    }
487
488    #[tokio::test]
489    async fn test_health_endpoint_tracing() -> Result<()> {
490        use std::sync::Arc;
491
492        // Closure call is needed here to satisfy async_with_vars
493
494        #[allow(clippy::redundant_closure_call)]
495        let _ = temp_env::async_with_vars(
496            [
497                ("DYN_SYSTEM_ENABLED", Some("true")),
498                ("DYN_SYSTEM_PORT", Some("0")),
499                ("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("ready")),
500                ("DYN_LOGGING_JSONL", Some("1")),
501                ("DYN_LOG", Some("trace")),
502            ],
503            (async || {
504                // TODO Add proper testing for
505                // trace id and parent id
506
507                crate::logging::init();
508
509                let drt = Arc::new(create_test_drt_async().await);
510
511                // Get system status server info from DRT (instead of manually spawning)
512                let system_info = drt
513                    .system_status_server_info()
514                    .expect("System status server should be started by DRT");
515                let addr = system_info.socket_addr;
516                let client = reqwest::Client::new();
517                for path in [("/health"), ("/live"), ("/someRandomPathNotFoundHere")] {
518                    let traceparent_value =
519                        "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
520                    let tracestate_value = "vendor1=opaqueValue1,vendor2=opaqueValue2";
521                    let mut headers = reqwest::header::HeaderMap::new();
522                    headers.insert(
523                        reqwest::header::HeaderName::from_static("traceparent"),
524                        reqwest::header::HeaderValue::from_str(traceparent_value)?,
525                    );
526                    headers.insert(
527                        reqwest::header::HeaderName::from_static("tracestate"),
528                        reqwest::header::HeaderValue::from_str(tracestate_value)?,
529                    );
530                    let url = format!("http://{}{}", addr, path);
531                    let response = client.get(&url).headers(headers).send().await.unwrap();
532                    let status = response.status();
533                    let body = response.text().await.unwrap();
534                    tracing::info!(body = body, status = status.to_string());
535                }
536
537                Ok::<(), anyhow::Error>(())
538            })(),
539        )
540        .await;
541        Ok(())
542    }
543
544    #[tokio::test]
545    async fn test_health_endpoint_with_changing_health_status() {
546        // Test health endpoint starts in not ready status, then becomes ready
547        // when endpoints are created (DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS=generate)
548        const ENDPOINT_NAME: &str = "generate";
549        const ENDPOINT_HEALTH_CONFIG: &str = "[\"generate\"]";
550        temp_env::async_with_vars(
551            [
552                ("DYN_SYSTEM_ENABLED", Some("true")),
553                ("DYN_SYSTEM_PORT", Some("0")),
554                ("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("notready")),
555                ("DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS", Some(ENDPOINT_HEALTH_CONFIG)),
556            ],
557            async {
558                let drt = Arc::new(create_test_drt_async().await);
559
560                // Check if system status server was started
561                let system_info_opt = drt.system_status_server_info();
562
563                // Ensure system status server was spawned by DRT
564                assert!(
565                    system_info_opt.is_some(),
566                    "System status server was not spawned by DRT. Expected DRT to spawn server when DYN_SYSTEM_ENABLED=true, but system_status_server_info() returned None. Environment: DYN_SYSTEM_ENABLED={:?}, DYN_SYSTEM_PORT={:?}",
567                    std::env::var("DYN_SYSTEM_ENABLED"),
568                    std::env::var("DYN_SYSTEM_PORT")
569                );
570
571                // Get the system status server info from DRT - this should never fail now due to above check
572                let system_info = system_info_opt.unwrap();
573                let addr = system_info.socket_addr;
574
575                // Initially check health - should be not ready
576                let client = reqwest::Client::new();
577                let health_url = format!("http://{}/health", addr);
578
579                let response = client.get(&health_url).send().await.unwrap();
580                let status = response.status();
581                let body = response.text().await.unwrap();
582
583                // Health should be not ready (503) initially
584                assert_eq!(status, 503, "Health should be 503 (not ready) initially, got: {}", status);
585                assert!(body.contains("\"status\":\"notready\""), "Health should contain status notready");
586
587                // Now create a namespace, component, and endpoint to make the system healthy
588                let namespace = drt.namespace("ns1234").unwrap();
589                let component = namespace.component("comp1234").unwrap();
590
591                // Create a simple test handler
592                use crate::pipeline::{async_trait, network::Ingress, AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, SingleIn};
593                use crate::protocols::annotated::Annotated;
594
595                struct TestHandler;
596
597                #[async_trait]
598                impl AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, Error> for TestHandler {
599                    async fn generate(&self, input: SingleIn<String>) -> crate::Result<ManyOut<Annotated<String>>> {
600                        let (data, ctx) = input.into_parts();
601                        let response = Annotated::from_data(format!("You responded: {}", data));
602                        Ok(crate::pipeline::ResponseStream::new(
603                            Box::pin(crate::stream::iter(vec![response])),
604                            ctx.context()
605                        ))
606                    }
607                }
608
609                // Create the ingress and start the endpoint service
610                let ingress = Ingress::for_engine(std::sync::Arc::new(TestHandler)).unwrap();
611
612                // Start the service and endpoint with a health check payload
613                // This will automatically register the endpoint for health monitoring
614                tokio::spawn(async move {
615                    let _ = component
616                        .service_builder()
617                        .create()
618                        .await
619                        .unwrap()
620                        .endpoint(ENDPOINT_NAME)
621                        .endpoint_builder()
622                        .handler(ingress)
623                        .health_check_payload(serde_json::json!({
624                            "test": "health_check"
625                        }))
626                        .start()
627                        .await;
628                });
629
630                // Hit health endpoint 200 times to verify consistency
631                let mut success_count = 0;
632                let mut failures = Vec::new();
633
634                for i in 1..=200 {
635                    let response = client.get(&health_url).send().await.unwrap();
636                    let status = response.status();
637                    let body = response.text().await.unwrap();
638
639                    if status == 200 && body.contains("\"status\":\"ready\"") {
640                        success_count += 1;
641                    } else {
642                        failures.push((i, status.as_u16(), body.clone()));
643                        if failures.len() <= 5 {  // Only log first 5 failures
644                            tracing::warn!("Request {}: status={}, body={}", i, status, body);
645                        }
646                    }
647                }
648
649                tracing::info!("Health endpoint test results: {}/200 requests succeeded", success_count);
650                if !failures.is_empty() {
651                    tracing::warn!("Failed requests: {}", failures.len());
652                }
653
654                // Expect at least 150 out of 200 requests to be successful
655                assert!(success_count >= 150, "Expected at least 150 out of 200 requests to succeed, but only {} succeeded", success_count);
656            },
657        )
658        .await;
659    }
660
661    #[tokio::test]
662    async fn test_spawn_system_status_server_endpoints() {
663        // use reqwest for HTTP requests
664        temp_env::async_with_vars(
665            [
666                ("DYN_SYSTEM_ENABLED", Some("true")),
667                ("DYN_SYSTEM_PORT", Some("0")),
668                ("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("ready")),
669            ],
670            async {
671                let drt = Arc::new(create_test_drt_async().await);
672
673                // Get system status server info from DRT (instead of manually spawning)
674                let system_info = drt
675                    .system_status_server_info()
676                    .expect("System status server should be started by DRT");
677                let addr = system_info.socket_addr;
678                let client = reqwest::Client::new();
679                for (path, expect_200, expect_body) in [
680                    ("/health", true, "ready"),
681                    ("/live", true, "ready"),
682                    ("/someRandomPathNotFoundHere", false, "Route not found"),
683                ] {
684                    println!("[test] Sending request to {}", path);
685                    let url = format!("http://{}{}", addr, path);
686                    let response = client.get(&url).send().await.unwrap();
687                    let status = response.status();
688                    let body = response.text().await.unwrap();
689                    println!(
690                        "[test] Response for {}: status={}, body={:?}",
691                        path, status, body
692                    );
693                    if expect_200 {
694                        assert_eq!(status, 200, "Response: status={}, body={:?}", status, body);
695                    } else {
696                        assert_eq!(status, 404, "Response: status={}, body={:?}", status, body);
697                    }
698                    assert!(
699                        body.contains(expect_body),
700                        "Response: status={}, body={:?}",
701                        status,
702                        body
703                    );
704                }
705                // DRT handles server cleanup automatically
706            },
707        )
708        .await;
709    }
710
711    #[cfg(feature = "integration")]
712    #[tokio::test]
713    async fn test_health_check_with_payload_and_timeout() {
714        // Test the complete health check flow with the new canary-based system:
715        crate::logging::init();
716
717        temp_env::async_with_vars(
718            [
719                ("DYN_SYSTEM_ENABLED", Some("true")),
720                ("DYN_SYSTEM_PORT", Some("0")),
721                ("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("notready")),
722                (
723                    "DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS",
724                    Some("[\"test.endpoint\"]"),
725                ),
726                // Enable health check with short intervals for testing
727                ("DYN_HEALTH_CHECK_ENABLED", Some("true")),
728                ("DYN_CANARY_WAIT_TIME", Some("1")), // Send canary after 1 second of inactivity
729                ("DYN_HEALTH_CHECK_REQUEST_TIMEOUT", Some("1")), // Immediately timeout to mimic unresponsiveness
730                ("RUST_LOG", Some("info")),                      // Enable logging for test
731            ],
732            async {
733                let drt = Arc::new(create_test_drt_async().await);
734
735                // Get system status server info
736                let system_info = drt
737                    .system_status_server_info()
738                    .expect("System status server should be started");
739                let addr = system_info.socket_addr;
740
741                let client = reqwest::Client::new();
742                let health_url = format!("http://{}/health", addr);
743
744                // Register an endpoint with health check payload
745                let endpoint = "test.endpoint";
746                let health_check_payload = serde_json::json!({
747                    "prompt": "health check test",
748                    "_health_check": true
749                });
750
751                // Register the endpoint and its health check payload
752                {
753                    let system_health = drt.system_health.lock().unwrap();
754                    system_health.register_health_check_target(
755                        endpoint,
756                        crate::component::Instance {
757                            component: "test_component".to_string(),
758                            endpoint: "health".to_string(),
759                            namespace: "test_namespace".to_string(),
760                            instance_id: 1,
761                            transport: crate::component::TransportType::NatsTcp(
762                                endpoint.to_string(),
763                            ),
764                        },
765                        health_check_payload.clone(),
766                    );
767                }
768
769                // Check initial health - should be ready (default state)
770                let response = client.get(&health_url).send().await.unwrap();
771                let status = response.status();
772                let body = response.text().await.unwrap();
773                assert_eq!(status, 503, "Should be unhealthy initially (default state)");
774                assert!(
775                    body.contains("\"status\":\"notready\""),
776                    "Should show notready status initially"
777                );
778
779                // Set endpoint to healthy state
780                drt.system_health
781                    .lock()
782                    .unwrap()
783                    .set_endpoint_health_status(endpoint, HealthStatus::Ready);
784
785                // Check health again - should now be healthy
786                let response = client.get(&health_url).send().await.unwrap();
787                let status = response.status();
788                let body = response.text().await.unwrap();
789
790                assert_eq!(status, 200, "Should be healthy due to recent response");
791                assert!(
792                    body.contains("\"status\":\"ready\""),
793                    "Should show ready status after response"
794                );
795
796                // Verify the endpoint status in SystemHealth directly
797                let endpoint_status = drt
798                    .system_health
799                    .lock()
800                    .unwrap()
801                    .get_endpoint_health_status(endpoint);
802                assert_eq!(
803                    endpoint_status,
804                    Some(HealthStatus::Ready),
805                    "SystemHealth should show endpoint as Ready after response"
806                );
807            },
808        )
809        .await;
810    }
811}