dynamo_runtime/
system_status_server.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4// TODO: (DEP-635) this file should be renamed to system_http_server.rs
5//  it is being used not just for status, health, but others like loras management.
6
7use crate::config::HealthStatus;
8use crate::config::environment_names::logging as env_logging;
9use crate::config::environment_names::runtime::canary as env_canary;
10use crate::config::environment_names::runtime::system as env_system;
11use crate::logging::make_request_span;
12use crate::metrics::MetricsHierarchy;
13use crate::traits::DistributedRuntimeProvider;
14use axum::{
15    Router,
16    body::Bytes,
17    extract::{Json, Path, State},
18    http::StatusCode,
19    response::IntoResponse,
20    routing::{any, delete, get, post},
21};
22use futures::StreamExt;
23use serde::{Deserialize, Serialize};
24use serde_json::json;
25use std::collections::HashMap;
26use std::sync::{Arc, OnceLock};
27use std::time::Instant;
28use tokio::{net::TcpListener, task::JoinHandle};
29use tokio_util::sync::CancellationToken;
30use tower_http::trace::TraceLayer;
31
32/// System status server information containing socket address and handle
33#[derive(Debug)]
34pub struct SystemStatusServerInfo {
35    pub socket_addr: std::net::SocketAddr,
36    pub handle: Option<Arc<JoinHandle<()>>>,
37}
38
39impl SystemStatusServerInfo {
40    pub fn new(socket_addr: std::net::SocketAddr, handle: Option<JoinHandle<()>>) -> Self {
41        Self {
42            socket_addr,
43            handle: handle.map(Arc::new),
44        }
45    }
46
47    pub fn address(&self) -> String {
48        self.socket_addr.to_string()
49    }
50
51    pub fn hostname(&self) -> String {
52        self.socket_addr.ip().to_string()
53    }
54
55    pub fn port(&self) -> u16 {
56        self.socket_addr.port()
57    }
58}
59
60impl Clone for SystemStatusServerInfo {
61    fn clone(&self) -> Self {
62        Self {
63            socket_addr: self.socket_addr,
64            handle: self.handle.clone(),
65        }
66    }
67}
68
69/// System status server state containing the distributed runtime reference
70pub struct SystemStatusState {
71    // global drt registry is for printing out the entire Prometheus format output
72    root_drt: Arc<crate::DistributedRuntime>,
73    // Discovery metadata (only for Kubernetes backend)
74    discovery_metadata: Option<Arc<tokio::sync::RwLock<crate::discovery::DiscoveryMetadata>>>,
75}
76
77impl SystemStatusState {
78    /// Create new system status server state with the provided distributed runtime
79    pub fn new(
80        drt: Arc<crate::DistributedRuntime>,
81        discovery_metadata: Option<Arc<tokio::sync::RwLock<crate::discovery::DiscoveryMetadata>>>,
82    ) -> anyhow::Result<Self> {
83        Ok(Self {
84            root_drt: drt,
85            discovery_metadata,
86        })
87    }
88
89    /// Get a reference to the distributed runtime
90    pub fn drt(&self) -> &crate::DistributedRuntime {
91        &self.root_drt
92    }
93
94    /// Get a reference to the discovery metadata if available
95    pub fn discovery_metadata(
96        &self,
97    ) -> Option<&Arc<tokio::sync::RwLock<crate::discovery::DiscoveryMetadata>>> {
98        self.discovery_metadata.as_ref()
99    }
100}
101
102/// Request body for POST /v1/loras
103#[derive(Debug, Clone, Deserialize, Serialize)]
104pub struct LoadLoraRequest {
105    pub lora_name: String,
106    pub source: LoraSource,
107}
108
109/// Source information for loading a LoRA
110#[derive(Debug, Clone, Deserialize, Serialize)]
111pub struct LoraSource {
112    pub uri: String,
113}
114
115/// Response body for LoRA operations
116#[derive(Debug, Clone, Deserialize, Serialize)]
117pub struct LoraResponse {
118    pub status: String,
119    #[serde(skip_serializing_if = "Option::is_none")]
120    pub message: Option<String>,
121    #[serde(skip_serializing_if = "Option::is_none")]
122    pub lora_name: Option<String>,
123    #[serde(skip_serializing_if = "Option::is_none")]
124    pub lora_id: Option<u64>,
125    #[serde(skip_serializing_if = "Option::is_none")]
126    pub loras: Option<serde_json::Value>,
127    #[serde(skip_serializing_if = "Option::is_none")]
128    pub count: Option<usize>,
129}
130
131/// Start system status server with metrics support
132pub async fn spawn_system_status_server(
133    host: &str,
134    port: u16,
135    cancel_token: CancellationToken,
136    drt: Arc<crate::DistributedRuntime>,
137    discovery_metadata: Option<Arc<tokio::sync::RwLock<crate::discovery::DiscoveryMetadata>>>,
138) -> anyhow::Result<(std::net::SocketAddr, tokio::task::JoinHandle<()>)> {
139    // Create system status server state with the provided distributed runtime
140    let server_state = Arc::new(SystemStatusState::new(drt, discovery_metadata)?);
141    let health_path = server_state
142        .drt()
143        .system_health()
144        .lock()
145        .health_path()
146        .to_string();
147    let live_path = server_state
148        .drt()
149        .system_health()
150        .lock()
151        .live_path()
152        .to_string();
153
154    // Check if LoRA feature is enabled
155    let lora_enabled = std::env::var(crate::config::environment_names::llm::DYN_LORA_ENABLED)
156        .map(|v| v.to_lowercase() == "true")
157        .unwrap_or(false);
158
159    let mut app = Router::new()
160        .route(
161            &health_path,
162            get({
163                let state = Arc::clone(&server_state);
164                move || health_handler(state)
165            }),
166        )
167        .route(
168            &live_path,
169            get({
170                let state = Arc::clone(&server_state);
171                move || health_handler(state)
172            }),
173        )
174        .route(
175            "/metrics",
176            get({
177                let state = Arc::clone(&server_state);
178                move || metrics_handler(state)
179            }),
180        )
181        .route(
182            "/metadata",
183            get({
184                let state = Arc::clone(&server_state);
185                move || metadata_handler(state)
186            }),
187        )
188        .route(
189            "/engine/{*path}",
190            any({
191                let state = Arc::clone(&server_state);
192                move |path, body| engine_route_handler(state, path, body)
193            }),
194        );
195
196    // Add LoRA routes only if DYN_LORA_ENABLED is set to true
197    if lora_enabled {
198        app = app
199            .route(
200                "/v1/loras",
201                get({
202                    let state = Arc::clone(&server_state);
203                    move || list_loras_handler(State(state))
204                })
205                .post({
206                    let state = Arc::clone(&server_state);
207                    move |body| load_lora_handler(State(state), body)
208                }),
209            )
210            .route(
211                "/v1/loras/{*lora_name}",
212                delete({
213                    let state = Arc::clone(&server_state);
214                    move |path| unload_lora_handler(State(state), path)
215                }),
216            );
217    }
218
219    let app = app
220        .fallback(|| async {
221            tracing::info!("[fallback handler] called");
222            (StatusCode::NOT_FOUND, "Route not found").into_response()
223        })
224        .layer(TraceLayer::new_for_http().make_span_with(make_request_span));
225
226    let address = format!("{}:{}", host, port);
227    tracing::info!("[spawn_system_status_server] binding to: {}", address);
228
229    let listener = match TcpListener::bind(&address).await {
230        Ok(listener) => {
231            // get the actual address and port, print in debug level
232            let actual_address = listener.local_addr()?;
233            tracing::info!(
234                "[spawn_system_status_server] system status server bound to: {}",
235                actual_address
236            );
237            (listener, actual_address)
238        }
239        Err(e) => {
240            tracing::error!("Failed to bind to address {}: {}", address, e);
241            return Err(anyhow::anyhow!("Failed to bind to address: {}", e));
242        }
243    };
244    let (listener, actual_address) = listener;
245
246    let observer = cancel_token.child_token();
247    // Spawn the server in the background and return the handle
248    let handle = tokio::spawn(async move {
249        if let Err(e) = axum::serve(listener, app)
250            .with_graceful_shutdown(observer.cancelled_owned())
251            .await
252        {
253            tracing::error!("System status server error: {}", e);
254        }
255    });
256
257    Ok((actual_address, handle))
258}
259
260/// Health handler with optional active health checking
261#[tracing::instrument(skip_all, level = "trace")]
262async fn health_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
263    // Get basic health status
264    let system_health = state.drt().system_health();
265    let system_health_lock = system_health.lock();
266    let (healthy, endpoints) = system_health_lock.get_health_status();
267    let uptime = Some(system_health_lock.uptime());
268    drop(system_health_lock);
269
270    let healthy_string = if healthy { "ready" } else { "notready" };
271    let status_code = if healthy {
272        StatusCode::OK
273    } else {
274        StatusCode::SERVICE_UNAVAILABLE
275    };
276
277    let response = json!({
278        "status": healthy_string,
279        "uptime": uptime,
280        "endpoints": endpoints,
281    });
282
283    tracing::trace!("Response {}", response.to_string());
284
285    (status_code, response.to_string())
286}
287
288/// Metrics handler with DistributedRuntime uptime
289#[tracing::instrument(skip_all, level = "trace")]
290async fn metrics_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
291    // Update the uptime gauge with current value
292    state.drt().system_health().lock().update_uptime_gauge();
293
294    // Get all metrics from DistributedRuntime
295    // Note: In the new hierarchy-based architecture, metrics are automatically registered
296    // at all parent levels, so DRT's metrics include all metrics from children
297    // (Namespace, Component, Endpoint). The prometheus_expfmt() method also executes
298    // all update callbacks and expfmt callbacks before returning the metrics.
299    let response = match state.drt().metrics().prometheus_expfmt() {
300        Ok(r) => r,
301        Err(e) => {
302            tracing::error!("Failed to get metrics from registry: {}", e);
303            return (
304                StatusCode::INTERNAL_SERVER_ERROR,
305                "Failed to get metrics".to_string(),
306            );
307        }
308    };
309
310    (StatusCode::OK, response)
311}
312
313/// Metadata handler
314#[tracing::instrument(skip_all, level = "trace")]
315async fn metadata_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
316    // Check if discovery metadata is available
317    let metadata = match state.discovery_metadata() {
318        Some(metadata) => metadata,
319        None => {
320            tracing::debug!("Metadata endpoint called but no discovery metadata available");
321            return (
322                StatusCode::NOT_FOUND,
323                "Discovery metadata not available".to_string(),
324            )
325                .into_response();
326        }
327    };
328
329    // Read the metadata
330    let metadata_guard = metadata.read().await;
331
332    // Serialize to JSON
333    match serde_json::to_string(&*metadata_guard) {
334        Ok(json) => {
335            tracing::trace!("Returning metadata: {} bytes", json.len());
336            (StatusCode::OK, json).into_response()
337        }
338        Err(e) => {
339            tracing::error!("Failed to serialize metadata: {}", e);
340            (
341                StatusCode::INTERNAL_SERVER_ERROR,
342                "Failed to serialize metadata".to_string(),
343            )
344                .into_response()
345        }
346    }
347}
348
349/// Handler for POST /v1/loras - Load a LoRA adapter
350#[tracing::instrument(skip_all, level = "debug")]
351async fn load_lora_handler(
352    State(state): State<Arc<SystemStatusState>>,
353    Json(request): Json<LoadLoraRequest>,
354) -> impl IntoResponse {
355    tracing::info!("Loading LoRA: {}", request.lora_name);
356
357    // Call the load_lora endpoint for each available backend
358    match call_lora_endpoint(
359        state.drt(),
360        "load_lora",
361        json!({
362            "lora_name": request.lora_name,
363            "source": {
364                "uri": request.source.uri
365            },
366        }),
367    )
368    .await
369    {
370        Ok(response) => {
371            tracing::info!("LoRA loaded successfully: {}", request.lora_name);
372            (StatusCode::OK, Json(response))
373        }
374        Err(e) => {
375            tracing::error!("Failed to load LoRA {}: {}", request.lora_name, e);
376            (
377                StatusCode::INTERNAL_SERVER_ERROR,
378                Json(LoraResponse {
379                    status: "error".to_string(),
380                    message: Some(e.to_string()),
381                    lora_name: Some(request.lora_name),
382                    lora_id: None,
383                    loras: None,
384                    count: None,
385                }),
386            )
387        }
388    }
389}
390
391/// Handler for DELETE /v1/loras/*lora_name - Unload a LoRA adapter
392#[tracing::instrument(skip_all, level = "debug")]
393async fn unload_lora_handler(
394    State(state): State<Arc<SystemStatusState>>,
395    Path(lora_name): Path<String>,
396) -> impl IntoResponse {
397    // Strip the leading slash from the wildcard capture
398    let lora_name = lora_name
399        .strip_prefix('/')
400        .unwrap_or(&lora_name)
401        .to_string();
402    tracing::info!("Unloading LoRA: {}", lora_name);
403
404    // Call the unload_lora endpoint for each available backend
405    match call_lora_endpoint(
406        state.drt(),
407        "unload_lora",
408        json!({
409            "lora_name": lora_name.clone(),
410        }),
411    )
412    .await
413    {
414        Ok(response) => {
415            tracing::info!("LoRA unloaded successfully: {}", lora_name);
416            (StatusCode::OK, Json(response))
417        }
418        Err(e) => {
419            tracing::error!("Failed to unload LoRA {}: {}", lora_name, e);
420            (
421                StatusCode::INTERNAL_SERVER_ERROR,
422                Json(LoraResponse {
423                    status: "error".to_string(),
424                    message: Some(e.to_string()),
425                    lora_name: Some(lora_name),
426                    lora_id: None,
427                    loras: None,
428                    count: None,
429                }),
430            )
431        }
432    }
433}
434
435/// Handler for GET /v1/loras - List all LoRA adapters
436#[tracing::instrument(skip_all, level = "debug")]
437async fn list_loras_handler(State(state): State<Arc<SystemStatusState>>) -> impl IntoResponse {
438    tracing::info!("Listing all LoRAs");
439
440    // Call the list_loras endpoint for each available backend
441    match call_lora_endpoint(state.drt(), "list_loras", json!({})).await {
442        Ok(response) => {
443            tracing::info!("Successfully retrieved LoRA list");
444            (StatusCode::OK, Json(response))
445        }
446        Err(e) => {
447            tracing::error!("Failed to list LoRAs: {}", e);
448            (
449                StatusCode::INTERNAL_SERVER_ERROR,
450                Json(LoraResponse {
451                    status: "error".to_string(),
452                    message: Some(e.to_string()),
453                    lora_name: None,
454                    lora_id: None,
455                    loras: None,
456                    count: None,
457                }),
458            )
459        }
460    }
461}
462
463/// Helper function to call a LoRA management endpoint locally via in-process registry
464///
465/// This function ONLY uses the local endpoint registry for direct in-process calls.
466/// It does NOT fall back to network discovery if the endpoint is not found.
467async fn call_lora_endpoint(
468    drt: &crate::DistributedRuntime,
469    endpoint_name: &str,
470    request_body: serde_json::Value,
471) -> anyhow::Result<LoraResponse> {
472    use crate::engine::AsyncEngine;
473
474    tracing::debug!("Calling local endpoint: '{}'", endpoint_name);
475
476    // Get the endpoint from the local registry (in-process call only)
477    let local_registry = drt.local_endpoint_registry();
478    let engine = local_registry
479        .get(endpoint_name)
480        .ok_or_else(|| {
481            anyhow::anyhow!(
482                "Endpoint '{}' not found in local registry. Make sure it's registered with .register_local_engine()",
483                endpoint_name
484            )
485        })?;
486
487    tracing::debug!(
488        "Found endpoint '{}' in local registry, calling directly",
489        endpoint_name
490    );
491
492    // Call the engine directly without going through the network stack
493    let request = crate::pipeline::SingleIn::new(request_body);
494    let mut stream = engine.generate(request).await?;
495
496    // Get the first response
497    if let Some(response) = stream.next().await {
498        let response_data = response.data.unwrap_or_default();
499
500        // Try structured deserialization first, fall back to manual field extraction
501        let lora_response = serde_json::from_value::<LoraResponse>(response_data.clone())
502            .unwrap_or_else(|_| parse_lora_response(&response_data));
503
504        return Ok(lora_response);
505    }
506
507    anyhow::bail!("No response received from endpoint '{}'", endpoint_name)
508}
509
510/// Helper to parse response data into LoraResponse
511fn parse_lora_response(response_data: &serde_json::Value) -> LoraResponse {
512    LoraResponse {
513        status: response_data
514            .get("status")
515            .and_then(|s| s.as_str())
516            .unwrap_or("success")
517            .to_string(),
518        message: response_data
519            .get("message")
520            .and_then(|m| m.as_str())
521            .map(|s| s.to_string()),
522        lora_name: response_data
523            .get("lora_name")
524            .and_then(|n| n.as_str())
525            .map(|s| s.to_string()),
526        lora_id: response_data.get("lora_id").and_then(|id| id.as_u64()),
527        loras: response_data.get("loras").cloned(),
528        count: response_data
529            .get("count")
530            .and_then(|c| c.as_u64())
531            .map(|c| c as usize),
532    }
533}
534
535/// Engine route handler for /engine/* routes
536///
537/// This handler looks up registered callbacks in the engine routes registry
538/// and invokes them with the request body, returning the response as JSON.
539#[tracing::instrument(skip_all, level = "trace", fields(path = %path))]
540async fn engine_route_handler(
541    state: Arc<SystemStatusState>,
542    Path(path): Path<String>,
543    body: Bytes,
544) -> impl IntoResponse {
545    tracing::trace!("Engine route request to /engine/{}", path);
546
547    // Parse body as JSON (empty object for GET/empty body)
548    let body_json: serde_json::Value = if body.is_empty() {
549        serde_json::json!({})
550    } else {
551        match serde_json::from_slice(&body) {
552            Ok(json) => json,
553            Err(e) => {
554                tracing::warn!("Invalid JSON in request body: {}", e);
555                return (
556                    StatusCode::BAD_REQUEST,
557                    json!({
558                        "error": "Invalid JSON",
559                        "message": format!("{}", e)
560                    })
561                    .to_string(),
562                )
563                    .into_response();
564            }
565        }
566    };
567
568    // Look up callback
569    let callback = match state.drt().engine_routes().get(&path) {
570        Some(cb) => cb,
571        None => {
572            tracing::debug!("Route /engine/{} not found", path);
573            return (
574                StatusCode::NOT_FOUND,
575                json!({
576                    "error": "Route not found",
577                    "message": format!("Route /engine/{} not found", path)
578                })
579                .to_string(),
580            )
581                .into_response();
582        }
583    };
584
585    // Call callback (it's async, so await it)
586    match callback(body_json).await {
587        Ok(response) => {
588            tracing::trace!("Engine route handler succeeded for /engine/{}", path);
589            (StatusCode::OK, response.to_string()).into_response()
590        }
591        Err(e) => {
592            tracing::error!("Engine route handler error for /engine/{}: {}", path, e);
593            (
594                StatusCode::INTERNAL_SERVER_ERROR,
595                json!({
596                    "error": "Handler error",
597                    "message": format!("{}", e)
598                })
599                .to_string(),
600            )
601                .into_response()
602        }
603    }
604}
605
606// Regular tests: cargo test system_status_server --lib
607#[cfg(test)]
608mod tests {
609    use super::*;
610    use tokio::time::Duration;
611
612    // This is a basic test to verify the HTTP server is working before testing other more complicated tests
613    #[tokio::test]
614    async fn test_http_server_lifecycle() {
615        let cancel_token = CancellationToken::new();
616        let cancel_token_for_server = cancel_token.clone();
617
618        // Test basic HTTP server lifecycle without DistributedRuntime
619        let app = Router::new().route("/test", get(|| async { (StatusCode::OK, "test") }));
620
621        // start HTTP server
622        let server_handle = tokio::spawn(async move {
623            let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
624            let _ = axum::serve(listener, app)
625                .with_graceful_shutdown(cancel_token_for_server.cancelled_owned())
626                .await;
627        });
628
629        // server starts immediately, no need to wait
630
631        // cancel token
632        cancel_token.cancel();
633
634        // wait for the server to shut down
635        let result = tokio::time::timeout(Duration::from_secs(5), server_handle).await;
636        assert!(
637            result.is_ok(),
638            "HTTP server should shut down when cancel token is cancelled"
639        );
640    }
641}
642
643// Integration tests: cargo test system_status_server --lib --features integration
644#[cfg(all(test, feature = "integration"))]
645mod integration_tests {
646    use super::*;
647    use crate::config::environment_names::logging as env_logging;
648    use crate::config::environment_names::runtime::canary as env_canary;
649    use crate::distributed::distributed_test_utils::create_test_drt_async;
650    use crate::metrics::MetricsHierarchy;
651    use anyhow::Result;
652    use rstest::rstest;
653    use std::sync::Arc;
654    use tokio::time::Duration;
655
656    #[tokio::test]
657    async fn test_uptime_from_system_health() {
658        // Test that uptime is available from SystemHealth
659        temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, None::<&str>)], async {
660            let drt = create_test_drt_async().await;
661
662            // Get uptime from SystemHealth
663            let uptime = drt.system_health().lock().uptime();
664            // Uptime should exist (even if close to zero)
665            assert!(uptime.as_nanos() > 0 || uptime.is_zero());
666
667            // Sleep briefly and check uptime increases
668            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
669            let uptime_after = drt.system_health().lock().uptime();
670            assert!(uptime_after > uptime);
671        })
672        .await;
673    }
674
675    #[tokio::test]
676    async fn test_runtime_metrics_initialization_and_namespace() {
677        // Test that metrics have correct namespace
678        temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, None::<&str>)], async {
679            let drt = create_test_drt_async().await;
680            // SystemStatusState is already created in distributed.rs
681            // so we don't need to create it again here
682
683            // The uptime_seconds metric should already be registered and available
684            let response = drt.metrics().prometheus_expfmt().unwrap();
685            println!("Full metrics response:\n{}", response);
686
687            // Check that uptime_seconds metric is present with correct namespace
688            assert!(
689                response.contains("# HELP dynamo_component_uptime_seconds"),
690                "Should contain uptime_seconds help text"
691            );
692            assert!(
693                response.contains("# TYPE dynamo_component_uptime_seconds gauge"),
694                "Should contain uptime_seconds type"
695            );
696            assert!(
697                response.contains("dynamo_component_uptime_seconds"),
698                "Should contain uptime_seconds metric with correct namespace"
699            );
700        })
701        .await;
702    }
703
704    #[tokio::test]
705    async fn test_uptime_gauge_updates() {
706        // Test that the uptime gauge is properly updated and increases over time
707        temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, None::<&str>)], async {
708            let drt = create_test_drt_async().await;
709
710            // Get initial uptime
711            let initial_uptime = drt.system_health().lock().uptime();
712
713            // Update the gauge with initial value
714            drt.system_health().lock().update_uptime_gauge();
715
716            // Sleep for 100ms
717            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
718
719            // Get uptime after sleep
720            let uptime_after_sleep = drt.system_health().lock().uptime();
721
722            // Update the gauge again
723            drt.system_health().lock().update_uptime_gauge();
724
725            // Verify uptime increased by at least 100ms
726            let elapsed = uptime_after_sleep - initial_uptime;
727            assert!(
728                elapsed >= std::time::Duration::from_millis(100),
729                "Uptime should have increased by at least 100ms after sleep, but only increased by {:?}",
730                elapsed
731            );
732        })
733        .await;
734    }
735
736    #[tokio::test]
737    async fn test_http_requests_fail_when_system_disabled() {
738        // Test that system status server is not running when disabled
739        temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, None::<&str>)], async {
740            let drt = create_test_drt_async().await;
741
742            // Verify that system status server info is None when disabled
743            let system_info = drt.system_status_server_info();
744            assert!(
745                system_info.is_none(),
746                "System status server should not be running when disabled"
747            );
748
749            println!("✓ System status server correctly disabled when not enabled");
750        })
751        .await;
752    }
753
754    /// This test verifies the health and liveness endpoints of the system status server.
755    /// It checks that the endpoints respond with the correct HTTP status codes and bodies
756    /// based on the initial health status and any custom endpoint paths provided via environment variables.
757    /// The test is parameterized using multiple #[case] attributes to cover various scenarios,
758    /// including different initial health states ("ready" and "notready"), default and custom endpoint paths,
759    /// and expected response codes and bodies.
760    #[rstest]
761    #[case("ready", 200, "ready", None, None, 3)]
762    #[case("notready", 503, "notready", None, None, 3)]
763    #[case("ready", 200, "ready", Some("/custom/health"), Some("/custom/live"), 5)]
764    #[case(
765        "notready",
766        503,
767        "notready",
768        Some("/custom/health"),
769        Some("/custom/live"),
770        5
771    )]
772    #[tokio::test]
773    #[cfg(feature = "integration")]
774    async fn test_health_endpoints(
775        #[case] starting_health_status: &'static str,
776        #[case] expected_status: u16,
777        #[case] expected_body: &'static str,
778        #[case] custom_health_path: Option<&'static str>,
779        #[case] custom_live_path: Option<&'static str>,
780        #[case] expected_num_tests: usize,
781    ) {
782        use std::sync::Arc;
783        // use tokio::io::{AsyncReadExt, AsyncWriteExt};
784        // use reqwest for HTTP requests
785
786        // Closure call is needed here to satisfy async_with_vars
787
788        crate::logging::init();
789
790        #[allow(clippy::redundant_closure_call)]
791        temp_env::async_with_vars(
792            [
793                (env_system::DYN_SYSTEM_PORT, Some("0")),
794                (
795                    env_system::DYN_SYSTEM_STARTING_HEALTH_STATUS,
796                    Some(starting_health_status),
797                ),
798                (env_system::DYN_SYSTEM_HEALTH_PATH, custom_health_path),
799                (env_system::DYN_SYSTEM_LIVE_PATH, custom_live_path),
800            ],
801            (async || {
802                let drt = Arc::new(create_test_drt_async().await);
803
804                // Get system status server info from DRT (instead of manually spawning)
805                let system_info = drt
806                    .system_status_server_info()
807                    .expect("System status server should be started by DRT");
808                let addr = system_info.socket_addr;
809
810                let client = reqwest::Client::new();
811
812                // Prepare test cases
813                let mut test_cases = vec![];
814                match custom_health_path {
815                    None => {
816                        // When using default paths, test the default paths
817                        test_cases.push(("/health", expected_status, expected_body));
818                    }
819                    Some(chp) => {
820                        // When using custom paths, default paths should not exist
821                        test_cases.push(("/health", 404, "Route not found"));
822                        test_cases.push((chp, expected_status, expected_body));
823                    }
824                }
825                match custom_live_path {
826                    None => {
827                        // When using default paths, test the default paths
828                        test_cases.push(("/live", expected_status, expected_body));
829                    }
830                    Some(clp) => {
831                        // When using custom paths, default paths should not exist
832                        test_cases.push(("/live", 404, "Route not found"));
833                        test_cases.push((clp, expected_status, expected_body));
834                    }
835                }
836                test_cases.push(("/someRandomPathNotFoundHere", 404, "Route not found"));
837                assert_eq!(test_cases.len(), expected_num_tests);
838
839                for (path, expect_status, expect_body) in test_cases {
840                    println!("[test] Sending request to {}", path);
841                    let url = format!("http://{}{}", addr, path);
842                    let response = client.get(&url).send().await.unwrap();
843                    let status = response.status();
844                    let body = response.text().await.unwrap();
845                    println!(
846                        "[test] Response for {}: status={}, body={:?}",
847                        path, status, body
848                    );
849                    assert_eq!(
850                        status, expect_status,
851                        "Response: status={}, body={:?}",
852                        status, body
853                    );
854                    assert!(
855                        body.contains(expect_body),
856                        "Response: status={}, body={:?}",
857                        status,
858                        body
859                    );
860                }
861            })(),
862        )
863        .await;
864    }
865
866    #[tokio::test]
867    async fn test_health_endpoint_tracing() -> Result<()> {
868        use std::sync::Arc;
869
870        // Closure call is needed here to satisfy async_with_vars
871
872        #[allow(clippy::redundant_closure_call)]
873        let _ = temp_env::async_with_vars(
874            [
875                (env_system::DYN_SYSTEM_PORT, Some("0")),
876                (env_system::DYN_SYSTEM_STARTING_HEALTH_STATUS, Some("ready")),
877                (env_logging::DYN_LOGGING_JSONL, Some("1")),
878                (env_logging::DYN_LOG, Some("trace")),
879            ],
880            (async || {
881                // TODO Add proper testing for
882                // trace id and parent id
883
884                crate::logging::init();
885
886                let drt = Arc::new(create_test_drt_async().await);
887
888                // Get system status server info from DRT (instead of manually spawning)
889                let system_info = drt
890                    .system_status_server_info()
891                    .expect("System status server should be started by DRT");
892                let addr = system_info.socket_addr;
893                let client = reqwest::Client::new();
894                for path in [("/health"), ("/live"), ("/someRandomPathNotFoundHere")] {
895                    let traceparent_value =
896                        "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
897                    let tracestate_value = "vendor1=opaqueValue1,vendor2=opaqueValue2";
898                    let mut headers = reqwest::header::HeaderMap::new();
899                    headers.insert(
900                        reqwest::header::HeaderName::from_static("traceparent"),
901                        reqwest::header::HeaderValue::from_str(traceparent_value)?,
902                    );
903                    headers.insert(
904                        reqwest::header::HeaderName::from_static("tracestate"),
905                        reqwest::header::HeaderValue::from_str(tracestate_value)?,
906                    );
907                    let url = format!("http://{}{}", addr, path);
908                    let response = client.get(&url).headers(headers).send().await.unwrap();
909                    let status = response.status();
910                    let body = response.text().await.unwrap();
911                    tracing::info!(body = body, status = status.to_string());
912                }
913
914                Ok::<(), anyhow::Error>(())
915            })(),
916        )
917        .await;
918        Ok(())
919    }
920
921    #[tokio::test]
922    async fn test_health_endpoint_with_changing_health_status() {
923        // Test health endpoint starts in not ready status, then becomes ready
924        // when endpoints are created (DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS=generate)
925        const ENDPOINT_NAME: &str = "generate";
926        const ENDPOINT_HEALTH_CONFIG: &str = "[\"generate\"]";
927        temp_env::async_with_vars(
928            [
929                (env_system::DYN_SYSTEM_PORT, Some("0")),
930                (env_system::DYN_SYSTEM_STARTING_HEALTH_STATUS, Some("notready")),
931                (env_system::DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS, Some(ENDPOINT_HEALTH_CONFIG)),
932            ],
933            async {
934                let drt = Arc::new(create_test_drt_async().await);
935
936                // Check if system status server was started
937                let system_info_opt = drt.system_status_server_info();
938
939                // Ensure system status server was spawned by DRT
940                assert!(
941                    system_info_opt.is_some(),
942                    "System status server was not spawned by DRT. Expected DRT to spawn server when DYN_SYSTEM_PORT is set to a positive value, but system_status_server_info() returned None. Environment: DYN_SYSTEM_PORT={:?}",
943                    std::env::var(env_system::DYN_SYSTEM_PORT)
944                );
945
946                // Get the system status server info from DRT - this should never fail now due to above check
947                let system_info = system_info_opt.unwrap();
948                let addr = system_info.socket_addr;
949
950                // Initially check health - should be not ready
951                let client = reqwest::Client::new();
952                let health_url = format!("http://{}/health", addr);
953
954                let response = client.get(&health_url).send().await.unwrap();
955                let status = response.status();
956                let body = response.text().await.unwrap();
957
958                // Health should be not ready (503) initially
959                assert_eq!(status, 503, "Health should be 503 (not ready) initially, got: {}", status);
960                assert!(body.contains("\"status\":\"notready\""), "Health should contain status notready");
961
962                // Now create a namespace, component, and endpoint to make the system healthy
963                let namespace = drt.namespace("ns1234").unwrap();
964                let component = namespace.component("comp1234").unwrap();
965
966                // Create a simple test handler
967                use crate::pipeline::{async_trait, network::Ingress, AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, SingleIn};
968                use crate::protocols::annotated::Annotated;
969
970                struct TestHandler;
971
972                #[async_trait]
973                impl AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, anyhow::Error> for TestHandler {
974                    async fn generate(&self, input: SingleIn<String>) -> anyhow::Result<ManyOut<Annotated<String>>> {
975                        let (data, ctx) = input.into_parts();
976                        let response = Annotated::from_data(format!("You responded: {}", data));
977                        Ok(crate::pipeline::ResponseStream::new(
978                            Box::pin(crate::stream::iter(vec![response])),
979                            ctx.context()
980                        ))
981                    }
982                }
983
984                // Create the ingress and start the endpoint service
985                let ingress = Ingress::for_engine(std::sync::Arc::new(TestHandler)).unwrap();
986
987                // Start the service and endpoint with a health check payload
988                // This will automatically register the endpoint for health monitoring
989                tokio::spawn(async move {
990                    let _ = component.endpoint(ENDPOINT_NAME)
991                        .endpoint_builder()
992                        .handler(ingress)
993                        .health_check_payload(serde_json::json!({
994                            "test": "health_check"
995                        }))
996                        .start()
997                        .await;
998                });
999
1000                // Hit health endpoint 200 times to verify consistency
1001                let mut success_count = 0;
1002                let mut failures = Vec::new();
1003
1004                for i in 1..=200 {
1005                    let response = client.get(&health_url).send().await.unwrap();
1006                    let status = response.status();
1007                    let body = response.text().await.unwrap();
1008
1009                    if status == 200 && body.contains("\"status\":\"ready\"") {
1010                        success_count += 1;
1011                    } else {
1012                        failures.push((i, status.as_u16(), body.clone()));
1013                        if failures.len() <= 5 {  // Only log first 5 failures
1014                            tracing::warn!("Request {}: status={}, body={}", i, status, body);
1015                        }
1016                    }
1017                }
1018
1019                tracing::info!("Health endpoint test results: {}/200 requests succeeded", success_count);
1020                if !failures.is_empty() {
1021                    tracing::warn!("Failed requests: {}", failures.len());
1022                }
1023
1024                // Expect at least 150 out of 200 requests to be successful
1025                assert!(success_count >= 150, "Expected at least 150 out of 200 requests to succeed, but only {} succeeded", success_count);
1026            },
1027        )
1028        .await;
1029    }
1030
1031    #[tokio::test]
1032    async fn test_spawn_system_status_server_endpoints() {
1033        // use reqwest for HTTP requests
1034        temp_env::async_with_vars(
1035            [
1036                (env_system::DYN_SYSTEM_PORT, Some("0")),
1037                (env_system::DYN_SYSTEM_STARTING_HEALTH_STATUS, Some("ready")),
1038            ],
1039            async {
1040                let drt = Arc::new(create_test_drt_async().await);
1041
1042                // Get system status server info from DRT (instead of manually spawning)
1043                let system_info = drt
1044                    .system_status_server_info()
1045                    .expect("System status server should be started by DRT");
1046                let addr = system_info.socket_addr;
1047                let client = reqwest::Client::new();
1048                for (path, expect_200, expect_body) in [
1049                    ("/health", true, "ready"),
1050                    ("/live", true, "ready"),
1051                    ("/someRandomPathNotFoundHere", false, "Route not found"),
1052                ] {
1053                    println!("[test] Sending request to {}", path);
1054                    let url = format!("http://{}{}", addr, path);
1055                    let response = client.get(&url).send().await.unwrap();
1056                    let status = response.status();
1057                    let body = response.text().await.unwrap();
1058                    println!(
1059                        "[test] Response for {}: status={}, body={:?}",
1060                        path, status, body
1061                    );
1062                    if expect_200 {
1063                        assert_eq!(status, 200, "Response: status={}, body={:?}", status, body);
1064                    } else {
1065                        assert_eq!(status, 404, "Response: status={}, body={:?}", status, body);
1066                    }
1067                    assert!(
1068                        body.contains(expect_body),
1069                        "Response: status={}, body={:?}",
1070                        status,
1071                        body
1072                    );
1073                }
1074                // DRT handles server cleanup automatically
1075            },
1076        )
1077        .await;
1078    }
1079
1080    #[cfg(feature = "integration")]
1081    #[tokio::test]
1082    async fn test_health_check_with_payload_and_timeout() {
1083        // Test the complete health check flow with the new canary-based system:
1084        crate::logging::init();
1085
1086        temp_env::async_with_vars(
1087            [
1088                (env_system::DYN_SYSTEM_PORT, Some("0")),
1089                (
1090                    env_system::DYN_SYSTEM_STARTING_HEALTH_STATUS,
1091                    Some("notready"),
1092                ),
1093                (
1094                    env_system::DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS,
1095                    Some("[\"test.endpoint\"]"),
1096                ),
1097                // Enable health check with short intervals for testing
1098                ("DYN_HEALTH_CHECK_ENABLED", Some("true")),
1099                (env_canary::DYN_CANARY_WAIT_TIME, Some("1")), // Send canary after 1 second of inactivity
1100                ("DYN_HEALTH_CHECK_REQUEST_TIMEOUT", Some("1")), // Immediately timeout to mimic unresponsiveness
1101                ("RUST_LOG", Some("info")),                      // Enable logging for test
1102            ],
1103            async {
1104                let drt = Arc::new(create_test_drt_async().await);
1105
1106                // Get system status server info
1107                let system_info = drt
1108                    .system_status_server_info()
1109                    .expect("System status server should be started");
1110                let addr = system_info.socket_addr;
1111
1112                let client = reqwest::Client::new();
1113                let health_url = format!("http://{}/health", addr);
1114
1115                // Register an endpoint with health check payload
1116                let endpoint = "test.endpoint";
1117                let health_check_payload = serde_json::json!({
1118                    "prompt": "health check test",
1119                    "_health_check": true
1120                });
1121
1122                // Register the endpoint and its health check payload
1123                {
1124                    let system_health = drt.system_health();
1125                    let system_health_lock = system_health.lock();
1126                    system_health_lock.register_health_check_target(
1127                        endpoint,
1128                        crate::component::Instance {
1129                            component: "test_component".to_string(),
1130                            endpoint: "health".to_string(),
1131                            namespace: "test_namespace".to_string(),
1132                            instance_id: 1,
1133                            transport: crate::component::TransportType::Nats(endpoint.to_string()),
1134                        },
1135                        health_check_payload.clone(),
1136                    );
1137                }
1138
1139                // Check initial health - should be ready (default state)
1140                let response = client.get(&health_url).send().await.unwrap();
1141                let status = response.status();
1142                let body = response.text().await.unwrap();
1143                assert_eq!(status, 503, "Should be unhealthy initially (default state)");
1144                assert!(
1145                    body.contains("\"status\":\"notready\""),
1146                    "Should show notready status initially"
1147                );
1148
1149                // Set endpoint to healthy state
1150                drt.system_health()
1151                    .lock()
1152                    .set_endpoint_health_status(endpoint, HealthStatus::Ready);
1153
1154                // Check health again - should now be healthy
1155                let response = client.get(&health_url).send().await.unwrap();
1156                let status = response.status();
1157                let body = response.text().await.unwrap();
1158
1159                assert_eq!(status, 200, "Should be healthy due to recent response");
1160                assert!(
1161                    body.contains("\"status\":\"ready\""),
1162                    "Should show ready status after response"
1163                );
1164
1165                // Verify the endpoint status in SystemHealth directly
1166                let endpoint_status = drt
1167                    .system_health()
1168                    .lock()
1169                    .get_endpoint_health_status(endpoint);
1170                assert_eq!(
1171                    endpoint_status,
1172                    Some(HealthStatus::Ready),
1173                    "SystemHealth should show endpoint as Ready after response"
1174                );
1175            },
1176        )
1177        .await;
1178    }
1179}