dynamo_runtime/
system_status_server.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::config::HealthStatus;
5use crate::logging::make_request_span;
6use crate::metrics::MetricsHierarchy;
7use crate::metrics::prometheus_names::{nats_client, nats_service};
8use crate::traits::DistributedRuntimeProvider;
9use axum::{Router, http::StatusCode, response::IntoResponse, routing::get};
10use serde_json::json;
11use std::collections::HashMap;
12use std::sync::{Arc, OnceLock};
13use std::time::Instant;
14use tokio::{net::TcpListener, task::JoinHandle};
15use tokio_util::sync::CancellationToken;
16use tower_http::trace::TraceLayer;
17
/// System status server information containing socket address and handle
#[derive(Debug)]
pub struct SystemStatusServerInfo {
    // Actual bound address of the status server (may differ from the requested
    // one, e.g. when binding with port 0 — see spawn_system_status_server).
    pub socket_addr: std::net::SocketAddr,
    // Background server task handle; wrapped in `Arc` so clones of this struct
    // share a single handle. `None` when the task is not tracked here.
    pub handle: Option<Arc<JoinHandle<()>>>,
}
24
25impl SystemStatusServerInfo {
26    pub fn new(socket_addr: std::net::SocketAddr, handle: Option<JoinHandle<()>>) -> Self {
27        Self {
28            socket_addr,
29            handle: handle.map(Arc::new),
30        }
31    }
32
33    pub fn address(&self) -> String {
34        self.socket_addr.to_string()
35    }
36
37    pub fn hostname(&self) -> String {
38        self.socket_addr.ip().to_string()
39    }
40
41    pub fn port(&self) -> u16 {
42        self.socket_addr.port()
43    }
44}
45
46impl Clone for SystemStatusServerInfo {
47    fn clone(&self) -> Self {
48        Self {
49            socket_addr: self.socket_addr,
50            handle: self.handle.clone(),
51        }
52    }
53}
54
/// System status server state containing the distributed runtime reference
pub struct SystemStatusState {
    // Root of the DRT metrics hierarchy; used to render the entire Prometheus
    // exposition-format output for the `/metrics` endpoint.
    root_drt: Arc<crate::DistributedRuntime>,
    // Discovery metadata served by `/metadata` (only populated for the
    // Kubernetes backend); when `None` the endpoint responds 404.
    discovery_metadata: Option<Arc<tokio::sync::RwLock<crate::discovery::DiscoveryMetadata>>>,
}
62
63impl SystemStatusState {
64    /// Create new system status server state with the provided distributed runtime
65    pub fn new(
66        drt: Arc<crate::DistributedRuntime>,
67        discovery_metadata: Option<Arc<tokio::sync::RwLock<crate::discovery::DiscoveryMetadata>>>,
68    ) -> anyhow::Result<Self> {
69        Ok(Self {
70            root_drt: drt,
71            discovery_metadata,
72        })
73    }
74
75    /// Get a reference to the distributed runtime
76    pub fn drt(&self) -> &crate::DistributedRuntime {
77        &self.root_drt
78    }
79
80    /// Get a reference to the discovery metadata if available
81    pub fn discovery_metadata(
82        &self,
83    ) -> Option<&Arc<tokio::sync::RwLock<crate::discovery::DiscoveryMetadata>>> {
84        self.discovery_metadata.as_ref()
85    }
86}
87
88/// Start system status server with metrics support
89pub async fn spawn_system_status_server(
90    host: &str,
91    port: u16,
92    cancel_token: CancellationToken,
93    drt: Arc<crate::DistributedRuntime>,
94    discovery_metadata: Option<Arc<tokio::sync::RwLock<crate::discovery::DiscoveryMetadata>>>,
95) -> anyhow::Result<(std::net::SocketAddr, tokio::task::JoinHandle<()>)> {
96    // Create system status server state with the provided distributed runtime
97    let server_state = Arc::new(SystemStatusState::new(drt, discovery_metadata)?);
98    let health_path = server_state
99        .drt()
100        .system_health()
101        .lock()
102        .health_path()
103        .to_string();
104    let live_path = server_state
105        .drt()
106        .system_health()
107        .lock()
108        .live_path()
109        .to_string();
110
111    let app = Router::new()
112        .route(
113            &health_path,
114            get({
115                let state = Arc::clone(&server_state);
116                move || health_handler(state)
117            }),
118        )
119        .route(
120            &live_path,
121            get({
122                let state = Arc::clone(&server_state);
123                move || health_handler(state)
124            }),
125        )
126        .route(
127            "/metrics",
128            get({
129                let state = Arc::clone(&server_state);
130                move || metrics_handler(state)
131            }),
132        )
133        .route(
134            "/metadata",
135            get({
136                let state = Arc::clone(&server_state);
137                move || metadata_handler(state)
138            }),
139        )
140        .fallback(|| async {
141            tracing::info!("[fallback handler] called");
142            (StatusCode::NOT_FOUND, "Route not found").into_response()
143        })
144        .layer(TraceLayer::new_for_http().make_span_with(make_request_span));
145
146    let address = format!("{}:{}", host, port);
147    tracing::info!("[spawn_system_status_server] binding to: {}", address);
148
149    let listener = match TcpListener::bind(&address).await {
150        Ok(listener) => {
151            // get the actual address and port, print in debug level
152            let actual_address = listener.local_addr()?;
153            tracing::info!(
154                "[spawn_system_status_server] system status server bound to: {}",
155                actual_address
156            );
157            (listener, actual_address)
158        }
159        Err(e) => {
160            tracing::error!("Failed to bind to address {}: {}", address, e);
161            return Err(anyhow::anyhow!("Failed to bind to address: {}", e));
162        }
163    };
164    let (listener, actual_address) = listener;
165
166    let observer = cancel_token.child_token();
167    // Spawn the server in the background and return the handle
168    let handle = tokio::spawn(async move {
169        if let Err(e) = axum::serve(listener, app)
170            .with_graceful_shutdown(observer.cancelled_owned())
171            .await
172        {
173            tracing::error!("System status server error: {}", e);
174        }
175    });
176
177    Ok((actual_address, handle))
178}
179
180/// Health handler with optional active health checking
181#[tracing::instrument(skip_all, level = "trace")]
182async fn health_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
183    // Get basic health status
184    let system_health = state.drt().system_health();
185    let system_health_lock = system_health.lock();
186    let (healthy, endpoints) = system_health_lock.get_health_status();
187    let uptime = Some(system_health_lock.uptime());
188    drop(system_health_lock);
189
190    let healthy_string = if healthy { "ready" } else { "notready" };
191    let status_code = if healthy {
192        StatusCode::OK
193    } else {
194        StatusCode::SERVICE_UNAVAILABLE
195    };
196
197    let response = json!({
198        "status": healthy_string,
199        "uptime": uptime,
200        "endpoints": endpoints,
201    });
202
203    tracing::trace!("Response {}", response.to_string());
204
205    (status_code, response.to_string())
206}
207
208/// Metrics handler with DistributedRuntime uptime
209#[tracing::instrument(skip_all, level = "trace")]
210async fn metrics_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
211    // Update the uptime gauge with current value
212    state.drt().system_health().lock().update_uptime_gauge();
213
214    // Get all metrics from DistributedRuntime
215    // Note: In the new hierarchy-based architecture, metrics are automatically registered
216    // at all parent levels, so DRT's metrics include all metrics from children
217    // (Namespace, Component, Endpoint). The prometheus_expfmt() method also executes
218    // all update callbacks and expfmt callbacks before returning the metrics.
219    let response = match state.drt().metrics().prometheus_expfmt() {
220        Ok(r) => r,
221        Err(e) => {
222            tracing::error!("Failed to get metrics from registry: {}", e);
223            return (
224                StatusCode::INTERNAL_SERVER_ERROR,
225                "Failed to get metrics".to_string(),
226            );
227        }
228    };
229
230    (StatusCode::OK, response)
231}
232
233/// Metadata handler
234#[tracing::instrument(skip_all, level = "trace")]
235async fn metadata_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
236    // Check if discovery metadata is available
237    let metadata = match state.discovery_metadata() {
238        Some(metadata) => metadata,
239        None => {
240            tracing::debug!("Metadata endpoint called but no discovery metadata available");
241            return (
242                StatusCode::NOT_FOUND,
243                "Discovery metadata not available".to_string(),
244            )
245                .into_response();
246        }
247    };
248
249    // Read the metadata
250    let metadata_guard = metadata.read().await;
251
252    // Serialize to JSON
253    match serde_json::to_string(&*metadata_guard) {
254        Ok(json) => {
255            tracing::trace!("Returning metadata: {} bytes", json.len());
256            (StatusCode::OK, json).into_response()
257        }
258        Err(e) => {
259            tracing::error!("Failed to serialize metadata: {}", e);
260            (
261                StatusCode::INTERNAL_SERVER_ERROR,
262                "Failed to serialize metadata".to_string(),
263            )
264                .into_response()
265        }
266    }
267}
268
// Regular tests: cargo test system_status_server --lib
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::Duration;

    /// Sanity-check that a plain axum server honors the cancellation token;
    /// the more involved tests elsewhere build on this lifecycle.
    #[tokio::test]
    async fn test_http_server_lifecycle() {
        let cancel_token = CancellationToken::new();
        let server_token = cancel_token.clone();

        // Minimal router — no DistributedRuntime involved.
        let app = Router::new().route("/test", get(|| async { (StatusCode::OK, "test") }));

        // Start the HTTP server in the background.
        let server_handle = tokio::spawn(async move {
            let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
            let _ = axum::serve(listener, app)
                .with_graceful_shutdown(server_token.cancelled_owned())
                .await;
        });

        // No startup wait needed: cancellation alone must stop the server.
        cancel_token.cancel();

        // The server task must finish promptly after cancellation.
        let result = tokio::time::timeout(Duration::from_secs(5), server_handle).await;
        assert!(
            result.is_ok(),
            "HTTP server should shut down when cancel token is cancelled"
        );
    }
}
305
306// Integration tests: cargo test system_status_server --lib --features integration
307#[cfg(all(test, feature = "integration"))]
308mod integration_tests {
309    use super::*;
310    use crate::distributed::distributed_test_utils::create_test_drt_async;
311    use crate::metrics::MetricsHierarchy;
312    use anyhow::Result;
313    use rstest::rstest;
314    use std::sync::Arc;
315    use tokio::time::Duration;
316
317    #[tokio::test]
318    async fn test_uptime_from_system_health() {
319        // Test that uptime is available from SystemHealth
320        temp_env::async_with_vars([("DYN_SYSTEM_PORT", None::<&str>)], async {
321            let drt = create_test_drt_async().await;
322
323            // Get uptime from SystemHealth
324            let uptime = drt.system_health().lock().uptime();
325            // Uptime should exist (even if close to zero)
326            assert!(uptime.as_nanos() > 0 || uptime.is_zero());
327
328            // Sleep briefly and check uptime increases
329            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
330            let uptime_after = drt.system_health().lock().uptime();
331            assert!(uptime_after > uptime);
332        })
333        .await;
334    }
335
336    #[tokio::test]
337    async fn test_runtime_metrics_initialization_and_namespace() {
338        // Test that metrics have correct namespace
339        temp_env::async_with_vars([("DYN_SYSTEM_PORT", None::<&str>)], async {
340            let drt = create_test_drt_async().await;
341            // SystemStatusState is already created in distributed.rs
342            // so we don't need to create it again here
343
344            // The uptime_seconds metric should already be registered and available
345            let response = drt.metrics().prometheus_expfmt().unwrap();
346            println!("Full metrics response:\n{}", response);
347
348            // Filter out NATS client metrics for comparison
349            let filtered_response: String = response
350                .lines()
351                .filter(|line| {
352                    !line.contains(nats_client::PREFIX) && !line.contains(nats_service::PREFIX)
353                })
354                .collect::<Vec<_>>()
355                .join("\n");
356
357            // Check that uptime_seconds metric is present with correct namespace
358            assert!(
359                filtered_response.contains("# HELP dynamo_component_uptime_seconds"),
360                "Should contain uptime_seconds help text"
361            );
362            assert!(
363                filtered_response.contains("# TYPE dynamo_component_uptime_seconds gauge"),
364                "Should contain uptime_seconds type"
365            );
366            assert!(
367                filtered_response.contains("dynamo_component_uptime_seconds"),
368                "Should contain uptime_seconds metric with correct namespace"
369            );
370        })
371        .await;
372    }
373
374    #[tokio::test]
375    async fn test_uptime_gauge_updates() {
376        // Test that the uptime gauge is properly updated and increases over time
377        temp_env::async_with_vars([("DYN_SYSTEM_PORT", None::<&str>)], async {
378            let drt = create_test_drt_async().await;
379
380            // Get initial uptime
381            let initial_uptime = drt.system_health().lock().uptime();
382
383            // Update the gauge with initial value
384            drt.system_health().lock().update_uptime_gauge();
385
386            // Sleep for 100ms
387            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
388
389            // Get uptime after sleep
390            let uptime_after_sleep = drt.system_health().lock().uptime();
391
392            // Update the gauge again
393            drt.system_health().lock().update_uptime_gauge();
394
395            // Verify uptime increased by at least 100ms
396            let elapsed = uptime_after_sleep - initial_uptime;
397            assert!(
398                elapsed >= std::time::Duration::from_millis(100),
399                "Uptime should have increased by at least 100ms after sleep, but only increased by {:?}",
400                elapsed
401            );
402        })
403        .await;
404    }
405
406    #[tokio::test]
407    async fn test_http_requests_fail_when_system_disabled() {
408        // Test that system status server is not running when disabled
409        temp_env::async_with_vars([("DYN_SYSTEM_PORT", None::<&str>)], async {
410            let drt = create_test_drt_async().await;
411
412            // Verify that system status server info is None when disabled
413            let system_info = drt.system_status_server_info();
414            assert!(
415                system_info.is_none(),
416                "System status server should not be running when disabled"
417            );
418
419            println!("✓ System status server correctly disabled when not enabled");
420        })
421        .await;
422    }
423
    /// This test verifies the health and liveness endpoints of the system status server.
    /// It checks that the endpoints respond with the correct HTTP status codes and bodies
    /// based on the initial health status and any custom endpoint paths provided via environment variables.
    /// The test is parameterized using multiple #[case] attributes to cover various scenarios,
    /// including different initial health states ("ready" and "notready"), default and custom endpoint paths,
    /// and expected response codes and bodies.
    #[rstest]
    #[case("ready", 200, "ready", None, None, 3)]
    #[case("notready", 503, "notready", None, None, 3)]
    #[case("ready", 200, "ready", Some("/custom/health"), Some("/custom/live"), 5)]
    #[case(
        "notready",
        503,
        "notready",
        Some("/custom/health"),
        Some("/custom/live"),
        5
    )]
    #[tokio::test]
    #[cfg(feature = "integration")]
    async fn test_health_endpoints(
        #[case] starting_health_status: &'static str,
        #[case] expected_status: u16,
        #[case] expected_body: &'static str,
        #[case] custom_health_path: Option<&'static str>,
        #[case] custom_live_path: Option<&'static str>,
        #[case] expected_num_tests: usize,
    ) {
        use std::sync::Arc;
        // use tokio::io::{AsyncReadExt, AsyncWriteExt};
        // use reqwest for HTTP requests

        // Closure call is needed here to satisfy async_with_vars

        crate::logging::init();

        #[allow(clippy::redundant_closure_call)]
        temp_env::async_with_vars(
            [
                ("DYN_SYSTEM_PORT", Some("0")),
                (
                    "DYN_SYSTEM_STARTING_HEALTH_STATUS",
                    Some(starting_health_status),
                ),
                ("DYN_SYSTEM_HEALTH_PATH", custom_health_path),
                ("DYN_SYSTEM_LIVE_PATH", custom_live_path),
            ],
            (async || {
                let drt = Arc::new(create_test_drt_async().await);

                // Get system status server info from DRT (instead of manually spawning)
                let system_info = drt
                    .system_status_server_info()
                    .expect("System status server should be started by DRT");
                let addr = system_info.socket_addr;

                let client = reqwest::Client::new();

                // Prepare test cases: (request path, expected HTTP status, substring expected in body)
                let mut test_cases = vec![];
                match custom_health_path {
                    None => {
                        // When using default paths, test the default paths
                        test_cases.push(("/health", expected_status, expected_body));
                    }
                    Some(chp) => {
                        // When using custom paths, default paths should not exist
                        test_cases.push(("/health", 404, "Route not found"));
                        test_cases.push((chp, expected_status, expected_body));
                    }
                }
                match custom_live_path {
                    None => {
                        // When using default paths, test the default paths
                        test_cases.push(("/live", expected_status, expected_body));
                    }
                    Some(clp) => {
                        // When using custom paths, default paths should not exist
                        test_cases.push(("/live", 404, "Route not found"));
                        test_cases.push((clp, expected_status, expected_body));
                    }
                }
                // An unknown route must always hit the 404 fallback handler.
                test_cases.push(("/someRandomPathNotFoundHere", 404, "Route not found"));
                // Sanity-check that the #[case] count matches the generated table.
                assert_eq!(test_cases.len(), expected_num_tests);

                for (path, expect_status, expect_body) in test_cases {
                    println!("[test] Sending request to {}", path);
                    let url = format!("http://{}{}", addr, path);
                    let response = client.get(&url).send().await.unwrap();
                    let status = response.status();
                    let body = response.text().await.unwrap();
                    println!(
                        "[test] Response for {}: status={}, body={:?}",
                        path, status, body
                    );
                    assert_eq!(
                        status, expect_status,
                        "Response: status={}, body={:?}",
                        status, body
                    );
                    assert!(
                        body.contains(expect_body),
                        "Response: status={}, body={:?}",
                        status,
                        body
                    );
                }
            })(),
        )
        .await;
    }
535
    /// Smoke-test that requests carrying W3C trace-context headers are accepted
    /// by the server's TraceLayer; does not yet assert on span contents.
    #[tokio::test]
    async fn test_health_endpoint_tracing() -> Result<()> {
        use std::sync::Arc;

        // Closure call is needed here to satisfy async_with_vars

        #[allow(clippy::redundant_closure_call)]
        let _ = temp_env::async_with_vars(
            [
                ("DYN_SYSTEM_PORT", Some("0")),
                ("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("ready")),
                ("DYN_LOGGING_JSONL", Some("1")),
                ("DYN_LOG", Some("trace")),
            ],
            (async || {
                // TODO Add proper testing for
                // trace id and parent id

                crate::logging::init();

                let drt = Arc::new(create_test_drt_async().await);

                // Get system status server info from DRT (instead of manually spawning)
                let system_info = drt
                    .system_status_server_info()
                    .expect("System status server should be started by DRT");
                let addr = system_info.socket_addr;
                let client = reqwest::Client::new();
                for path in [("/health"), ("/live"), ("/someRandomPathNotFoundHere")] {
                    // Well-formed W3C traceparent/tracestate header values so the
                    // request span can pick up a remote parent.
                    let traceparent_value =
                        "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
                    let tracestate_value = "vendor1=opaqueValue1,vendor2=opaqueValue2";
                    let mut headers = reqwest::header::HeaderMap::new();
                    headers.insert(
                        reqwest::header::HeaderName::from_static("traceparent"),
                        reqwest::header::HeaderValue::from_str(traceparent_value)?,
                    );
                    headers.insert(
                        reqwest::header::HeaderName::from_static("tracestate"),
                        reqwest::header::HeaderValue::from_str(tracestate_value)?,
                    );
                    let url = format!("http://{}{}", addr, path);
                    let response = client.get(&url).headers(headers).send().await.unwrap();
                    let status = response.status();
                    let body = response.text().await.unwrap();
                    tracing::info!(body = body, status = status.to_string());
                }

                Ok::<(), anyhow::Error>(())
            })(),
        )
        .await;
        Ok(())
    }
590
    /// End-to-end check that health flips from "notready" to "ready" once the
    /// endpoint named in DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS starts serving.
    #[tokio::test]
    async fn test_health_endpoint_with_changing_health_status() {
        // Test health endpoint starts in not ready status, then becomes ready
        // when endpoints are created (DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS=generate)
        const ENDPOINT_NAME: &str = "generate";
        const ENDPOINT_HEALTH_CONFIG: &str = "[\"generate\"]";
        temp_env::async_with_vars(
            [
                ("DYN_SYSTEM_PORT", Some("0")),
                ("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("notready")),
                ("DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS", Some(ENDPOINT_HEALTH_CONFIG)),
            ],
            async {
                let drt = Arc::new(create_test_drt_async().await);

                // Check if system status server was started
                let system_info_opt = drt.system_status_server_info();

                // Ensure system status server was spawned by DRT
                assert!(
                    system_info_opt.is_some(),
                    "System status server was not spawned by DRT. Expected DRT to spawn server when DYN_SYSTEM_PORT is set to a positive value, but system_status_server_info() returned None. Environment: DYN_SYSTEM_PORT={:?}",
                    std::env::var("DYN_SYSTEM_PORT")
                );

                // Get the system status server info from DRT - this should never fail now due to above check
                let system_info = system_info_opt.unwrap();
                let addr = system_info.socket_addr;

                // Initially check health - should be not ready
                let client = reqwest::Client::new();
                let health_url = format!("http://{}/health", addr);

                let response = client.get(&health_url).send().await.unwrap();
                let status = response.status();
                let body = response.text().await.unwrap();

                // Health should be not ready (503) initially
                assert_eq!(status, 503, "Health should be 503 (not ready) initially, got: {}", status);
                assert!(body.contains("\"status\":\"notready\""), "Health should contain status notready");

                // Now create a namespace, component, and endpoint to make the system healthy
                let namespace = drt.namespace("ns1234").unwrap();
                let mut component = namespace.component("comp1234").unwrap();

                // Create a simple test handler that echoes its input back.
                use crate::pipeline::{async_trait, network::Ingress, AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, SingleIn};
                use crate::protocols::annotated::Annotated;

                struct TestHandler;

                #[async_trait]
                impl AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, anyhow::Error> for TestHandler {
                    async fn generate(&self, input: SingleIn<String>) -> anyhow::Result<ManyOut<Annotated<String>>> {
                        let (data, ctx) = input.into_parts();
                        let response = Annotated::from_data(format!("You responded: {}", data));
                        // Single-item response stream tied to the request context.
                        Ok(crate::pipeline::ResponseStream::new(
                            Box::pin(crate::stream::iter(vec![response])),
                            ctx.context()
                        ))
                    }
                }

                // Create the ingress and start the endpoint service
                let ingress = Ingress::for_engine(std::sync::Arc::new(TestHandler)).unwrap();

                // Start the service and endpoint with a health check payload
                // This will automatically register the endpoint for health monitoring
                tokio::spawn(async move {
                    component.add_stats_service().await.unwrap();
                    let _ = component.endpoint(ENDPOINT_NAME)
                        .endpoint_builder()
                        .handler(ingress)
                        .health_check_payload(serde_json::json!({
                            "test": "health_check"
                        }))
                        .start()
                        .await;
                });

                // Hit health endpoint 200 times to verify consistency
                // (the endpoint task above starts concurrently, so early requests may still see 503).
                let mut success_count = 0;
                let mut failures = Vec::new();

                for i in 1..=200 {
                    let response = client.get(&health_url).send().await.unwrap();
                    let status = response.status();
                    let body = response.text().await.unwrap();

                    if status == 200 && body.contains("\"status\":\"ready\"") {
                        success_count += 1;
                    } else {
                        failures.push((i, status.as_u16(), body.clone()));
                        if failures.len() <= 5 {  // Only log first 5 failures
                            tracing::warn!("Request {}: status={}, body={}", i, status, body);
                        }
                    }
                }

                tracing::info!("Health endpoint test results: {}/200 requests succeeded", success_count);
                if !failures.is_empty() {
                    tracing::warn!("Failed requests: {}", failures.len());
                }

                // Expect at least 150 out of 200 requests to be successful
                assert!(success_count >= 150, "Expected at least 150 out of 200 requests to succeed, but only {} succeeded", success_count);
            },
        )
        .await;
    }
701
702    #[tokio::test]
703    async fn test_spawn_system_status_server_endpoints() {
704        // use reqwest for HTTP requests
705        temp_env::async_with_vars(
706            [
707                ("DYN_SYSTEM_PORT", Some("0")),
708                ("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("ready")),
709            ],
710            async {
711                let drt = Arc::new(create_test_drt_async().await);
712
713                // Get system status server info from DRT (instead of manually spawning)
714                let system_info = drt
715                    .system_status_server_info()
716                    .expect("System status server should be started by DRT");
717                let addr = system_info.socket_addr;
718                let client = reqwest::Client::new();
719                for (path, expect_200, expect_body) in [
720                    ("/health", true, "ready"),
721                    ("/live", true, "ready"),
722                    ("/someRandomPathNotFoundHere", false, "Route not found"),
723                ] {
724                    println!("[test] Sending request to {}", path);
725                    let url = format!("http://{}{}", addr, path);
726                    let response = client.get(&url).send().await.unwrap();
727                    let status = response.status();
728                    let body = response.text().await.unwrap();
729                    println!(
730                        "[test] Response for {}: status={}, body={:?}",
731                        path, status, body
732                    );
733                    if expect_200 {
734                        assert_eq!(status, 200, "Response: status={}, body={:?}", status, body);
735                    } else {
736                        assert_eq!(status, 404, "Response: status={}, body={:?}", status, body);
737                    }
738                    assert!(
739                        body.contains(expect_body),
740                        "Response: status={}, body={:?}",
741                        status,
742                        body
743                    );
744                }
745                // DRT handles server cleanup automatically
746            },
747        )
748        .await;
749    }
750
751    #[cfg(feature = "integration")]
752    #[tokio::test]
753    async fn test_health_check_with_payload_and_timeout() {
754        // Test the complete health check flow with the new canary-based system:
755        crate::logging::init();
756
757        temp_env::async_with_vars(
758            [
759                ("DYN_SYSTEM_PORT", Some("0")),
760                ("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("notready")),
761                (
762                    "DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS",
763                    Some("[\"test.endpoint\"]"),
764                ),
765                // Enable health check with short intervals for testing
766                ("DYN_HEALTH_CHECK_ENABLED", Some("true")),
767                ("DYN_CANARY_WAIT_TIME", Some("1")), // Send canary after 1 second of inactivity
768                ("DYN_HEALTH_CHECK_REQUEST_TIMEOUT", Some("1")), // Immediately timeout to mimic unresponsiveness
769                ("RUST_LOG", Some("info")),                      // Enable logging for test
770            ],
771            async {
772                let drt = Arc::new(create_test_drt_async().await);
773
774                // Get system status server info
775                let system_info = drt
776                    .system_status_server_info()
777                    .expect("System status server should be started");
778                let addr = system_info.socket_addr;
779
780                let client = reqwest::Client::new();
781                let health_url = format!("http://{}/health", addr);
782
783                // Register an endpoint with health check payload
784                let endpoint = "test.endpoint";
785                let health_check_payload = serde_json::json!({
786                    "prompt": "health check test",
787                    "_health_check": true
788                });
789
790                // Register the endpoint and its health check payload
791                {
792                    let system_health = drt.system_health();
793                    let system_health_lock = system_health.lock();
794                    system_health_lock.register_health_check_target(
795                        endpoint,
796                        crate::component::Instance {
797                            component: "test_component".to_string(),
798                            endpoint: "health".to_string(),
799                            namespace: "test_namespace".to_string(),
800                            instance_id: 1,
801                            transport: crate::component::TransportType::Nats(endpoint.to_string()),
802                        },
803                        health_check_payload.clone(),
804                    );
805                }
806
807                // Check initial health - should be ready (default state)
808                let response = client.get(&health_url).send().await.unwrap();
809                let status = response.status();
810                let body = response.text().await.unwrap();
811                assert_eq!(status, 503, "Should be unhealthy initially (default state)");
812                assert!(
813                    body.contains("\"status\":\"notready\""),
814                    "Should show notready status initially"
815                );
816
817                // Set endpoint to healthy state
818                drt.system_health()
819                    .lock()
820                    .set_endpoint_health_status(endpoint, HealthStatus::Ready);
821
822                // Check health again - should now be healthy
823                let response = client.get(&health_url).send().await.unwrap();
824                let status = response.status();
825                let body = response.text().await.unwrap();
826
827                assert_eq!(status, 200, "Should be healthy due to recent response");
828                assert!(
829                    body.contains("\"status\":\"ready\""),
830                    "Should show ready status after response"
831                );
832
833                // Verify the endpoint status in SystemHealth directly
834                let endpoint_status = drt
835                    .system_health()
836                    .lock()
837                    .get_endpoint_health_status(endpoint);
838                assert_eq!(
839                    endpoint_status,
840                    Some(HealthStatus::Ready),
841                    "SystemHealth should show endpoint as Ready after response"
842                );
843            },
844        )
845        .await;
846    }
847}