Skip to main content

dynamo_runtime/
system_status_server.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
// TODO: (DEP-635) rename this file to system_http_server.rs;
//  it now serves more than status and health endpoints (e.g. LoRA management).
6
7use crate::config::HealthStatus;
8use crate::config::environment_names::logging as env_logging;
9use crate::config::environment_names::runtime::canary as env_canary;
10use crate::config::environment_names::runtime::system as env_system;
11use crate::logging::make_request_span;
12use crate::metrics::MetricsHierarchy;
13use crate::traits::DistributedRuntimeProvider;
14use axum::{
15    Router,
16    body::Bytes,
17    extract::{Json, Path, State},
18    http::StatusCode,
19    response::IntoResponse,
20    routing::{any, delete, get, post},
21};
22use futures::StreamExt;
23use serde::{Deserialize, Serialize};
24use serde_json::json;
25use std::collections::HashMap;
26use std::sync::{Arc, OnceLock};
27use std::time::Instant;
28use tokio::{net::TcpListener, task::JoinHandle};
29use tokio_util::sync::CancellationToken;
30use tower_http::trace::TraceLayer;
31
32/// System status server information containing socket address and handle
33#[derive(Debug)]
34pub struct SystemStatusServerInfo {
35    pub socket_addr: std::net::SocketAddr,
36    pub handle: Option<Arc<JoinHandle<()>>>,
37}
38
39impl SystemStatusServerInfo {
40    pub fn new(socket_addr: std::net::SocketAddr, handle: Option<JoinHandle<()>>) -> Self {
41        Self {
42            socket_addr,
43            handle: handle.map(Arc::new),
44        }
45    }
46
47    pub fn address(&self) -> String {
48        self.socket_addr.to_string()
49    }
50
51    pub fn hostname(&self) -> String {
52        self.socket_addr.ip().to_string()
53    }
54
55    pub fn port(&self) -> u16 {
56        self.socket_addr.port()
57    }
58}
59
60impl Clone for SystemStatusServerInfo {
61    fn clone(&self) -> Self {
62        Self {
63            socket_addr: self.socket_addr,
64            handle: self.handle.clone(),
65        }
66    }
67}
68
69/// System status server state containing the distributed runtime reference
70pub struct SystemStatusState {
71    // global drt registry is for printing out the entire Prometheus format output
72    root_drt: Arc<crate::DistributedRuntime>,
73    // Discovery metadata (only for Kubernetes backend)
74    discovery_metadata: Option<Arc<tokio::sync::RwLock<crate::discovery::DiscoveryMetadata>>>,
75}
76
77impl SystemStatusState {
78    /// Create new system status server state with the provided distributed runtime
79    pub fn new(
80        drt: Arc<crate::DistributedRuntime>,
81        discovery_metadata: Option<Arc<tokio::sync::RwLock<crate::discovery::DiscoveryMetadata>>>,
82    ) -> anyhow::Result<Self> {
83        Ok(Self {
84            root_drt: drt,
85            discovery_metadata,
86        })
87    }
88
89    /// Get a reference to the distributed runtime
90    pub fn drt(&self) -> &crate::DistributedRuntime {
91        &self.root_drt
92    }
93
94    /// Get a reference to the discovery metadata if available
95    pub fn discovery_metadata(
96        &self,
97    ) -> Option<&Arc<tokio::sync::RwLock<crate::discovery::DiscoveryMetadata>>> {
98        self.discovery_metadata.as_ref()
99    }
100}
101
102/// Request body for POST /v1/loras
103#[derive(Debug, Clone, Deserialize, Serialize)]
104pub struct LoadLoraRequest {
105    pub lora_name: String,
106    pub source: LoraSource,
107}
108
109/// Source information for loading a LoRA
110#[derive(Debug, Clone, Deserialize, Serialize)]
111pub struct LoraSource {
112    pub uri: String,
113}
114
115/// Response body for LoRA operations
116#[derive(Debug, Clone, Deserialize, Serialize)]
117pub struct LoraResponse {
118    pub status: String,
119    #[serde(skip_serializing_if = "Option::is_none")]
120    pub message: Option<String>,
121    #[serde(skip_serializing_if = "Option::is_none")]
122    pub lora_name: Option<String>,
123    #[serde(skip_serializing_if = "Option::is_none")]
124    pub lora_id: Option<u64>,
125    #[serde(skip_serializing_if = "Option::is_none")]
126    pub loras: Option<serde_json::Value>,
127    #[serde(skip_serializing_if = "Option::is_none")]
128    pub count: Option<usize>,
129}
130
131/// Start system status server with metrics support
132pub async fn spawn_system_status_server(
133    host: &str,
134    port: u16,
135    cancel_token: CancellationToken,
136    drt: Arc<crate::DistributedRuntime>,
137    discovery_metadata: Option<Arc<tokio::sync::RwLock<crate::discovery::DiscoveryMetadata>>>,
138) -> anyhow::Result<(std::net::SocketAddr, tokio::task::JoinHandle<()>)> {
139    // Create system status server state with the provided distributed runtime
140    let server_state = Arc::new(SystemStatusState::new(drt, discovery_metadata)?);
141    let health_path = server_state
142        .drt()
143        .system_health()
144        .lock()
145        .health_path()
146        .to_string();
147    let live_path = server_state
148        .drt()
149        .system_health()
150        .lock()
151        .live_path()
152        .to_string();
153
154    // Check if LoRA feature is enabled
155    let lora_enabled = std::env::var(crate::config::environment_names::llm::DYN_LORA_ENABLED)
156        .map(|v| v.to_lowercase() == "true")
157        .unwrap_or(false);
158
159    let mut app = Router::new()
160        .route(
161            &health_path,
162            get({
163                let state = Arc::clone(&server_state);
164                move || health_handler(state)
165            }),
166        )
167        .route(
168            &live_path,
169            get({
170                let state = Arc::clone(&server_state);
171                move || health_handler(state)
172            }),
173        )
174        .route(
175            "/metrics",
176            get({
177                let state = Arc::clone(&server_state);
178                move || metrics_handler(state)
179            }),
180        )
181        .route(
182            "/metadata",
183            get({
184                let state = Arc::clone(&server_state);
185                move || metadata_handler(state)
186            }),
187        )
188        .route(
189            "/engine/{*path}",
190            any({
191                let state = Arc::clone(&server_state);
192                move |path, body| engine_route_handler(state, path, body)
193            }),
194        );
195
196    // Add LoRA routes only if DYN_LORA_ENABLED is set to true
197    if lora_enabled {
198        app = app
199            .route(
200                "/v1/loras",
201                get({
202                    let state = Arc::clone(&server_state);
203                    move || list_loras_handler(State(state))
204                })
205                .post({
206                    let state = Arc::clone(&server_state);
207                    move |body| load_lora_handler(State(state), body)
208                }),
209            )
210            .route(
211                "/v1/loras/{*lora_name}",
212                delete({
213                    let state = Arc::clone(&server_state);
214                    move |path| unload_lora_handler(State(state), path)
215                }),
216            );
217    }
218
219    let app = app
220        .fallback(|| async {
221            tracing::info!("[fallback handler] called");
222            (StatusCode::NOT_FOUND, "Route not found").into_response()
223        })
224        .layer(TraceLayer::new_for_http().make_span_with(make_request_span));
225
226    let address = format!("{}:{}", host, port);
227    tracing::info!("[spawn_system_status_server] binding to: {}", address);
228
229    let listener = match TcpListener::bind(&address).await {
230        Ok(listener) => {
231            // get the actual address and port, print in debug level
232            let actual_address = listener.local_addr()?;
233            tracing::info!(
234                "[spawn_system_status_server] system status server bound to: {}",
235                actual_address
236            );
237            (listener, actual_address)
238        }
239        Err(e) => {
240            tracing::error!("Failed to bind to address {}: {}", address, e);
241            return Err(anyhow::anyhow!("Failed to bind to address: {}", e));
242        }
243    };
244    let (listener, actual_address) = listener;
245
246    let observer = cancel_token.child_token();
247    // Spawn the server in the background and return the handle
248    let handle = tokio::spawn(async move {
249        if let Err(e) = axum::serve(listener, app)
250            .with_graceful_shutdown(observer.cancelled_owned())
251            .await
252        {
253            tracing::error!("System status server error: {}", e);
254        }
255    });
256
257    Ok((actual_address, handle))
258}
259
260/// Health handler with optional active health checking
261#[tracing::instrument(skip_all, level = "trace")]
262async fn health_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
263    // Get basic health status
264    let system_health = state.drt().system_health();
265    let system_health_lock = system_health.lock();
266    let (healthy, endpoints) = system_health_lock.get_health_status();
267    let uptime = Some(system_health_lock.uptime());
268    drop(system_health_lock);
269
270    let healthy_string = if healthy { "ready" } else { "notready" };
271    let status_code = if healthy {
272        StatusCode::OK
273    } else {
274        StatusCode::SERVICE_UNAVAILABLE
275    };
276
277    let response = json!({
278        "status": healthy_string,
279        "uptime": uptime,
280        "endpoints": endpoints,
281    });
282
283    tracing::trace!("Response {}", response.to_string());
284
285    (status_code, response.to_string())
286}
287
288/// Metrics handler with DistributedRuntime uptime
289#[tracing::instrument(skip_all, level = "trace")]
290async fn metrics_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
291    // Get all metrics from the DistributedRuntime.
292    // The uptime gauge is updated automatically via a PrometheusUpdateCallback
293    // registered in DistributedRuntime::new(), so it is always fresh before scrape.
294    //
295    // NOTE: We use a multi-registry model (e.g. one registry per endpoint) and merge at scrape time,
296    // so /metrics traverses registered child registries and produces a single combined output.
297    let response = match state.drt().metrics().prometheus_expfmt() {
298        Ok(r) => r,
299        Err(e) => {
300            tracing::error!("Failed to get metrics from registry: {}", e);
301            return (
302                StatusCode::INTERNAL_SERVER_ERROR,
303                "Failed to get metrics".to_string(),
304            );
305        }
306    };
307
308    (StatusCode::OK, response)
309}
310
311/// Metadata handler
312#[tracing::instrument(skip_all, level = "trace")]
313async fn metadata_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
314    // Check if discovery metadata is available
315    let metadata = match state.discovery_metadata() {
316        Some(metadata) => metadata,
317        None => {
318            tracing::debug!("Metadata endpoint called but no discovery metadata available");
319            return (
320                StatusCode::NOT_FOUND,
321                "Discovery metadata not available".to_string(),
322            )
323                .into_response();
324        }
325    };
326
327    // Read the metadata
328    let metadata_guard = metadata.read().await;
329
330    // Serialize to JSON
331    match serde_json::to_string(&*metadata_guard) {
332        Ok(json) => {
333            tracing::trace!("Returning metadata: {} bytes", json.len());
334            (StatusCode::OK, json).into_response()
335        }
336        Err(e) => {
337            tracing::error!("Failed to serialize metadata: {}", e);
338            (
339                StatusCode::INTERNAL_SERVER_ERROR,
340                "Failed to serialize metadata".to_string(),
341            )
342                .into_response()
343        }
344    }
345}
346
347/// Handler for POST /v1/loras - Load a LoRA adapter
348#[tracing::instrument(skip_all, level = "debug")]
349async fn load_lora_handler(
350    State(state): State<Arc<SystemStatusState>>,
351    Json(request): Json<LoadLoraRequest>,
352) -> impl IntoResponse {
353    tracing::info!("Loading LoRA: {}", request.lora_name);
354
355    // Call the load_lora endpoint for each available backend
356    match call_lora_endpoint(
357        state.drt(),
358        "load_lora",
359        json!({
360            "lora_name": request.lora_name,
361            "source": {
362                "uri": request.source.uri
363            },
364        }),
365    )
366    .await
367    {
368        Ok(response) => {
369            if response.status == "error" {
370                tracing::error!(
371                    "Failed to load LoRA {}: {}",
372                    request.lora_name,
373                    response.message.as_deref().unwrap_or("Unknown error")
374                );
375                (StatusCode::INTERNAL_SERVER_ERROR, Json(response))
376            } else {
377                tracing::info!("LoRA loaded successfully: {}", request.lora_name);
378                (StatusCode::OK, Json(response))
379            }
380        }
381        Err(e) => {
382            tracing::error!("Failed to load LoRA {}: {}", request.lora_name, e);
383            (
384                StatusCode::INTERNAL_SERVER_ERROR,
385                Json(LoraResponse {
386                    status: "error".to_string(),
387                    message: Some(e.to_string()),
388                    lora_name: Some(request.lora_name),
389                    lora_id: None,
390                    loras: None,
391                    count: None,
392                }),
393            )
394        }
395    }
396}
397
398/// Handler for DELETE /v1/loras/*lora_name - Unload a LoRA adapter
399#[tracing::instrument(skip_all, level = "debug")]
400async fn unload_lora_handler(
401    State(state): State<Arc<SystemStatusState>>,
402    Path(lora_name): Path<String>,
403) -> impl IntoResponse {
404    // Strip the leading slash from the wildcard capture
405    let lora_name = lora_name
406        .strip_prefix('/')
407        .unwrap_or(&lora_name)
408        .to_string();
409    tracing::info!("Unloading LoRA: {}", lora_name);
410
411    // Call the unload_lora endpoint for each available backend
412    match call_lora_endpoint(
413        state.drt(),
414        "unload_lora",
415        json!({
416            "lora_name": lora_name.clone(),
417        }),
418    )
419    .await
420    {
421        Ok(response) => {
422            if response.status == "error" {
423                tracing::error!(
424                    "Failed to unload LoRA {}: {}",
425                    lora_name,
426                    response.message.as_deref().unwrap_or("Unknown error")
427                );
428                (StatusCode::INTERNAL_SERVER_ERROR, Json(response))
429            } else {
430                tracing::info!("LoRA unloaded successfully: {}", lora_name);
431                (StatusCode::OK, Json(response))
432            }
433        }
434        Err(e) => {
435            tracing::error!("Failed to unload LoRA {}: {}", lora_name, e);
436            (
437                StatusCode::INTERNAL_SERVER_ERROR,
438                Json(LoraResponse {
439                    status: "error".to_string(),
440                    message: Some(e.to_string()),
441                    lora_name: Some(lora_name),
442                    lora_id: None,
443                    loras: None,
444                    count: None,
445                }),
446            )
447        }
448    }
449}
450
451/// Handler for GET /v1/loras - List all LoRA adapters
452#[tracing::instrument(skip_all, level = "debug")]
453async fn list_loras_handler(State(state): State<Arc<SystemStatusState>>) -> impl IntoResponse {
454    tracing::info!("Listing all LoRAs");
455
456    // Call the list_loras endpoint for each available backend
457    match call_lora_endpoint(state.drt(), "list_loras", json!({})).await {
458        Ok(response) => {
459            tracing::info!("Successfully retrieved LoRA list");
460            (StatusCode::OK, Json(response))
461        }
462        Err(e) => {
463            tracing::error!("Failed to list LoRAs: {}", e);
464            (
465                StatusCode::INTERNAL_SERVER_ERROR,
466                Json(LoraResponse {
467                    status: "error".to_string(),
468                    message: Some(e.to_string()),
469                    lora_name: None,
470                    lora_id: None,
471                    loras: None,
472                    count: None,
473                }),
474            )
475        }
476    }
477}
478
479/// Helper function to call a LoRA management endpoint locally via in-process registry
480///
481/// This function ONLY uses the local endpoint registry for direct in-process calls.
482/// It does NOT fall back to network discovery if the endpoint is not found.
483async fn call_lora_endpoint(
484    drt: &crate::DistributedRuntime,
485    endpoint_name: &str,
486    request_body: serde_json::Value,
487) -> anyhow::Result<LoraResponse> {
488    use crate::engine::AsyncEngine;
489
490    tracing::debug!("Calling local endpoint: '{}'", endpoint_name);
491
492    // Get the endpoint from the local registry (in-process call only)
493    let local_registry = drt.local_endpoint_registry();
494    let engine = local_registry
495        .get(endpoint_name)
496        .ok_or_else(|| {
497            anyhow::anyhow!(
498                "Endpoint '{}' not found in local registry. Make sure it's registered with .register_local_engine()",
499                endpoint_name
500            )
501        })?;
502
503    tracing::debug!(
504        "Found endpoint '{}' in local registry, calling directly",
505        endpoint_name
506    );
507
508    // Call the engine directly without going through the network stack
509    let request = crate::pipeline::SingleIn::new(request_body);
510    let mut stream = engine.generate(request).await?;
511
512    // Get the first response
513    if let Some(response) = stream.next().await {
514        let response_data = response.data.unwrap_or_default();
515
516        // Try structured deserialization first, fall back to manual field extraction
517        let lora_response = serde_json::from_value::<LoraResponse>(response_data.clone())
518            .unwrap_or_else(|_| parse_lora_response(&response_data));
519
520        return Ok(lora_response);
521    }
522
523    anyhow::bail!("No response received from endpoint '{}'", endpoint_name)
524}
525
526/// Helper to parse response data into LoraResponse
527fn parse_lora_response(response_data: &serde_json::Value) -> LoraResponse {
528    LoraResponse {
529        status: response_data
530            .get("status")
531            .and_then(|s| s.as_str())
532            .unwrap_or("success")
533            .to_string(),
534        message: response_data
535            .get("message")
536            .and_then(|m| m.as_str())
537            .map(|s| s.to_string()),
538        lora_name: response_data
539            .get("lora_name")
540            .and_then(|n| n.as_str())
541            .map(|s| s.to_string()),
542        lora_id: response_data.get("lora_id").and_then(|id| id.as_u64()),
543        loras: response_data.get("loras").cloned(),
544        count: response_data
545            .get("count")
546            .and_then(|c| c.as_u64())
547            .map(|c| c as usize),
548    }
549}
550
551/// Engine route handler for /engine/* routes
552///
553/// This handler looks up registered callbacks in the engine routes registry
554/// and invokes them with the request body, returning the response as JSON.
555#[tracing::instrument(skip_all, level = "trace", fields(path = %path))]
556async fn engine_route_handler(
557    state: Arc<SystemStatusState>,
558    Path(path): Path<String>,
559    body: Bytes,
560) -> impl IntoResponse {
561    tracing::trace!("Engine route request to /engine/{}", path);
562
563    // Parse body as JSON (empty object for GET/empty body)
564    let body_json: serde_json::Value = if body.is_empty() {
565        serde_json::json!({})
566    } else {
567        match serde_json::from_slice(&body) {
568            Ok(json) => json,
569            Err(e) => {
570                tracing::warn!("Invalid JSON in request body: {}", e);
571                return (
572                    StatusCode::BAD_REQUEST,
573                    json!({
574                        "error": "Invalid JSON",
575                        "message": format!("{}", e)
576                    })
577                    .to_string(),
578                )
579                    .into_response();
580            }
581        }
582    };
583
584    // Look up callback
585    let callback = match state.drt().engine_routes().get(&path) {
586        Some(cb) => cb,
587        None => {
588            tracing::debug!("Route /engine/{} not found", path);
589            return (
590                StatusCode::NOT_FOUND,
591                json!({
592                    "error": "Route not found",
593                    "message": format!("Route /engine/{} not found", path)
594                })
595                .to_string(),
596            )
597                .into_response();
598        }
599    };
600
601    // Call callback (it's async, so await it)
602    match callback(body_json).await {
603        Ok(response) => {
604            tracing::trace!("Engine route handler succeeded for /engine/{}", path);
605            (StatusCode::OK, response.to_string()).into_response()
606        }
607        Err(e) => {
608            tracing::error!("Engine route handler error for /engine/{}: {}", path, e);
609            (
610                StatusCode::INTERNAL_SERVER_ERROR,
611                json!({
612                    "error": "Handler error",
613                    "message": format!("{}", e)
614                })
615                .to_string(),
616            )
617                .into_response()
618        }
619    }
620}
621
622// Regular tests: cargo test system_status_server --lib
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::Duration;

    /// Smoke test for the plain HTTP server lifecycle (no DistributedRuntime involved):
    /// a server task with graceful shutdown must finish once its token is cancelled.
    #[tokio::test]
    async fn test_http_server_lifecycle() {
        let shutdown = CancellationToken::new();
        let server_shutdown = shutdown.clone();

        // Minimal router, just enough for axum to serve something.
        let app = Router::new().route("/test", get(|| async { (StatusCode::OK, "test") }));

        // Run the server on an OS-assigned port in a background task.
        let server_task = tokio::spawn(async move {
            let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
            let _ = axum::serve(listener, app)
                .with_graceful_shutdown(server_shutdown.cancelled_owned())
                .await;
        });

        // No wait is inserted before cancelling; request shutdown right away.
        shutdown.cancel();

        // The task must complete within the timeout.
        let joined = tokio::time::timeout(Duration::from_secs(5), server_task).await;
        assert!(
            joined.is_ok(),
            "HTTP server should shut down when cancel token is cancelled"
        );
    }
}
658
659// Integration tests: cargo test system_status_server --lib --features integration
660#[cfg(all(test, feature = "integration"))]
661mod integration_tests {
662    use super::*;
663    use crate::config::environment_names::logging as env_logging;
664    use crate::config::environment_names::runtime::canary as env_canary;
665    use crate::distributed::distributed_test_utils::create_test_drt_async;
666    use crate::metrics::MetricsHierarchy;
667    use anyhow::Result;
668    use rstest::rstest;
669    use std::sync::Arc;
670    use tokio::time::Duration;
671
672    #[tokio::test]
673    async fn test_uptime_from_system_health() {
674        // Test that uptime is available from SystemHealth
675        temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, None::<&str>)], async {
676            let drt = create_test_drt_async().await;
677
678            // Get uptime from SystemHealth
679            let uptime = drt.system_health().lock().uptime();
680            // Uptime should exist (even if close to zero)
681            assert!(uptime.as_nanos() > 0 || uptime.is_zero());
682
683            // Sleep briefly and check uptime increases
684            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
685            let uptime_after = drt.system_health().lock().uptime();
686            assert!(uptime_after > uptime);
687        })
688        .await;
689    }
690
691    #[tokio::test]
692    async fn test_runtime_metrics_initialization_and_namespace() {
693        // Test that metrics have correct namespace
694        temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, None::<&str>)], async {
695            let drt = create_test_drt_async().await;
696            // SystemStatusState is already created in distributed.rs
697            // so we don't need to create it again here
698
699            // The uptime_seconds metric should already be registered and available
700            let response = drt.metrics().prometheus_expfmt().unwrap();
701            println!("Full metrics response:\n{}", response);
702
703            // Check that uptime_seconds metric is present with correct namespace
704            assert!(
705                response.contains("# HELP dynamo_component_uptime_seconds"),
706                "Should contain uptime_seconds help text"
707            );
708            assert!(
709                response.contains("# TYPE dynamo_component_uptime_seconds gauge"),
710                "Should contain uptime_seconds type"
711            );
712            assert!(
713                response.contains("dynamo_component_uptime_seconds"),
714                "Should contain uptime_seconds metric with correct namespace"
715            );
716        })
717        .await;
718    }
719
720    #[tokio::test]
721    async fn test_uptime_gauge_updates() {
722        // Test that the uptime gauge is properly updated and increases over time
723        temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, None::<&str>)], async {
724            let drt = create_test_drt_async().await;
725
726            // Get initial uptime
727            let initial_uptime = drt.system_health().lock().uptime();
728
729            // Update the gauge with initial value
730            drt.system_health().lock().update_uptime_gauge();
731
732            // Sleep for 100ms
733            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
734
735            // Get uptime after sleep
736            let uptime_after_sleep = drt.system_health().lock().uptime();
737
738            // Update the gauge again
739            drt.system_health().lock().update_uptime_gauge();
740
741            // Verify uptime increased by at least 100ms
742            let elapsed = uptime_after_sleep - initial_uptime;
743            assert!(
744                elapsed >= std::time::Duration::from_millis(100),
745                "Uptime should have increased by at least 100ms after sleep, but only increased by {:?}",
746                elapsed
747            );
748        })
749        .await;
750    }
751
752    #[tokio::test]
753    async fn test_http_requests_fail_when_system_disabled() {
754        // Test that system status server is not running when disabled
755        temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, None::<&str>)], async {
756            let drt = create_test_drt_async().await;
757
758            // Verify that system status server info is None when disabled
759            let system_info = drt.system_status_server_info();
760            assert!(
761                system_info.is_none(),
762                "System status server should not be running when disabled"
763            );
764
765            println!("✓ System status server correctly disabled when not enabled");
766        })
767        .await;
768    }
769
770    /// This test verifies the health and liveness endpoints of the system status server.
771    /// It checks that the endpoints respond with the correct HTTP status codes and bodies
772    /// based on the initial health status and any custom endpoint paths provided via environment variables.
773    /// The test is parameterized using multiple #[case] attributes to cover various scenarios,
774    /// including different initial health states ("ready" and "notready"), default and custom endpoint paths,
775    /// and expected response codes and bodies.
776    #[rstest]
777    #[case("ready", 200, "ready", None, None, 3)]
778    #[case("notready", 503, "notready", None, None, 3)]
779    #[case("ready", 200, "ready", Some("/custom/health"), Some("/custom/live"), 5)]
780    #[case(
781        "notready",
782        503,
783        "notready",
784        Some("/custom/health"),
785        Some("/custom/live"),
786        5
787    )]
788    #[tokio::test]
789    #[cfg(feature = "integration")]
790    async fn test_health_endpoints(
791        #[case] starting_health_status: &'static str,
792        #[case] expected_status: u16,
793        #[case] expected_body: &'static str,
794        #[case] custom_health_path: Option<&'static str>,
795        #[case] custom_live_path: Option<&'static str>,
796        #[case] expected_num_tests: usize,
797    ) {
798        use std::sync::Arc;
799        // use tokio::io::{AsyncReadExt, AsyncWriteExt};
800        // use reqwest for HTTP requests
801
802        // Closure call is needed here to satisfy async_with_vars
803
804        crate::logging::init();
805
806        #[allow(clippy::redundant_closure_call)]
807        temp_env::async_with_vars(
808            [
809                (env_system::DYN_SYSTEM_PORT, Some("0")),
810                (
811                    env_system::DYN_SYSTEM_STARTING_HEALTH_STATUS,
812                    Some(starting_health_status),
813                ),
814                (env_system::DYN_SYSTEM_HEALTH_PATH, custom_health_path),
815                (env_system::DYN_SYSTEM_LIVE_PATH, custom_live_path),
816            ],
817            (async || {
818                let drt = Arc::new(create_test_drt_async().await);
819
820                // Get system status server info from DRT (instead of manually spawning)
821                let system_info = drt
822                    .system_status_server_info()
823                    .expect("System status server should be started by DRT");
824                let addr = system_info.socket_addr;
825
826                let client = reqwest::Client::new();
827
828                // Prepare test cases
829                let mut test_cases = vec![];
830                match custom_health_path {
831                    None => {
832                        // When using default paths, test the default paths
833                        test_cases.push(("/health", expected_status, expected_body));
834                    }
835                    Some(chp) => {
836                        // When using custom paths, default paths should not exist
837                        test_cases.push(("/health", 404, "Route not found"));
838                        test_cases.push((chp, expected_status, expected_body));
839                    }
840                }
841                match custom_live_path {
842                    None => {
843                        // When using default paths, test the default paths
844                        test_cases.push(("/live", expected_status, expected_body));
845                    }
846                    Some(clp) => {
847                        // When using custom paths, default paths should not exist
848                        test_cases.push(("/live", 404, "Route not found"));
849                        test_cases.push((clp, expected_status, expected_body));
850                    }
851                }
852                test_cases.push(("/someRandomPathNotFoundHere", 404, "Route not found"));
853                assert_eq!(test_cases.len(), expected_num_tests);
854
855                for (path, expect_status, expect_body) in test_cases {
856                    println!("[test] Sending request to {}", path);
857                    let url = format!("http://{}{}", addr, path);
858                    let response = client.get(&url).send().await.unwrap();
859                    let status = response.status();
860                    let body = response.text().await.unwrap();
861                    println!(
862                        "[test] Response for {}: status={}, body={:?}",
863                        path, status, body
864                    );
865                    assert_eq!(
866                        status, expect_status,
867                        "Response: status={}, body={:?}",
868                        status, body
869                    );
870                    assert!(
871                        body.contains(expect_body),
872                        "Response: status={}, body={:?}",
873                        status,
874                        body
875                    );
876                }
877            })(),
878        )
879        .await;
880    }
881
882    #[tokio::test]
883    async fn test_health_endpoint_tracing() -> Result<()> {
884        use std::sync::Arc;
885
886        // Closure call is needed here to satisfy async_with_vars
887
888        #[allow(clippy::redundant_closure_call)]
889        let _ = temp_env::async_with_vars(
890            [
891                (env_system::DYN_SYSTEM_PORT, Some("0")),
892                (env_system::DYN_SYSTEM_STARTING_HEALTH_STATUS, Some("ready")),
893                (env_logging::DYN_LOGGING_JSONL, Some("1")),
894                (env_logging::DYN_LOG, Some("trace")),
895            ],
896            (async || {
897                // TODO Add proper testing for
898                // trace id and parent id
899
900                crate::logging::init();
901
902                let drt = Arc::new(create_test_drt_async().await);
903
904                // Get system status server info from DRT (instead of manually spawning)
905                let system_info = drt
906                    .system_status_server_info()
907                    .expect("System status server should be started by DRT");
908                let addr = system_info.socket_addr;
909                let client = reqwest::Client::new();
910                for path in [("/health"), ("/live"), ("/someRandomPathNotFoundHere")] {
911                    let traceparent_value =
912                        "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
913                    let tracestate_value = "vendor1=opaqueValue1,vendor2=opaqueValue2";
914                    let mut headers = reqwest::header::HeaderMap::new();
915                    headers.insert(
916                        reqwest::header::HeaderName::from_static("traceparent"),
917                        reqwest::header::HeaderValue::from_str(traceparent_value)?,
918                    );
919                    headers.insert(
920                        reqwest::header::HeaderName::from_static("tracestate"),
921                        reqwest::header::HeaderValue::from_str(tracestate_value)?,
922                    );
923                    let url = format!("http://{}{}", addr, path);
924                    let response = client.get(&url).headers(headers).send().await.unwrap();
925                    let status = response.status();
926                    let body = response.text().await.unwrap();
927                    tracing::info!(body = body, status = status.to_string());
928                }
929
930                Ok::<(), anyhow::Error>(())
931            })(),
932        )
933        .await;
934        Ok(())
935    }
936
937    #[tokio::test]
938    async fn test_health_endpoint_with_changing_health_status() {
939        // Test health endpoint starts in not ready status, then becomes ready
940        // when endpoints are created (DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS=generate)
941        const ENDPOINT_NAME: &str = "generate";
942        const ENDPOINT_HEALTH_CONFIG: &str = "[\"generate\"]";
943        temp_env::async_with_vars(
944            [
945                (env_system::DYN_SYSTEM_PORT, Some("0")),
946                (env_system::DYN_SYSTEM_STARTING_HEALTH_STATUS, Some("notready")),
947                (env_system::DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS, Some(ENDPOINT_HEALTH_CONFIG)),
948            ],
949            async {
950                let drt = Arc::new(create_test_drt_async().await);
951
952                // Check if system status server was started
953                let system_info_opt = drt.system_status_server_info();
954
955                // Ensure system status server was spawned by DRT
956                assert!(
957                    system_info_opt.is_some(),
958                    "System status server was not spawned by DRT. Expected DRT to spawn server when DYN_SYSTEM_PORT is set to a positive value, but system_status_server_info() returned None. Environment: DYN_SYSTEM_PORT={:?}",
959                    std::env::var(env_system::DYN_SYSTEM_PORT)
960                );
961
962                // Get the system status server info from DRT - this should never fail now due to above check
963                let system_info = system_info_opt.unwrap();
964                let addr = system_info.socket_addr;
965
966                // Initially check health - should be not ready
967                let client = reqwest::Client::new();
968                let health_url = format!("http://{}/health", addr);
969
970                let response = client.get(&health_url).send().await.unwrap();
971                let status = response.status();
972                let body = response.text().await.unwrap();
973
974                // Health should be not ready (503) initially
975                assert_eq!(status, 503, "Health should be 503 (not ready) initially, got: {}", status);
976                assert!(body.contains("\"status\":\"notready\""), "Health should contain status notready");
977
978                // Now create a namespace, component, and endpoint to make the system healthy
979                let namespace = drt.namespace("ns1234").unwrap();
980                let component = namespace.component("comp1234").unwrap();
981
982                // Create a simple test handler
983                use crate::pipeline::{async_trait, network::Ingress, AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, SingleIn};
984                use crate::protocols::annotated::Annotated;
985
986                struct TestHandler;
987
988                #[async_trait]
989                impl AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, anyhow::Error> for TestHandler {
990                    async fn generate(&self, input: SingleIn<String>) -> anyhow::Result<ManyOut<Annotated<String>>> {
991                        let (data, ctx) = input.into_parts();
992                        let response = Annotated::from_data(format!("You responded: {}", data));
993                        Ok(crate::pipeline::ResponseStream::new(
994                            Box::pin(crate::stream::iter(vec![response])),
995                            ctx.context()
996                        ))
997                    }
998                }
999
1000                // Create the ingress and start the endpoint service
1001                let ingress = Ingress::for_engine(std::sync::Arc::new(TestHandler)).unwrap();
1002
1003                // Start the service and endpoint with a health check payload
1004                // This will automatically register the endpoint for health monitoring
1005                tokio::spawn(async move {
1006                    let _ = component.endpoint(ENDPOINT_NAME)
1007                        .endpoint_builder()
1008                        .handler(ingress)
1009                        .health_check_payload(serde_json::json!({
1010                            "test": "health_check"
1011                        }))
1012                        .start()
1013                        .await;
1014                });
1015
1016                // Hit health endpoint 200 times to verify consistency
1017                let mut success_count = 0;
1018                let mut failures = Vec::new();
1019
1020                for i in 1..=200 {
1021                    let response = client.get(&health_url).send().await.unwrap();
1022                    let status = response.status();
1023                    let body = response.text().await.unwrap();
1024
1025                    if status == 200 && body.contains("\"status\":\"ready\"") {
1026                        success_count += 1;
1027                    } else {
1028                        failures.push((i, status.as_u16(), body.clone()));
1029                        if failures.len() <= 5 {  // Only log first 5 failures
1030                            tracing::warn!("Request {}: status={}, body={}", i, status, body);
1031                        }
1032                    }
1033                }
1034
1035                tracing::info!("Health endpoint test results: {}/200 requests succeeded", success_count);
1036                if !failures.is_empty() {
1037                    tracing::warn!("Failed requests: {}", failures.len());
1038                }
1039
1040                // Expect at least 150 out of 200 requests to be successful
1041                assert!(success_count >= 150, "Expected at least 150 out of 200 requests to succeed, but only {} succeeded", success_count);
1042            },
1043        )
1044        .await;
1045    }
1046
1047    #[tokio::test]
1048    async fn test_spawn_system_status_server_endpoints() {
1049        // use reqwest for HTTP requests
1050        temp_env::async_with_vars(
1051            [
1052                (env_system::DYN_SYSTEM_PORT, Some("0")),
1053                (env_system::DYN_SYSTEM_STARTING_HEALTH_STATUS, Some("ready")),
1054            ],
1055            async {
1056                let drt = Arc::new(create_test_drt_async().await);
1057
1058                // Get system status server info from DRT (instead of manually spawning)
1059                let system_info = drt
1060                    .system_status_server_info()
1061                    .expect("System status server should be started by DRT");
1062                let addr = system_info.socket_addr;
1063                let client = reqwest::Client::new();
1064                for (path, expect_200, expect_body) in [
1065                    ("/health", true, "ready"),
1066                    ("/live", true, "ready"),
1067                    ("/someRandomPathNotFoundHere", false, "Route not found"),
1068                ] {
1069                    println!("[test] Sending request to {}", path);
1070                    let url = format!("http://{}{}", addr, path);
1071                    let response = client.get(&url).send().await.unwrap();
1072                    let status = response.status();
1073                    let body = response.text().await.unwrap();
1074                    println!(
1075                        "[test] Response for {}: status={}, body={:?}",
1076                        path, status, body
1077                    );
1078                    if expect_200 {
1079                        assert_eq!(status, 200, "Response: status={}, body={:?}", status, body);
1080                    } else {
1081                        assert_eq!(status, 404, "Response: status={}, body={:?}", status, body);
1082                    }
1083                    assert!(
1084                        body.contains(expect_body),
1085                        "Response: status={}, body={:?}",
1086                        status,
1087                        body
1088                    );
1089                }
1090                // DRT handles server cleanup automatically
1091            },
1092        )
1093        .await;
1094    }
1095
1096    #[cfg(feature = "integration")]
1097    #[tokio::test]
1098    async fn test_health_check_with_payload_and_timeout() {
1099        // Test the complete health check flow with the new canary-based system:
1100        crate::logging::init();
1101
1102        temp_env::async_with_vars(
1103            [
1104                (env_system::DYN_SYSTEM_PORT, Some("0")),
1105                (
1106                    env_system::DYN_SYSTEM_STARTING_HEALTH_STATUS,
1107                    Some("notready"),
1108                ),
1109                (
1110                    env_system::DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS,
1111                    Some("[\"test.endpoint\"]"),
1112                ),
1113                // Enable health check with short intervals for testing
1114                ("DYN_HEALTH_CHECK_ENABLED", Some("true")),
1115                (env_canary::DYN_CANARY_WAIT_TIME, Some("1")), // Send canary after 1 second of inactivity
1116                ("DYN_HEALTH_CHECK_REQUEST_TIMEOUT", Some("1")), // Immediately timeout to mimic unresponsiveness
1117                ("RUST_LOG", Some("info")),                      // Enable logging for test
1118            ],
1119            async {
1120                let drt = Arc::new(create_test_drt_async().await);
1121
1122                // Get system status server info
1123                let system_info = drt
1124                    .system_status_server_info()
1125                    .expect("System status server should be started");
1126                let addr = system_info.socket_addr;
1127
1128                let client = reqwest::Client::new();
1129                let health_url = format!("http://{}/health", addr);
1130
1131                // Register an endpoint with health check payload
1132                let endpoint = "test.endpoint";
1133                let health_check_payload = serde_json::json!({
1134                    "prompt": "health check test",
1135                    "_health_check": true
1136                });
1137
1138                // Register the endpoint and its health check payload
1139                {
1140                    let system_health = drt.system_health();
1141                    let system_health_lock = system_health.lock();
1142                    system_health_lock.register_health_check_target(
1143                        endpoint,
1144                        crate::component::Instance {
1145                            component: "test_component".to_string(),
1146                            endpoint: "health".to_string(),
1147                            namespace: "test_namespace".to_string(),
1148                            instance_id: 1,
1149                            transport: crate::component::TransportType::Nats(endpoint.to_string()),
1150                        },
1151                        health_check_payload.clone(),
1152                    );
1153                }
1154
1155                // Check initial health - should be ready (default state)
1156                let response = client.get(&health_url).send().await.unwrap();
1157                let status = response.status();
1158                let body = response.text().await.unwrap();
1159                assert_eq!(status, 503, "Should be unhealthy initially (default state)");
1160                assert!(
1161                    body.contains("\"status\":\"notready\""),
1162                    "Should show notready status initially"
1163                );
1164
1165                // Set endpoint to healthy state
1166                drt.system_health()
1167                    .lock()
1168                    .set_endpoint_health_status(endpoint, HealthStatus::Ready);
1169
1170                // Check health again - should now be healthy
1171                let response = client.get(&health_url).send().await.unwrap();
1172                let status = response.status();
1173                let body = response.text().await.unwrap();
1174
1175                assert_eq!(status, 200, "Should be healthy due to recent response");
1176                assert!(
1177                    body.contains("\"status\":\"ready\""),
1178                    "Should show ready status after response"
1179                );
1180
1181                // Verify the endpoint status in SystemHealth directly
1182                let endpoint_status = drt
1183                    .system_health()
1184                    .lock()
1185                    .get_endpoint_health_status(endpoint);
1186                assert_eq!(
1187                    endpoint_status,
1188                    Some(HealthStatus::Ready),
1189                    "SystemHealth should show endpoint as Ready after response"
1190                );
1191            },
1192        )
1193        .await;
1194    }
1195}