Skip to main content

dynamo_runtime/
system_status_server.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4// TODO: (DEP-635) this file should be renamed to system_http_server.rs
5//  it is being used not just for status, health, but others like loras management.
6
7use crate::config::HealthStatus;
8use crate::config::environment_names::logging as env_logging;
9use crate::config::environment_names::runtime::canary as env_canary;
10use crate::config::environment_names::runtime::system as env_system;
11use crate::logging::make_request_span;
12use crate::metrics::MetricsHierarchy;
13use crate::traits::DistributedRuntimeProvider;
14use axum::{
15    Router,
16    body::Bytes,
17    extract::{Json, Path, State},
18    http::StatusCode,
19    response::IntoResponse,
20    routing::{any, delete, get, post},
21};
22use futures::StreamExt;
23use serde::{Deserialize, Serialize};
24use serde_json::json;
25use std::collections::HashMap;
26use std::sync::{Arc, OnceLock};
27use std::time::Instant;
28use tokio::{net::TcpListener, task::JoinHandle};
29use tokio_util::sync::CancellationToken;
30use tower_http::trace::TraceLayer;
31
32/// System status server information containing socket address and handle
33#[derive(Debug)]
34pub struct SystemStatusServerInfo {
35    pub socket_addr: std::net::SocketAddr,
36    pub handle: Option<Arc<JoinHandle<()>>>,
37}
38
39impl SystemStatusServerInfo {
40    pub fn new(socket_addr: std::net::SocketAddr, handle: Option<JoinHandle<()>>) -> Self {
41        Self {
42            socket_addr,
43            handle: handle.map(Arc::new),
44        }
45    }
46
47    pub fn address(&self) -> String {
48        self.socket_addr.to_string()
49    }
50
51    pub fn hostname(&self) -> String {
52        self.socket_addr.ip().to_string()
53    }
54
55    pub fn port(&self) -> u16 {
56        self.socket_addr.port()
57    }
58}
59
60impl Clone for SystemStatusServerInfo {
61    fn clone(&self) -> Self {
62        Self {
63            socket_addr: self.socket_addr,
64            handle: self.handle.clone(),
65        }
66    }
67}
68
69/// System status server state containing the distributed runtime reference
70pub struct SystemStatusState {
71    // global drt registry is for printing out the entire Prometheus format output
72    root_drt: Arc<crate::DistributedRuntime>,
73    // Discovery metadata (only for Kubernetes backend)
74    discovery_metadata: Option<Arc<tokio::sync::RwLock<crate::discovery::DiscoveryMetadata>>>,
75}
76
77impl SystemStatusState {
78    /// Create new system status server state with the provided distributed runtime
79    pub fn new(
80        drt: Arc<crate::DistributedRuntime>,
81        discovery_metadata: Option<Arc<tokio::sync::RwLock<crate::discovery::DiscoveryMetadata>>>,
82    ) -> anyhow::Result<Self> {
83        Ok(Self {
84            root_drt: drt,
85            discovery_metadata,
86        })
87    }
88
89    /// Get a reference to the distributed runtime
90    pub fn drt(&self) -> &crate::DistributedRuntime {
91        &self.root_drt
92    }
93
94    /// Get a reference to the discovery metadata if available
95    pub fn discovery_metadata(
96        &self,
97    ) -> Option<&Arc<tokio::sync::RwLock<crate::discovery::DiscoveryMetadata>>> {
98        self.discovery_metadata.as_ref()
99    }
100}
101
102/// Request body for POST /v1/loras
103#[derive(Debug, Clone, Deserialize, Serialize)]
104pub struct LoadLoraRequest {
105    pub lora_name: String,
106    pub source: LoraSource,
107}
108
109/// Source information for loading a LoRA
110#[derive(Debug, Clone, Deserialize, Serialize)]
111pub struct LoraSource {
112    pub uri: String,
113}
114
115/// Response body for LoRA operations
116#[derive(Debug, Clone, Deserialize, Serialize)]
117pub struct LoraResponse {
118    pub status: String,
119    #[serde(skip_serializing_if = "Option::is_none")]
120    pub message: Option<String>,
121    #[serde(skip_serializing_if = "Option::is_none")]
122    pub lora_name: Option<String>,
123    #[serde(skip_serializing_if = "Option::is_none")]
124    pub lora_id: Option<u64>,
125    #[serde(skip_serializing_if = "Option::is_none")]
126    pub loras: Option<serde_json::Value>,
127    #[serde(skip_serializing_if = "Option::is_none")]
128    pub count: Option<usize>,
129}
130
131/// Start system status server with metrics support
132pub async fn spawn_system_status_server(
133    host: &str,
134    port: u16,
135    cancel_token: CancellationToken,
136    drt: Arc<crate::DistributedRuntime>,
137    discovery_metadata: Option<Arc<tokio::sync::RwLock<crate::discovery::DiscoveryMetadata>>>,
138) -> anyhow::Result<(std::net::SocketAddr, tokio::task::JoinHandle<()>)> {
139    // Create system status server state with the provided distributed runtime
140    let server_state = Arc::new(SystemStatusState::new(drt, discovery_metadata)?);
141    let health_path = server_state
142        .drt()
143        .system_health()
144        .lock()
145        .health_path()
146        .to_string();
147    let live_path = server_state
148        .drt()
149        .system_health()
150        .lock()
151        .live_path()
152        .to_string();
153
154    // Check if LoRA feature is enabled
155    let lora_enabled = std::env::var(crate::config::environment_names::llm::DYN_LORA_ENABLED)
156        .map(|v| v.to_lowercase() == "true")
157        .unwrap_or(false);
158
159    let mut app = Router::new()
160        .route(
161            &health_path,
162            get({
163                let state = Arc::clone(&server_state);
164                move || health_handler(state)
165            }),
166        )
167        .route(
168            &live_path,
169            get({
170                let state = Arc::clone(&server_state);
171                move || health_handler(state)
172            }),
173        )
174        .route(
175            "/metrics",
176            get({
177                let state = Arc::clone(&server_state);
178                move || metrics_handler(state)
179            }),
180        )
181        .route(
182            "/metadata",
183            get({
184                let state = Arc::clone(&server_state);
185                move || metadata_handler(state)
186            }),
187        )
188        .route(
189            "/engine/{*path}",
190            any({
191                let state = Arc::clone(&server_state);
192                move |path, body| engine_route_handler(state, path, body)
193            }),
194        );
195
196    // Add LoRA routes only if DYN_LORA_ENABLED is set to true
197    if lora_enabled {
198        app = app
199            .route(
200                "/v1/loras",
201                get({
202                    let state = Arc::clone(&server_state);
203                    move || list_loras_handler(State(state))
204                })
205                .post({
206                    let state = Arc::clone(&server_state);
207                    move |body| load_lora_handler(State(state), body)
208                }),
209            )
210            .route(
211                "/v1/loras/{*lora_name}",
212                delete({
213                    let state = Arc::clone(&server_state);
214                    move |path| unload_lora_handler(State(state), path)
215                }),
216            );
217    }
218
219    let app = app
220        .fallback(|| async {
221            tracing::info!("[fallback handler] called");
222            (StatusCode::NOT_FOUND, "Route not found").into_response()
223        })
224        .layer(TraceLayer::new_for_http().make_span_with(make_request_span));
225
226    let address = format!("{}:{}", host, port);
227    tracing::info!("[spawn_system_status_server] binding to: {}", address);
228
229    let listener = match TcpListener::bind(&address).await {
230        Ok(listener) => {
231            // get the actual address and port, print in debug level
232            let actual_address = listener.local_addr()?;
233            tracing::info!(
234                "[spawn_system_status_server] system status server bound to: {}",
235                actual_address
236            );
237            (listener, actual_address)
238        }
239        Err(e) => {
240            tracing::error!("Failed to bind to address {}: {}", address, e);
241            return Err(anyhow::anyhow!("Failed to bind to address: {}", e));
242        }
243    };
244    let (listener, actual_address) = listener;
245
246    let observer = cancel_token.child_token();
247    // Spawn the server in the background and return the handle
248    let handle = tokio::spawn(async move {
249        if let Err(e) = axum::serve(listener, app)
250            .with_graceful_shutdown(observer.cancelled_owned())
251            .await
252        {
253            tracing::error!("System status server error: {}", e);
254        }
255    });
256
257    Ok((actual_address, handle))
258}
259
260/// Health handler with optional active health checking
261#[tracing::instrument(skip_all, level = "trace")]
262async fn health_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
263    // Get basic health status
264    let system_health = state.drt().system_health();
265    let system_health_lock = system_health.lock();
266    let (healthy, endpoints) = system_health_lock.get_health_status();
267    let uptime = Some(system_health_lock.uptime());
268    drop(system_health_lock);
269
270    let healthy_string = if healthy { "ready" } else { "notready" };
271    let status_code = if healthy {
272        StatusCode::OK
273    } else {
274        StatusCode::SERVICE_UNAVAILABLE
275    };
276
277    let response = json!({
278        "status": healthy_string,
279        "uptime": uptime,
280        "endpoints": endpoints,
281    });
282
283    tracing::trace!("Response {}", response.to_string());
284
285    (status_code, response.to_string())
286}
287
288/// Metrics handler with DistributedRuntime uptime
289#[tracing::instrument(skip_all, level = "trace")]
290async fn metrics_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
291    // Update the uptime gauge with current value
292    state.drt().system_health().lock().update_uptime_gauge();
293
294    // Get all metrics from the DistributedRuntime.
295    //
296    // NOTE: We use a multi-registry model (e.g. one registry per endpoint) and merge at scrape time,
297    // so /metrics traverses registered child registries and produces a single combined output.
298    let response = match state.drt().metrics().prometheus_expfmt() {
299        Ok(r) => r,
300        Err(e) => {
301            tracing::error!("Failed to get metrics from registry: {}", e);
302            return (
303                StatusCode::INTERNAL_SERVER_ERROR,
304                "Failed to get metrics".to_string(),
305            );
306        }
307    };
308
309    (StatusCode::OK, response)
310}
311
312/// Metadata handler
313#[tracing::instrument(skip_all, level = "trace")]
314async fn metadata_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
315    // Check if discovery metadata is available
316    let metadata = match state.discovery_metadata() {
317        Some(metadata) => metadata,
318        None => {
319            tracing::debug!("Metadata endpoint called but no discovery metadata available");
320            return (
321                StatusCode::NOT_FOUND,
322                "Discovery metadata not available".to_string(),
323            )
324                .into_response();
325        }
326    };
327
328    // Read the metadata
329    let metadata_guard = metadata.read().await;
330
331    // Serialize to JSON
332    match serde_json::to_string(&*metadata_guard) {
333        Ok(json) => {
334            tracing::trace!("Returning metadata: {} bytes", json.len());
335            (StatusCode::OK, json).into_response()
336        }
337        Err(e) => {
338            tracing::error!("Failed to serialize metadata: {}", e);
339            (
340                StatusCode::INTERNAL_SERVER_ERROR,
341                "Failed to serialize metadata".to_string(),
342            )
343                .into_response()
344        }
345    }
346}
347
348/// Handler for POST /v1/loras - Load a LoRA adapter
349#[tracing::instrument(skip_all, level = "debug")]
350async fn load_lora_handler(
351    State(state): State<Arc<SystemStatusState>>,
352    Json(request): Json<LoadLoraRequest>,
353) -> impl IntoResponse {
354    tracing::info!("Loading LoRA: {}", request.lora_name);
355
356    // Call the load_lora endpoint for each available backend
357    match call_lora_endpoint(
358        state.drt(),
359        "load_lora",
360        json!({
361            "lora_name": request.lora_name,
362            "source": {
363                "uri": request.source.uri
364            },
365        }),
366    )
367    .await
368    {
369        Ok(response) => {
370            tracing::info!("LoRA loaded successfully: {}", request.lora_name);
371            (StatusCode::OK, Json(response))
372        }
373        Err(e) => {
374            tracing::error!("Failed to load LoRA {}: {}", request.lora_name, e);
375            (
376                StatusCode::INTERNAL_SERVER_ERROR,
377                Json(LoraResponse {
378                    status: "error".to_string(),
379                    message: Some(e.to_string()),
380                    lora_name: Some(request.lora_name),
381                    lora_id: None,
382                    loras: None,
383                    count: None,
384                }),
385            )
386        }
387    }
388}
389
390/// Handler for DELETE /v1/loras/*lora_name - Unload a LoRA adapter
391#[tracing::instrument(skip_all, level = "debug")]
392async fn unload_lora_handler(
393    State(state): State<Arc<SystemStatusState>>,
394    Path(lora_name): Path<String>,
395) -> impl IntoResponse {
396    // Strip the leading slash from the wildcard capture
397    let lora_name = lora_name
398        .strip_prefix('/')
399        .unwrap_or(&lora_name)
400        .to_string();
401    tracing::info!("Unloading LoRA: {}", lora_name);
402
403    // Call the unload_lora endpoint for each available backend
404    match call_lora_endpoint(
405        state.drt(),
406        "unload_lora",
407        json!({
408            "lora_name": lora_name.clone(),
409        }),
410    )
411    .await
412    {
413        Ok(response) => {
414            tracing::info!("LoRA unloaded successfully: {}", lora_name);
415            (StatusCode::OK, Json(response))
416        }
417        Err(e) => {
418            tracing::error!("Failed to unload LoRA {}: {}", lora_name, e);
419            (
420                StatusCode::INTERNAL_SERVER_ERROR,
421                Json(LoraResponse {
422                    status: "error".to_string(),
423                    message: Some(e.to_string()),
424                    lora_name: Some(lora_name),
425                    lora_id: None,
426                    loras: None,
427                    count: None,
428                }),
429            )
430        }
431    }
432}
433
434/// Handler for GET /v1/loras - List all LoRA adapters
435#[tracing::instrument(skip_all, level = "debug")]
436async fn list_loras_handler(State(state): State<Arc<SystemStatusState>>) -> impl IntoResponse {
437    tracing::info!("Listing all LoRAs");
438
439    // Call the list_loras endpoint for each available backend
440    match call_lora_endpoint(state.drt(), "list_loras", json!({})).await {
441        Ok(response) => {
442            tracing::info!("Successfully retrieved LoRA list");
443            (StatusCode::OK, Json(response))
444        }
445        Err(e) => {
446            tracing::error!("Failed to list LoRAs: {}", e);
447            (
448                StatusCode::INTERNAL_SERVER_ERROR,
449                Json(LoraResponse {
450                    status: "error".to_string(),
451                    message: Some(e.to_string()),
452                    lora_name: None,
453                    lora_id: None,
454                    loras: None,
455                    count: None,
456                }),
457            )
458        }
459    }
460}
461
462/// Helper function to call a LoRA management endpoint locally via in-process registry
463///
464/// This function ONLY uses the local endpoint registry for direct in-process calls.
465/// It does NOT fall back to network discovery if the endpoint is not found.
466async fn call_lora_endpoint(
467    drt: &crate::DistributedRuntime,
468    endpoint_name: &str,
469    request_body: serde_json::Value,
470) -> anyhow::Result<LoraResponse> {
471    use crate::engine::AsyncEngine;
472
473    tracing::debug!("Calling local endpoint: '{}'", endpoint_name);
474
475    // Get the endpoint from the local registry (in-process call only)
476    let local_registry = drt.local_endpoint_registry();
477    let engine = local_registry
478        .get(endpoint_name)
479        .ok_or_else(|| {
480            anyhow::anyhow!(
481                "Endpoint '{}' not found in local registry. Make sure it's registered with .register_local_engine()",
482                endpoint_name
483            )
484        })?;
485
486    tracing::debug!(
487        "Found endpoint '{}' in local registry, calling directly",
488        endpoint_name
489    );
490
491    // Call the engine directly without going through the network stack
492    let request = crate::pipeline::SingleIn::new(request_body);
493    let mut stream = engine.generate(request).await?;
494
495    // Get the first response
496    if let Some(response) = stream.next().await {
497        let response_data = response.data.unwrap_or_default();
498
499        // Try structured deserialization first, fall back to manual field extraction
500        let lora_response = serde_json::from_value::<LoraResponse>(response_data.clone())
501            .unwrap_or_else(|_| parse_lora_response(&response_data));
502
503        return Ok(lora_response);
504    }
505
506    anyhow::bail!("No response received from endpoint '{}'", endpoint_name)
507}
508
509/// Helper to parse response data into LoraResponse
510fn parse_lora_response(response_data: &serde_json::Value) -> LoraResponse {
511    LoraResponse {
512        status: response_data
513            .get("status")
514            .and_then(|s| s.as_str())
515            .unwrap_or("success")
516            .to_string(),
517        message: response_data
518            .get("message")
519            .and_then(|m| m.as_str())
520            .map(|s| s.to_string()),
521        lora_name: response_data
522            .get("lora_name")
523            .and_then(|n| n.as_str())
524            .map(|s| s.to_string()),
525        lora_id: response_data.get("lora_id").and_then(|id| id.as_u64()),
526        loras: response_data.get("loras").cloned(),
527        count: response_data
528            .get("count")
529            .and_then(|c| c.as_u64())
530            .map(|c| c as usize),
531    }
532}
533
534/// Engine route handler for /engine/* routes
535///
536/// This handler looks up registered callbacks in the engine routes registry
537/// and invokes them with the request body, returning the response as JSON.
538#[tracing::instrument(skip_all, level = "trace", fields(path = %path))]
539async fn engine_route_handler(
540    state: Arc<SystemStatusState>,
541    Path(path): Path<String>,
542    body: Bytes,
543) -> impl IntoResponse {
544    tracing::trace!("Engine route request to /engine/{}", path);
545
546    // Parse body as JSON (empty object for GET/empty body)
547    let body_json: serde_json::Value = if body.is_empty() {
548        serde_json::json!({})
549    } else {
550        match serde_json::from_slice(&body) {
551            Ok(json) => json,
552            Err(e) => {
553                tracing::warn!("Invalid JSON in request body: {}", e);
554                return (
555                    StatusCode::BAD_REQUEST,
556                    json!({
557                        "error": "Invalid JSON",
558                        "message": format!("{}", e)
559                    })
560                    .to_string(),
561                )
562                    .into_response();
563            }
564        }
565    };
566
567    // Look up callback
568    let callback = match state.drt().engine_routes().get(&path) {
569        Some(cb) => cb,
570        None => {
571            tracing::debug!("Route /engine/{} not found", path);
572            return (
573                StatusCode::NOT_FOUND,
574                json!({
575                    "error": "Route not found",
576                    "message": format!("Route /engine/{} not found", path)
577                })
578                .to_string(),
579            )
580                .into_response();
581        }
582    };
583
584    // Call callback (it's async, so await it)
585    match callback(body_json).await {
586        Ok(response) => {
587            tracing::trace!("Engine route handler succeeded for /engine/{}", path);
588            (StatusCode::OK, response.to_string()).into_response()
589        }
590        Err(e) => {
591            tracing::error!("Engine route handler error for /engine/{}: {}", path, e);
592            (
593                StatusCode::INTERNAL_SERVER_ERROR,
594                json!({
595                    "error": "Handler error",
596                    "message": format!("{}", e)
597                })
598                .to_string(),
599            )
600                .into_response()
601        }
602    }
603}
604
605// Regular tests: cargo test system_status_server --lib
606#[cfg(test)]
607mod tests {
608    use super::*;
609    use tokio::time::Duration;
610
611    // This is a basic test to verify the HTTP server is working before testing other more complicated tests
612    #[tokio::test]
613    async fn test_http_server_lifecycle() {
614        let cancel_token = CancellationToken::new();
615        let cancel_token_for_server = cancel_token.clone();
616
617        // Test basic HTTP server lifecycle without DistributedRuntime
618        let app = Router::new().route("/test", get(|| async { (StatusCode::OK, "test") }));
619
620        // start HTTP server
621        let server_handle = tokio::spawn(async move {
622            let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
623            let _ = axum::serve(listener, app)
624                .with_graceful_shutdown(cancel_token_for_server.cancelled_owned())
625                .await;
626        });
627
628        // server starts immediately, no need to wait
629
630        // cancel token
631        cancel_token.cancel();
632
633        // wait for the server to shut down
634        let result = tokio::time::timeout(Duration::from_secs(5), server_handle).await;
635        assert!(
636            result.is_ok(),
637            "HTTP server should shut down when cancel token is cancelled"
638        );
639    }
640}
641
642// Integration tests: cargo test system_status_server --lib --features integration
643#[cfg(all(test, feature = "integration"))]
644mod integration_tests {
645    use super::*;
646    use crate::config::environment_names::logging as env_logging;
647    use crate::config::environment_names::runtime::canary as env_canary;
648    use crate::distributed::distributed_test_utils::create_test_drt_async;
649    use crate::metrics::MetricsHierarchy;
650    use anyhow::Result;
651    use rstest::rstest;
652    use std::sync::Arc;
653    use tokio::time::Duration;
654
655    #[tokio::test]
656    async fn test_uptime_from_system_health() {
657        // Test that uptime is available from SystemHealth
658        temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, None::<&str>)], async {
659            let drt = create_test_drt_async().await;
660
661            // Get uptime from SystemHealth
662            let uptime = drt.system_health().lock().uptime();
663            // Uptime should exist (even if close to zero)
664            assert!(uptime.as_nanos() > 0 || uptime.is_zero());
665
666            // Sleep briefly and check uptime increases
667            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
668            let uptime_after = drt.system_health().lock().uptime();
669            assert!(uptime_after > uptime);
670        })
671        .await;
672    }
673
674    #[tokio::test]
675    async fn test_runtime_metrics_initialization_and_namespace() {
676        // Test that metrics have correct namespace
677        temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, None::<&str>)], async {
678            let drt = create_test_drt_async().await;
679            // SystemStatusState is already created in distributed.rs
680            // so we don't need to create it again here
681
682            // The uptime_seconds metric should already be registered and available
683            let response = drt.metrics().prometheus_expfmt().unwrap();
684            println!("Full metrics response:\n{}", response);
685
686            // Check that uptime_seconds metric is present with correct namespace
687            assert!(
688                response.contains("# HELP dynamo_component_uptime_seconds"),
689                "Should contain uptime_seconds help text"
690            );
691            assert!(
692                response.contains("# TYPE dynamo_component_uptime_seconds gauge"),
693                "Should contain uptime_seconds type"
694            );
695            assert!(
696                response.contains("dynamo_component_uptime_seconds"),
697                "Should contain uptime_seconds metric with correct namespace"
698            );
699        })
700        .await;
701    }
702
703    #[tokio::test]
704    async fn test_uptime_gauge_updates() {
705        // Test that the uptime gauge is properly updated and increases over time
706        temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, None::<&str>)], async {
707            let drt = create_test_drt_async().await;
708
709            // Get initial uptime
710            let initial_uptime = drt.system_health().lock().uptime();
711
712            // Update the gauge with initial value
713            drt.system_health().lock().update_uptime_gauge();
714
715            // Sleep for 100ms
716            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
717
718            // Get uptime after sleep
719            let uptime_after_sleep = drt.system_health().lock().uptime();
720
721            // Update the gauge again
722            drt.system_health().lock().update_uptime_gauge();
723
724            // Verify uptime increased by at least 100ms
725            let elapsed = uptime_after_sleep - initial_uptime;
726            assert!(
727                elapsed >= std::time::Duration::from_millis(100),
728                "Uptime should have increased by at least 100ms after sleep, but only increased by {:?}",
729                elapsed
730            );
731        })
732        .await;
733    }
734
735    #[tokio::test]
736    async fn test_http_requests_fail_when_system_disabled() {
737        // Test that system status server is not running when disabled
738        temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, None::<&str>)], async {
739            let drt = create_test_drt_async().await;
740
741            // Verify that system status server info is None when disabled
742            let system_info = drt.system_status_server_info();
743            assert!(
744                system_info.is_none(),
745                "System status server should not be running when disabled"
746            );
747
748            println!("✓ System status server correctly disabled when not enabled");
749        })
750        .await;
751    }
752
753    /// This test verifies the health and liveness endpoints of the system status server.
754    /// It checks that the endpoints respond with the correct HTTP status codes and bodies
755    /// based on the initial health status and any custom endpoint paths provided via environment variables.
756    /// The test is parameterized using multiple #[case] attributes to cover various scenarios,
757    /// including different initial health states ("ready" and "notready"), default and custom endpoint paths,
758    /// and expected response codes and bodies.
759    #[rstest]
760    #[case("ready", 200, "ready", None, None, 3)]
761    #[case("notready", 503, "notready", None, None, 3)]
762    #[case("ready", 200, "ready", Some("/custom/health"), Some("/custom/live"), 5)]
763    #[case(
764        "notready",
765        503,
766        "notready",
767        Some("/custom/health"),
768        Some("/custom/live"),
769        5
770    )]
771    #[tokio::test]
772    #[cfg(feature = "integration")]
773    async fn test_health_endpoints(
774        #[case] starting_health_status: &'static str,
775        #[case] expected_status: u16,
776        #[case] expected_body: &'static str,
777        #[case] custom_health_path: Option<&'static str>,
778        #[case] custom_live_path: Option<&'static str>,
779        #[case] expected_num_tests: usize,
780    ) {
781        use std::sync::Arc;
782        // use tokio::io::{AsyncReadExt, AsyncWriteExt};
783        // use reqwest for HTTP requests
784
785        // Closure call is needed here to satisfy async_with_vars
786
787        crate::logging::init();
788
789        #[allow(clippy::redundant_closure_call)]
790        temp_env::async_with_vars(
791            [
792                (env_system::DYN_SYSTEM_PORT, Some("0")),
793                (
794                    env_system::DYN_SYSTEM_STARTING_HEALTH_STATUS,
795                    Some(starting_health_status),
796                ),
797                (env_system::DYN_SYSTEM_HEALTH_PATH, custom_health_path),
798                (env_system::DYN_SYSTEM_LIVE_PATH, custom_live_path),
799            ],
800            (async || {
801                let drt = Arc::new(create_test_drt_async().await);
802
803                // Get system status server info from DRT (instead of manually spawning)
804                let system_info = drt
805                    .system_status_server_info()
806                    .expect("System status server should be started by DRT");
807                let addr = system_info.socket_addr;
808
809                let client = reqwest::Client::new();
810
811                // Prepare test cases
812                let mut test_cases = vec![];
813                match custom_health_path {
814                    None => {
815                        // When using default paths, test the default paths
816                        test_cases.push(("/health", expected_status, expected_body));
817                    }
818                    Some(chp) => {
819                        // When using custom paths, default paths should not exist
820                        test_cases.push(("/health", 404, "Route not found"));
821                        test_cases.push((chp, expected_status, expected_body));
822                    }
823                }
824                match custom_live_path {
825                    None => {
826                        // When using default paths, test the default paths
827                        test_cases.push(("/live", expected_status, expected_body));
828                    }
829                    Some(clp) => {
830                        // When using custom paths, default paths should not exist
831                        test_cases.push(("/live", 404, "Route not found"));
832                        test_cases.push((clp, expected_status, expected_body));
833                    }
834                }
835                test_cases.push(("/someRandomPathNotFoundHere", 404, "Route not found"));
836                assert_eq!(test_cases.len(), expected_num_tests);
837
838                for (path, expect_status, expect_body) in test_cases {
839                    println!("[test] Sending request to {}", path);
840                    let url = format!("http://{}{}", addr, path);
841                    let response = client.get(&url).send().await.unwrap();
842                    let status = response.status();
843                    let body = response.text().await.unwrap();
844                    println!(
845                        "[test] Response for {}: status={}, body={:?}",
846                        path, status, body
847                    );
848                    assert_eq!(
849                        status, expect_status,
850                        "Response: status={}, body={:?}",
851                        status, body
852                    );
853                    assert!(
854                        body.contains(expect_body),
855                        "Response: status={}, body={:?}",
856                        status,
857                        body
858                    );
859                }
860            })(),
861        )
862        .await;
863    }
864
865    #[tokio::test]
866    async fn test_health_endpoint_tracing() -> Result<()> {
867        use std::sync::Arc;
868
869        // Closure call is needed here to satisfy async_with_vars
870
871        #[allow(clippy::redundant_closure_call)]
872        let _ = temp_env::async_with_vars(
873            [
874                (env_system::DYN_SYSTEM_PORT, Some("0")),
875                (env_system::DYN_SYSTEM_STARTING_HEALTH_STATUS, Some("ready")),
876                (env_logging::DYN_LOGGING_JSONL, Some("1")),
877                (env_logging::DYN_LOG, Some("trace")),
878            ],
879            (async || {
880                // TODO Add proper testing for
881                // trace id and parent id
882
883                crate::logging::init();
884
885                let drt = Arc::new(create_test_drt_async().await);
886
887                // Get system status server info from DRT (instead of manually spawning)
888                let system_info = drt
889                    .system_status_server_info()
890                    .expect("System status server should be started by DRT");
891                let addr = system_info.socket_addr;
892                let client = reqwest::Client::new();
893                for path in [("/health"), ("/live"), ("/someRandomPathNotFoundHere")] {
894                    let traceparent_value =
895                        "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
896                    let tracestate_value = "vendor1=opaqueValue1,vendor2=opaqueValue2";
897                    let mut headers = reqwest::header::HeaderMap::new();
898                    headers.insert(
899                        reqwest::header::HeaderName::from_static("traceparent"),
900                        reqwest::header::HeaderValue::from_str(traceparent_value)?,
901                    );
902                    headers.insert(
903                        reqwest::header::HeaderName::from_static("tracestate"),
904                        reqwest::header::HeaderValue::from_str(tracestate_value)?,
905                    );
906                    let url = format!("http://{}{}", addr, path);
907                    let response = client.get(&url).headers(headers).send().await.unwrap();
908                    let status = response.status();
909                    let body = response.text().await.unwrap();
910                    tracing::info!(body = body, status = status.to_string());
911                }
912
913                Ok::<(), anyhow::Error>(())
914            })(),
915        )
916        .await;
917        Ok(())
918    }
919
920    #[tokio::test]
921    async fn test_health_endpoint_with_changing_health_status() {
922        // Test health endpoint starts in not ready status, then becomes ready
923        // when endpoints are created (DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS=generate)
924        const ENDPOINT_NAME: &str = "generate";
925        const ENDPOINT_HEALTH_CONFIG: &str = "[\"generate\"]";
926        temp_env::async_with_vars(
927            [
928                (env_system::DYN_SYSTEM_PORT, Some("0")),
929                (env_system::DYN_SYSTEM_STARTING_HEALTH_STATUS, Some("notready")),
930                (env_system::DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS, Some(ENDPOINT_HEALTH_CONFIG)),
931            ],
932            async {
933                let drt = Arc::new(create_test_drt_async().await);
934
935                // Check if system status server was started
936                let system_info_opt = drt.system_status_server_info();
937
938                // Ensure system status server was spawned by DRT
939                assert!(
940                    system_info_opt.is_some(),
941                    "System status server was not spawned by DRT. Expected DRT to spawn server when DYN_SYSTEM_PORT is set to a positive value, but system_status_server_info() returned None. Environment: DYN_SYSTEM_PORT={:?}",
942                    std::env::var(env_system::DYN_SYSTEM_PORT)
943                );
944
945                // Get the system status server info from DRT - this should never fail now due to above check
946                let system_info = system_info_opt.unwrap();
947                let addr = system_info.socket_addr;
948
949                // Initially check health - should be not ready
950                let client = reqwest::Client::new();
951                let health_url = format!("http://{}/health", addr);
952
953                let response = client.get(&health_url).send().await.unwrap();
954                let status = response.status();
955                let body = response.text().await.unwrap();
956
957                // Health should be not ready (503) initially
958                assert_eq!(status, 503, "Health should be 503 (not ready) initially, got: {}", status);
959                assert!(body.contains("\"status\":\"notready\""), "Health should contain status notready");
960
961                // Now create a namespace, component, and endpoint to make the system healthy
962                let namespace = drt.namespace("ns1234").unwrap();
963                let component = namespace.component("comp1234").unwrap();
964
965                // Create a simple test handler
966                use crate::pipeline::{async_trait, network::Ingress, AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, SingleIn};
967                use crate::protocols::annotated::Annotated;
968
969                struct TestHandler;
970
971                #[async_trait]
972                impl AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, anyhow::Error> for TestHandler {
973                    async fn generate(&self, input: SingleIn<String>) -> anyhow::Result<ManyOut<Annotated<String>>> {
974                        let (data, ctx) = input.into_parts();
975                        let response = Annotated::from_data(format!("You responded: {}", data));
976                        Ok(crate::pipeline::ResponseStream::new(
977                            Box::pin(crate::stream::iter(vec![response])),
978                            ctx.context()
979                        ))
980                    }
981                }
982
983                // Create the ingress and start the endpoint service
984                let ingress = Ingress::for_engine(std::sync::Arc::new(TestHandler)).unwrap();
985
986                // Start the service and endpoint with a health check payload
987                // This will automatically register the endpoint for health monitoring
988                tokio::spawn(async move {
989                    let _ = component.endpoint(ENDPOINT_NAME)
990                        .endpoint_builder()
991                        .handler(ingress)
992                        .health_check_payload(serde_json::json!({
993                            "test": "health_check"
994                        }))
995                        .start()
996                        .await;
997                });
998
999                // Hit health endpoint 200 times to verify consistency
1000                let mut success_count = 0;
1001                let mut failures = Vec::new();
1002
1003                for i in 1..=200 {
1004                    let response = client.get(&health_url).send().await.unwrap();
1005                    let status = response.status();
1006                    let body = response.text().await.unwrap();
1007
1008                    if status == 200 && body.contains("\"status\":\"ready\"") {
1009                        success_count += 1;
1010                    } else {
1011                        failures.push((i, status.as_u16(), body.clone()));
1012                        if failures.len() <= 5 {  // Only log first 5 failures
1013                            tracing::warn!("Request {}: status={}, body={}", i, status, body);
1014                        }
1015                    }
1016                }
1017
1018                tracing::info!("Health endpoint test results: {}/200 requests succeeded", success_count);
1019                if !failures.is_empty() {
1020                    tracing::warn!("Failed requests: {}", failures.len());
1021                }
1022
1023                // Expect at least 150 out of 200 requests to be successful
1024                assert!(success_count >= 150, "Expected at least 150 out of 200 requests to succeed, but only {} succeeded", success_count);
1025            },
1026        )
1027        .await;
1028    }
1029
1030    #[tokio::test]
1031    async fn test_spawn_system_status_server_endpoints() {
1032        // use reqwest for HTTP requests
1033        temp_env::async_with_vars(
1034            [
1035                (env_system::DYN_SYSTEM_PORT, Some("0")),
1036                (env_system::DYN_SYSTEM_STARTING_HEALTH_STATUS, Some("ready")),
1037            ],
1038            async {
1039                let drt = Arc::new(create_test_drt_async().await);
1040
1041                // Get system status server info from DRT (instead of manually spawning)
1042                let system_info = drt
1043                    .system_status_server_info()
1044                    .expect("System status server should be started by DRT");
1045                let addr = system_info.socket_addr;
1046                let client = reqwest::Client::new();
1047                for (path, expect_200, expect_body) in [
1048                    ("/health", true, "ready"),
1049                    ("/live", true, "ready"),
1050                    ("/someRandomPathNotFoundHere", false, "Route not found"),
1051                ] {
1052                    println!("[test] Sending request to {}", path);
1053                    let url = format!("http://{}{}", addr, path);
1054                    let response = client.get(&url).send().await.unwrap();
1055                    let status = response.status();
1056                    let body = response.text().await.unwrap();
1057                    println!(
1058                        "[test] Response for {}: status={}, body={:?}",
1059                        path, status, body
1060                    );
1061                    if expect_200 {
1062                        assert_eq!(status, 200, "Response: status={}, body={:?}", status, body);
1063                    } else {
1064                        assert_eq!(status, 404, "Response: status={}, body={:?}", status, body);
1065                    }
1066                    assert!(
1067                        body.contains(expect_body),
1068                        "Response: status={}, body={:?}",
1069                        status,
1070                        body
1071                    );
1072                }
1073                // DRT handles server cleanup automatically
1074            },
1075        )
1076        .await;
1077    }
1078
1079    #[cfg(feature = "integration")]
1080    #[tokio::test]
1081    async fn test_health_check_with_payload_and_timeout() {
1082        // Test the complete health check flow with the new canary-based system:
1083        crate::logging::init();
1084
1085        temp_env::async_with_vars(
1086            [
1087                (env_system::DYN_SYSTEM_PORT, Some("0")),
1088                (
1089                    env_system::DYN_SYSTEM_STARTING_HEALTH_STATUS,
1090                    Some("notready"),
1091                ),
1092                (
1093                    env_system::DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS,
1094                    Some("[\"test.endpoint\"]"),
1095                ),
1096                // Enable health check with short intervals for testing
1097                ("DYN_HEALTH_CHECK_ENABLED", Some("true")),
1098                (env_canary::DYN_CANARY_WAIT_TIME, Some("1")), // Send canary after 1 second of inactivity
1099                ("DYN_HEALTH_CHECK_REQUEST_TIMEOUT", Some("1")), // Immediately timeout to mimic unresponsiveness
1100                ("RUST_LOG", Some("info")),                      // Enable logging for test
1101            ],
1102            async {
1103                let drt = Arc::new(create_test_drt_async().await);
1104
1105                // Get system status server info
1106                let system_info = drt
1107                    .system_status_server_info()
1108                    .expect("System status server should be started");
1109                let addr = system_info.socket_addr;
1110
1111                let client = reqwest::Client::new();
1112                let health_url = format!("http://{}/health", addr);
1113
1114                // Register an endpoint with health check payload
1115                let endpoint = "test.endpoint";
1116                let health_check_payload = serde_json::json!({
1117                    "prompt": "health check test",
1118                    "_health_check": true
1119                });
1120
1121                // Register the endpoint and its health check payload
1122                {
1123                    let system_health = drt.system_health();
1124                    let system_health_lock = system_health.lock();
1125                    system_health_lock.register_health_check_target(
1126                        endpoint,
1127                        crate::component::Instance {
1128                            component: "test_component".to_string(),
1129                            endpoint: "health".to_string(),
1130                            namespace: "test_namespace".to_string(),
1131                            instance_id: 1,
1132                            transport: crate::component::TransportType::Nats(endpoint.to_string()),
1133                        },
1134                        health_check_payload.clone(),
1135                    );
1136                }
1137
1138                // Check initial health - should be ready (default state)
1139                let response = client.get(&health_url).send().await.unwrap();
1140                let status = response.status();
1141                let body = response.text().await.unwrap();
1142                assert_eq!(status, 503, "Should be unhealthy initially (default state)");
1143                assert!(
1144                    body.contains("\"status\":\"notready\""),
1145                    "Should show notready status initially"
1146                );
1147
1148                // Set endpoint to healthy state
1149                drt.system_health()
1150                    .lock()
1151                    .set_endpoint_health_status(endpoint, HealthStatus::Ready);
1152
1153                // Check health again - should now be healthy
1154                let response = client.get(&health_url).send().await.unwrap();
1155                let status = response.status();
1156                let body = response.text().await.unwrap();
1157
1158                assert_eq!(status, 200, "Should be healthy due to recent response");
1159                assert!(
1160                    body.contains("\"status\":\"ready\""),
1161                    "Should show ready status after response"
1162                );
1163
1164                // Verify the endpoint status in SystemHealth directly
1165                let endpoint_status = drt
1166                    .system_health()
1167                    .lock()
1168                    .get_endpoint_health_status(endpoint);
1169                assert_eq!(
1170                    endpoint_status,
1171                    Some(HealthStatus::Ready),
1172                    "SystemHealth should show endpoint as Ready after response"
1173                );
1174            },
1175        )
1176        .await;
1177    }
1178}