Skip to main content

dynamo_runtime/
system_status_server.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4// TODO: (DEP-635) this file should be renamed to system_http_server.rs
5//  it is being used not just for status, health, but others like loras management.
6
7use crate::config::HealthStatus;
8use crate::config::environment_names::logging as env_logging;
9use crate::config::environment_names::runtime::canary as env_canary;
10use crate::config::environment_names::runtime::system as env_system;
11use crate::logging::make_system_request_span;
12use crate::metrics::MetricsHierarchy;
13use crate::traits::DistributedRuntimeProvider;
14use axum::{
15    Router,
16    body::Bytes,
17    extract::{Json, Path, State},
18    http::StatusCode,
19    response::IntoResponse,
20    routing::{any, delete, get, post},
21};
22use futures::StreamExt;
23use serde::{Deserialize, Serialize};
24use serde_json::json;
25use std::collections::HashMap;
26use std::sync::{Arc, OnceLock};
27use std::time::Instant;
28use tokio::{net::TcpListener, task::JoinHandle};
29use tokio_util::sync::CancellationToken;
30use tower_http::trace::TraceLayer;
31
32/// System status server information containing socket address and handle
33#[derive(Debug)]
34pub struct SystemStatusServerInfo {
35    pub socket_addr: std::net::SocketAddr,
36    pub handle: Option<Arc<JoinHandle<()>>>,
37}
38
39impl SystemStatusServerInfo {
40    pub fn new(socket_addr: std::net::SocketAddr, handle: Option<JoinHandle<()>>) -> Self {
41        Self {
42            socket_addr,
43            handle: handle.map(Arc::new),
44        }
45    }
46
47    pub fn address(&self) -> String {
48        self.socket_addr.to_string()
49    }
50
51    pub fn hostname(&self) -> String {
52        self.socket_addr.ip().to_string()
53    }
54
55    pub fn port(&self) -> u16 {
56        self.socket_addr.port()
57    }
58}
59
60impl Clone for SystemStatusServerInfo {
61    fn clone(&self) -> Self {
62        Self {
63            socket_addr: self.socket_addr,
64            handle: self.handle.clone(),
65        }
66    }
67}
68
69/// System status server state containing the distributed runtime reference
70pub struct SystemStatusState {
71    // global drt registry is for printing out the entire Prometheus format output
72    root_drt: Arc<crate::DistributedRuntime>,
73    // Discovery metadata (only for Kubernetes backend)
74    discovery_metadata: Option<Arc<tokio::sync::RwLock<crate::discovery::DiscoveryMetadata>>>,
75}
76
77impl SystemStatusState {
78    /// Create new system status server state with the provided distributed runtime
79    pub fn new(
80        drt: Arc<crate::DistributedRuntime>,
81        discovery_metadata: Option<Arc<tokio::sync::RwLock<crate::discovery::DiscoveryMetadata>>>,
82    ) -> anyhow::Result<Self> {
83        Ok(Self {
84            root_drt: drt,
85            discovery_metadata,
86        })
87    }
88
89    /// Get a reference to the distributed runtime
90    pub fn drt(&self) -> &crate::DistributedRuntime {
91        &self.root_drt
92    }
93
94    /// Get a reference to the discovery metadata if available
95    pub fn discovery_metadata(
96        &self,
97    ) -> Option<&Arc<tokio::sync::RwLock<crate::discovery::DiscoveryMetadata>>> {
98        self.discovery_metadata.as_ref()
99    }
100}
101
102/// Request body for POST /v1/loras
103#[derive(Debug, Clone, Deserialize, Serialize)]
104pub struct LoadLoraRequest {
105    pub lora_name: String,
106    pub source: LoraSource,
107}
108
109/// Source information for loading a LoRA
110#[derive(Debug, Clone, Deserialize, Serialize)]
111pub struct LoraSource {
112    pub uri: String,
113}
114
115/// Response body for LoRA operations
116#[derive(Debug, Clone, Deserialize, Serialize)]
117pub struct LoraResponse {
118    pub status: String,
119    #[serde(skip_serializing_if = "Option::is_none")]
120    pub message: Option<String>,
121    #[serde(skip_serializing_if = "Option::is_none")]
122    pub lora_name: Option<String>,
123    #[serde(skip_serializing_if = "Option::is_none")]
124    pub lora_id: Option<u64>,
125    #[serde(skip_serializing_if = "Option::is_none")]
126    pub loras: Option<serde_json::Value>,
127    #[serde(skip_serializing_if = "Option::is_none")]
128    pub count: Option<usize>,
129}
130
131/// Start system status server with metrics support
132pub async fn spawn_system_status_server(
133    host: &str,
134    port: u16,
135    cancel_token: CancellationToken,
136    drt: Arc<crate::DistributedRuntime>,
137    discovery_metadata: Option<Arc<tokio::sync::RwLock<crate::discovery::DiscoveryMetadata>>>,
138) -> anyhow::Result<(std::net::SocketAddr, tokio::task::JoinHandle<()>)> {
139    // Create system status server state with the provided distributed runtime
140    let server_state = Arc::new(SystemStatusState::new(drt, discovery_metadata)?);
141    let health_path = server_state
142        .drt()
143        .system_health()
144        .lock()
145        .health_path()
146        .to_string();
147    let live_path = server_state
148        .drt()
149        .system_health()
150        .lock()
151        .live_path()
152        .to_string();
153
154    // Check if LoRA feature is enabled
155    let lora_enabled = std::env::var(crate::config::environment_names::llm::DYN_LORA_ENABLED)
156        .map(|v| v.to_lowercase() == "true")
157        .unwrap_or(false);
158
159    let mut app = Router::new()
160        .route(
161            &health_path,
162            get({
163                let state = Arc::clone(&server_state);
164                move || health_handler(state)
165            }),
166        )
167        .route(
168            &live_path,
169            get({
170                let state = Arc::clone(&server_state);
171                move || health_handler(state)
172            }),
173        )
174        .route(
175            "/metrics",
176            get({
177                let state = Arc::clone(&server_state);
178                move || metrics_handler(state)
179            }),
180        )
181        .route(
182            "/metadata",
183            get({
184                let state = Arc::clone(&server_state);
185                move || metadata_handler(state)
186            }),
187        )
188        .route(
189            "/engine/{*path}",
190            any({
191                let state = Arc::clone(&server_state);
192                move |path, body| engine_route_handler(state, path, body)
193            }),
194        );
195
196    // Add LoRA routes only if DYN_LORA_ENABLED is set to true
197    if lora_enabled {
198        app = app
199            .route(
200                "/v1/loras",
201                get({
202                    let state = Arc::clone(&server_state);
203                    move || list_loras_handler(State(state))
204                })
205                .post({
206                    let state = Arc::clone(&server_state);
207                    move |body| load_lora_handler(State(state), body)
208                }),
209            )
210            .route(
211                "/v1/loras/{*lora_name}",
212                delete({
213                    let state = Arc::clone(&server_state);
214                    move |path| unload_lora_handler(State(state), path)
215                }),
216            );
217    }
218
219    // Self-hosted MDC files. Always mounted; empty registry → 404.
220    // Suffix segment scopes per-registration (LoRA slug, or `_base`)
221    // so detaching one registration doesn't wipe another's entries.
222    app = app.route(
223        "/v1/metadata/{model_slug}/{model_suffix}/{*filename}",
224        get({
225            let state = Arc::clone(&server_state);
226            move |path| metadata_file_handler(State(state), path)
227        }),
228    );
229
230    let app = app
231        .fallback(|| async {
232            tracing::info!("[fallback handler] called");
233            (StatusCode::NOT_FOUND, "Route not found").into_response()
234        })
235        .layer(TraceLayer::new_for_http().make_span_with(make_system_request_span));
236
237    let address = format!("{}:{}", host, port);
238    tracing::info!("[spawn_system_status_server] binding to: {address}");
239
240    let listener = match TcpListener::bind(&address).await {
241        Ok(listener) => {
242            // get the actual address and port, print in debug level
243            let actual_address = listener.local_addr()?;
244            tracing::info!(
245                "[spawn_system_status_server] system status server bound to: {}",
246                actual_address
247            );
248            (listener, actual_address)
249        }
250        Err(e) => {
251            tracing::error!("Failed to bind to address {}: {}", address, e);
252            return Err(anyhow::anyhow!("Failed to bind to address: {}", e));
253        }
254    };
255    let (listener, actual_address) = listener;
256
257    let observer = cancel_token.child_token();
258    // Spawn the server in the background and return the handle
259    let handle = tokio::spawn(async move {
260        if let Err(e) = axum::serve(listener, app)
261            .with_graceful_shutdown(observer.cancelled_owned())
262            .await
263        {
264            tracing::error!("System status server error: {e}");
265        }
266    });
267
268    Ok((actual_address, handle))
269}
270
271/// Health handler with optional active health checking
272#[tracing::instrument(skip_all, level = "trace")]
273async fn health_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
274    // Get basic health status
275    let system_health = state.drt().system_health();
276    let system_health_lock = system_health.lock();
277    let (healthy, endpoints) = system_health_lock.get_health_status();
278    let uptime = Some(system_health_lock.uptime());
279    drop(system_health_lock);
280
281    let healthy_string = if healthy { "ready" } else { "notready" };
282    let status_code = if healthy {
283        StatusCode::OK
284    } else {
285        StatusCode::SERVICE_UNAVAILABLE
286    };
287
288    let response = json!({
289        "status": healthy_string,
290        "uptime": uptime,
291        "endpoints": endpoints,
292    });
293
294    tracing::trace!("Response {}", response.to_string());
295
296    (status_code, response.to_string())
297}
298
299/// Metrics handler with DistributedRuntime uptime
300#[tracing::instrument(skip_all, level = "trace")]
301async fn metrics_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
302    // Get all metrics from the DistributedRuntime.
303    // The uptime gauge is updated automatically via a PrometheusUpdateCallback
304    // registered in DistributedRuntime::new(), so it is always fresh before scrape.
305    //
306    // NOTE: We use a multi-registry model (e.g. one registry per endpoint) and merge at scrape time,
307    // so /metrics traverses registered child registries and produces a single combined output.
308    let response = match state.drt().metrics().prometheus_expfmt() {
309        Ok(r) => r,
310        Err(e) => {
311            tracing::error!("Failed to get metrics from registry: {e}");
312            return (
313                StatusCode::INTERNAL_SERVER_ERROR,
314                "Failed to get metrics".to_string(),
315            );
316        }
317    };
318
319    (StatusCode::OK, response)
320}
321
322/// Metadata handler
323#[tracing::instrument(skip_all, level = "trace")]
324async fn metadata_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
325    // Check if discovery metadata is available
326    let metadata = match state.discovery_metadata() {
327        Some(metadata) => metadata,
328        None => {
329            tracing::debug!("Metadata endpoint called but no discovery metadata available");
330            return (
331                StatusCode::NOT_FOUND,
332                "Discovery metadata not available".to_string(),
333            )
334                .into_response();
335        }
336    };
337
338    // Read the metadata
339    let metadata_guard = metadata.read().await;
340
341    // Serialize to JSON
342    match serde_json::to_string(&*metadata_guard) {
343        Ok(json) => {
344            tracing::trace!("Returning metadata: {} bytes", json.len());
345            (StatusCode::OK, json).into_response()
346        }
347        Err(e) => {
348            tracing::error!("Failed to serialize metadata: {e}");
349            (
350                StatusCode::INTERNAL_SERVER_ERROR,
351                "Failed to serialize metadata".to_string(),
352            )
353                .into_response()
354        }
355    }
356}
357
358/// Handler for POST /v1/loras - Load a LoRA adapter
359#[tracing::instrument(skip_all, level = "debug")]
360async fn load_lora_handler(
361    State(state): State<Arc<SystemStatusState>>,
362    Json(request): Json<LoadLoraRequest>,
363) -> impl IntoResponse {
364    tracing::info!("Loading LoRA: {}", request.lora_name);
365
366    // Call the load_lora endpoint for each available backend
367    match call_lora_endpoint(
368        state.drt(),
369        "load_lora",
370        json!({
371            "lora_name": request.lora_name,
372            "source": {
373                "uri": request.source.uri
374            },
375        }),
376    )
377    .await
378    {
379        Ok(response) => {
380            if response.status == "error" {
381                tracing::error!(
382                    "Failed to load LoRA {}: {}",
383                    request.lora_name,
384                    response.message.as_deref().unwrap_or("Unknown error")
385                );
386                (StatusCode::INTERNAL_SERVER_ERROR, Json(response))
387            } else {
388                tracing::info!("LoRA loaded successfully: {}", request.lora_name);
389                (StatusCode::OK, Json(response))
390            }
391        }
392        Err(e) => {
393            tracing::error!("Failed to load LoRA {}: {}", request.lora_name, e);
394            (
395                StatusCode::INTERNAL_SERVER_ERROR,
396                Json(LoraResponse {
397                    status: "error".to_string(),
398                    message: Some(e.to_string()),
399                    lora_name: Some(request.lora_name),
400                    lora_id: None,
401                    loras: None,
402                    count: None,
403                }),
404            )
405        }
406    }
407}
408
409/// Handler for DELETE /v1/loras/*lora_name - Unload a LoRA adapter
410#[tracing::instrument(skip_all, level = "debug")]
411async fn unload_lora_handler(
412    State(state): State<Arc<SystemStatusState>>,
413    Path(lora_name): Path<String>,
414) -> impl IntoResponse {
415    // Strip the leading slash from the wildcard capture
416    let lora_name = lora_name
417        .strip_prefix('/')
418        .unwrap_or(&lora_name)
419        .to_string();
420    tracing::info!("Unloading LoRA: {lora_name}");
421
422    // Call the unload_lora endpoint for each available backend
423    match call_lora_endpoint(
424        state.drt(),
425        "unload_lora",
426        json!({
427            "lora_name": lora_name.clone(),
428        }),
429    )
430    .await
431    {
432        Ok(response) => {
433            if response.status == "error" {
434                tracing::error!(
435                    "Failed to unload LoRA {}: {}",
436                    lora_name,
437                    response.message.as_deref().unwrap_or("Unknown error")
438                );
439                (StatusCode::INTERNAL_SERVER_ERROR, Json(response))
440            } else {
441                tracing::info!("LoRA unloaded successfully: {lora_name}");
442                (StatusCode::OK, Json(response))
443            }
444        }
445        Err(e) => {
446            tracing::error!("Failed to unload LoRA {}: {}", lora_name, e);
447            (
448                StatusCode::INTERNAL_SERVER_ERROR,
449                Json(LoraResponse {
450                    status: "error".to_string(),
451                    message: Some(e.to_string()),
452                    lora_name: Some(lora_name),
453                    lora_id: None,
454                    loras: None,
455                    count: None,
456                }),
457            )
458        }
459    }
460}
461
462/// Handler for GET /v1/loras - List all LoRA adapters
463#[tracing::instrument(skip_all, level = "debug")]
464async fn list_loras_handler(State(state): State<Arc<SystemStatusState>>) -> impl IntoResponse {
465    tracing::info!("Listing all LoRAs");
466
467    // Call the list_loras endpoint for each available backend
468    match call_lora_endpoint(state.drt(), "list_loras", json!({})).await {
469        Ok(response) => {
470            tracing::info!("Successfully retrieved LoRA list");
471            (StatusCode::OK, Json(response))
472        }
473        Err(e) => {
474            tracing::error!("Failed to list LoRAs: {e}");
475            (
476                StatusCode::INTERNAL_SERVER_ERROR,
477                Json(LoraResponse {
478                    status: "error".to_string(),
479                    message: Some(e.to_string()),
480                    lora_name: None,
481                    lora_id: None,
482                    loras: None,
483                    count: None,
484                }),
485            )
486        }
487    }
488}
489
490/// `GET /v1/metadata/{slug}/{suffix}/{filename}` — 404 on miss,
491/// 500 on read error, raw bytes on hit. Consumer blake3-verifies.
492async fn metadata_file_handler(
493    State(state): State<Arc<SystemStatusState>>,
494    Path((model_slug, model_suffix, filename)): Path<(String, String, String)>,
495) -> impl IntoResponse {
496    let path = match state
497        .drt()
498        .metadata_artifacts()
499        .get(&model_slug, &model_suffix, &filename)
500    {
501        Some(p) => p,
502        None => {
503            tracing::debug!(
504                model_slug,
505                model_suffix,
506                filename,
507                "metadata artifact not registered for self-host"
508            );
509            return (StatusCode::NOT_FOUND, "Not found").into_response();
510        }
511    };
512
513    match tokio::fs::read(&path).await {
514        Ok(bytes) => (StatusCode::OK, bytes).into_response(),
515        Err(err) => {
516            tracing::error!(
517                model_slug,
518                model_suffix,
519                filename,
520                path = %path.display(),
521                %err,
522                "failed to read self-hosted metadata file"
523            );
524            (StatusCode::INTERNAL_SERVER_ERROR, "Failed to read file").into_response()
525        }
526    }
527}
528
529/// Helper function to call a LoRA management endpoint locally via in-process registry
530///
531/// This function ONLY uses the local endpoint registry for direct in-process calls.
532/// It does NOT fall back to network discovery if the endpoint is not found.
533async fn call_lora_endpoint(
534    drt: &crate::DistributedRuntime,
535    endpoint_name: &str,
536    request_body: serde_json::Value,
537) -> anyhow::Result<LoraResponse> {
538    use crate::engine::AsyncEngine;
539
540    tracing::debug!("Calling local endpoint: '{endpoint_name}'");
541
542    // Get the endpoint from the local registry (in-process call only)
543    let local_registry = drt.local_endpoint_registry();
544    let engine = local_registry
545        .get(endpoint_name)
546        .ok_or_else(|| {
547            anyhow::anyhow!(
548                "Endpoint '{}' not found in local registry. Make sure it's registered with .register_local_engine()",
549                endpoint_name
550            )
551        })?;
552
553    tracing::debug!(
554        "Found endpoint '{}' in local registry, calling directly",
555        endpoint_name
556    );
557
558    // Call the engine directly without going through the network stack
559    let request = crate::pipeline::SingleIn::new(request_body);
560    let mut stream = engine.generate(request).await?;
561
562    // Get the first response
563    if let Some(response) = stream.next().await {
564        let response_data = response.data.unwrap_or_default();
565
566        // Try structured deserialization first, fall back to manual field extraction
567        let lora_response = serde_json::from_value::<LoraResponse>(response_data.clone())
568            .unwrap_or_else(|_| parse_lora_response(&response_data));
569
570        return Ok(lora_response);
571    }
572
573    anyhow::bail!("No response received from endpoint '{}'", endpoint_name)
574}
575
576/// Helper to parse response data into LoraResponse
577fn parse_lora_response(response_data: &serde_json::Value) -> LoraResponse {
578    LoraResponse {
579        status: response_data
580            .get("status")
581            .and_then(|s| s.as_str())
582            .unwrap_or("success")
583            .to_string(),
584        message: response_data
585            .get("message")
586            .and_then(|m| m.as_str())
587            .map(|s| s.to_string()),
588        lora_name: response_data
589            .get("lora_name")
590            .and_then(|n| n.as_str())
591            .map(|s| s.to_string()),
592        lora_id: response_data.get("lora_id").and_then(|id| id.as_u64()),
593        loras: response_data.get("loras").cloned(),
594        count: response_data
595            .get("count")
596            .and_then(|c| c.as_u64())
597            .map(|c| c as usize),
598    }
599}
600
601/// Engine route handler for /engine/* routes
602///
603/// This handler looks up registered callbacks in the engine routes registry
604/// and invokes them with the request body, returning the response as JSON.
605#[tracing::instrument(skip_all, level = "trace", fields(path = %path))]
606async fn engine_route_handler(
607    state: Arc<SystemStatusState>,
608    Path(path): Path<String>,
609    body: Bytes,
610) -> impl IntoResponse {
611    tracing::trace!("Engine route request to /engine/{path}");
612
613    // Parse body as JSON (empty object for GET/empty body)
614    let body_json: serde_json::Value = if body.is_empty() {
615        serde_json::json!({})
616    } else {
617        match serde_json::from_slice(&body) {
618            Ok(json) => json,
619            Err(e) => {
620                tracing::warn!("Invalid JSON in request body: {e}");
621                return (
622                    StatusCode::BAD_REQUEST,
623                    json!({
624                        "error": "Invalid JSON",
625                        "message": format!("{}", e)
626                    })
627                    .to_string(),
628                )
629                    .into_response();
630            }
631        }
632    };
633
634    // Look up callback
635    let callback = match state.drt().engine_routes().get(&path) {
636        Some(cb) => cb,
637        None => {
638            tracing::debug!("Route /engine/{path} not found");
639            return (
640                StatusCode::NOT_FOUND,
641                json!({
642                    "error": "Route not found",
643                    "message": format!("Route /engine/{} not found", path)
644                })
645                .to_string(),
646            )
647                .into_response();
648        }
649    };
650
651    // Call callback (it's async, so await it)
652    match callback(body_json).await {
653        Ok(response) => {
654            tracing::trace!("Engine route handler succeeded for /engine/{path}");
655            (StatusCode::OK, response.to_string()).into_response()
656        }
657        Err(e) => {
658            tracing::error!("Engine route handler error for /engine/{}: {}", path, e);
659            (
660                StatusCode::INTERNAL_SERVER_ERROR,
661                json!({
662                    "error": "Handler error",
663                    "message": format!("{}", e)
664                })
665                .to_string(),
666            )
667                .into_response()
668        }
669    }
670}
671
// Regular tests: cargo test system_status_server --lib
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::io::{AsyncReadExt, AsyncWriteExt};
    use tokio::time::Duration;

    /// Smoke-test the axum serve + graceful-shutdown wiring used by
    /// `spawn_system_status_server`, without a `DistributedRuntime`.
    ///
    /// This is a basic check that the HTTP server works before the more complicated tests run.
    #[tokio::test]
    async fn test_http_server_lifecycle() {
        let cancel_token = CancellationToken::new();

        let app = Router::new().route("/test", get(|| async { (StatusCode::OK, "test") }));

        // Bind before spawning so the port is known and already listening when we connect.
        let listener = TcpListener::bind("127.0.0.1:0")
            .await
            .expect("bind an ephemeral port");
        let addr = listener.local_addr().expect("listener has a local address");

        let server_handle = tokio::spawn({
            let shutdown = cancel_token.clone();
            async move {
                axum::serve(listener, app)
                    .with_graceful_shutdown(shutdown.cancelled_owned())
                    .await
                    .expect("server should run until shutdown");
            }
        });

        // Prove the server actually serves a request before shutting it down.
        let mut stream = tokio::net::TcpStream::connect(addr)
            .await
            .expect("connect to test server");
        stream
            .write_all(b"GET /test HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n")
            .await
            .expect("send request");
        let mut raw = Vec::new();
        stream
            .read_to_end(&mut raw)
            .await
            .expect("read response");
        let text = String::from_utf8_lossy(&raw);
        assert!(text.starts_with("HTTP/1.1 200"), "unexpected response: {text}");
        assert!(text.ends_with("test"), "unexpected body: {text}");

        cancel_token.cancel();

        // Require both: no timeout, and the server task finished without panicking.
        let result = tokio::time::timeout(Duration::from_secs(5), server_handle).await;
        assert!(
            matches!(result, Ok(Ok(()))),
            "HTTP server should shut down cleanly when cancel token is cancelled"
        );
    }
}
708
709// Integration tests: cargo test system_status_server --lib --features integration
710#[cfg(all(test, feature = "integration"))]
711mod integration_tests {
712    use super::*;
713    use crate::config::environment_names::logging as env_logging;
714    use crate::config::environment_names::runtime::canary as env_canary;
715    use crate::distributed::distributed_test_utils::create_test_drt_async;
716    use crate::metrics::MetricsHierarchy;
717    use anyhow::Result;
718    use rstest::rstest;
719    use std::sync::Arc;
720    use tokio::time::Duration;
721
722    #[tokio::test]
723    async fn test_uptime_from_system_health() {
724        // Test that uptime is available from SystemHealth
725        temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, None::<&str>)], async {
726            let drt = create_test_drt_async().await;
727
728            // Get uptime from SystemHealth
729            let uptime = drt.system_health().lock().uptime();
730            // Uptime should exist (even if close to zero)
731            assert!(uptime.as_nanos() > 0 || uptime.is_zero());
732
733            // Sleep briefly and check uptime increases
734            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
735            let uptime_after = drt.system_health().lock().uptime();
736            assert!(uptime_after > uptime);
737        })
738        .await;
739    }
740
741    #[tokio::test]
742    async fn test_runtime_metrics_initialization_and_namespace() {
743        // Test that metrics have correct namespace
744        temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, None::<&str>)], async {
745            let drt = create_test_drt_async().await;
746            // SystemStatusState is already created in distributed.rs
747            // so we don't need to create it again here
748
749            // The uptime_seconds metric should already be registered and available
750            let response = drt.metrics().prometheus_expfmt().unwrap();
751            println!("Full metrics response:\n{}", response);
752
753            // Check that uptime_seconds metric is present with correct namespace
754            assert!(
755                response.contains("# HELP dynamo_component_uptime_seconds"),
756                "Should contain uptime_seconds help text"
757            );
758            assert!(
759                response.contains("# TYPE dynamo_component_uptime_seconds gauge"),
760                "Should contain uptime_seconds type"
761            );
762            assert!(
763                response.contains("dynamo_component_uptime_seconds"),
764                "Should contain uptime_seconds metric with correct namespace"
765            );
766        })
767        .await;
768    }
769
770    #[tokio::test]
771    async fn test_uptime_gauge_updates() {
772        // Test that the uptime gauge is properly updated and increases over time
773        temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, None::<&str>)], async {
774            let drt = create_test_drt_async().await;
775
776            // Get initial uptime
777            let initial_uptime = drt.system_health().lock().uptime();
778
779            // Update the gauge with initial value
780            drt.system_health().lock().update_uptime_gauge();
781
782            // Sleep for 100ms
783            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
784
785            // Get uptime after sleep
786            let uptime_after_sleep = drt.system_health().lock().uptime();
787
788            // Update the gauge again
789            drt.system_health().lock().update_uptime_gauge();
790
791            // Verify uptime increased by at least 100ms
792            let elapsed = uptime_after_sleep - initial_uptime;
793            assert!(
794                elapsed >= std::time::Duration::from_millis(100),
795                "Uptime should have increased by at least 100ms after sleep, but only increased by {:?}",
796                elapsed
797            );
798        })
799        .await;
800    }
801
802    #[tokio::test]
803    async fn test_http_requests_fail_when_system_disabled() {
804        // Test that system status server is not running when disabled
805        temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, None::<&str>)], async {
806            let drt = create_test_drt_async().await;
807
808            // Verify that system status server info is None when disabled
809            let system_info = drt.system_status_server_info();
810            assert!(
811                system_info.is_none(),
812                "System status server should not be running when disabled"
813            );
814
815            println!("✓ System status server correctly disabled when not enabled");
816        })
817        .await;
818    }
819
820    /// This test verifies the health and liveness endpoints of the system status server.
821    /// It checks that the endpoints respond with the correct HTTP status codes and bodies
822    /// based on the initial health status and any custom endpoint paths provided via environment variables.
823    /// The test is parameterized using multiple #[case] attributes to cover various scenarios,
824    /// including different initial health states ("ready" and "notready"), default and custom endpoint paths,
825    /// and expected response codes and bodies.
826    #[rstest]
827    #[case("ready", 200, "ready", None, None, 3)]
828    #[case("notready", 503, "notready", None, None, 3)]
829    #[case("ready", 200, "ready", Some("/custom/health"), Some("/custom/live"), 5)]
830    #[case(
831        "notready",
832        503,
833        "notready",
834        Some("/custom/health"),
835        Some("/custom/live"),
836        5
837    )]
838    #[tokio::test]
839    #[cfg(feature = "integration")]
840    async fn test_health_endpoints(
841        #[case] starting_health_status: &'static str,
842        #[case] expected_status: u16,
843        #[case] expected_body: &'static str,
844        #[case] custom_health_path: Option<&'static str>,
845        #[case] custom_live_path: Option<&'static str>,
846        #[case] expected_num_tests: usize,
847    ) {
848        use std::sync::Arc;
849        // use tokio::io::{AsyncReadExt, AsyncWriteExt};
850        // use reqwest for HTTP requests
851
852        // Closure call is needed here to satisfy async_with_vars
853
854        crate::logging::init();
855
856        #[allow(clippy::redundant_closure_call)]
857        temp_env::async_with_vars(
858            [
859                (env_system::DYN_SYSTEM_PORT, Some("0")),
860                (
861                    env_system::DYN_SYSTEM_STARTING_HEALTH_STATUS,
862                    Some(starting_health_status),
863                ),
864                (env_system::DYN_SYSTEM_HEALTH_PATH, custom_health_path),
865                (env_system::DYN_SYSTEM_LIVE_PATH, custom_live_path),
866            ],
867            (async || {
868                let drt = Arc::new(create_test_drt_async().await);
869
870                // Get system status server info from DRT (instead of manually spawning)
871                let system_info = drt
872                    .system_status_server_info()
873                    .expect("System status server should be started by DRT");
874                let addr = system_info.socket_addr;
875
876                let client = reqwest::Client::new();
877
878                // Prepare test cases
879                let mut test_cases = vec![];
880                match custom_health_path {
881                    None => {
882                        // When using default paths, test the default paths
883                        test_cases.push(("/health", expected_status, expected_body));
884                    }
885                    Some(chp) => {
886                        // When using custom paths, default paths should not exist
887                        test_cases.push(("/health", 404, "Route not found"));
888                        test_cases.push((chp, expected_status, expected_body));
889                    }
890                }
891                match custom_live_path {
892                    None => {
893                        // When using default paths, test the default paths
894                        test_cases.push(("/live", expected_status, expected_body));
895                    }
896                    Some(clp) => {
897                        // When using custom paths, default paths should not exist
898                        test_cases.push(("/live", 404, "Route not found"));
899                        test_cases.push((clp, expected_status, expected_body));
900                    }
901                }
902                test_cases.push(("/someRandomPathNotFoundHere", 404, "Route not found"));
903                assert_eq!(test_cases.len(), expected_num_tests);
904
905                for (path, expect_status, expect_body) in test_cases {
906                    println!("[test] Sending request to {}", path);
907                    let url = format!("http://{}{}", addr, path);
908                    let response = client.get(&url).send().await.unwrap();
909                    let status = response.status();
910                    let body = response.text().await.unwrap();
911                    println!(
912                        "[test] Response for {}: status={}, body={:?}",
913                        path, status, body
914                    );
915                    assert_eq!(
916                        status, expect_status,
917                        "Response: status={}, body={:?}",
918                        status, body
919                    );
920                    assert!(
921                        body.contains(expect_body),
922                        "Response: status={}, body={:?}",
923                        status,
924                        body
925                    );
926                }
927            })(),
928        )
929        .await;
930    }
931
932    #[tokio::test]
933    async fn test_health_endpoint_tracing() -> Result<()> {
934        use std::sync::Arc;
935
936        // Closure call is needed here to satisfy async_with_vars
937
938        #[allow(clippy::redundant_closure_call)]
939        let _ = temp_env::async_with_vars(
940            [
941                (env_system::DYN_SYSTEM_PORT, Some("0")),
942                (env_system::DYN_SYSTEM_STARTING_HEALTH_STATUS, Some("ready")),
943                (env_logging::DYN_LOGGING_JSONL, Some("1")),
944                (env_logging::DYN_LOG, Some("trace")),
945            ],
946            (async || {
947                // TODO Add proper testing for
948                // trace id and parent id
949
950                crate::logging::init();
951
952                let drt = Arc::new(create_test_drt_async().await);
953
954                // Get system status server info from DRT (instead of manually spawning)
955                let system_info = drt
956                    .system_status_server_info()
957                    .expect("System status server should be started by DRT");
958                let addr = system_info.socket_addr;
959                let client = reqwest::Client::new();
960                for path in [("/health"), ("/live"), ("/someRandomPathNotFoundHere")] {
961                    let traceparent_value =
962                        "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
963                    let tracestate_value = "vendor1=opaqueValue1,vendor2=opaqueValue2";
964                    let mut headers = reqwest::header::HeaderMap::new();
965                    headers.insert(
966                        reqwest::header::HeaderName::from_static("traceparent"),
967                        reqwest::header::HeaderValue::from_str(traceparent_value)?,
968                    );
969                    headers.insert(
970                        reqwest::header::HeaderName::from_static("tracestate"),
971                        reqwest::header::HeaderValue::from_str(tracestate_value)?,
972                    );
973                    let url = format!("http://{}{}", addr, path);
974                    let response = client.get(&url).headers(headers).send().await.unwrap();
975                    let status = response.status();
976                    let body = response.text().await.unwrap();
977                    tracing::info!(body = body, status = status.to_string());
978                }
979
980                Ok::<(), anyhow::Error>(())
981            })(),
982        )
983        .await;
984        Ok(())
985    }
986
987    #[tokio::test]
988    async fn test_health_endpoint_with_changing_health_status() {
989        // Test health endpoint starts in not ready status, then becomes ready
990        // when endpoints are created (DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS=generate)
991        const ENDPOINT_NAME: &str = "generate";
992        const ENDPOINT_HEALTH_CONFIG: &str = "[\"generate\"]";
993        temp_env::async_with_vars(
994            [
995                (env_system::DYN_SYSTEM_PORT, Some("0")),
996                (env_system::DYN_SYSTEM_STARTING_HEALTH_STATUS, Some("notready")),
997                (env_system::DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS, Some(ENDPOINT_HEALTH_CONFIG)),
998            ],
999            async {
1000                let drt = Arc::new(create_test_drt_async().await);
1001
1002                // Check if system status server was started
1003                let system_info_opt = drt.system_status_server_info();
1004
1005                // Ensure system status server was spawned by DRT
1006                assert!(
1007                    system_info_opt.is_some(),
1008                    "System status server was not spawned by DRT. Expected DRT to spawn server when DYN_SYSTEM_PORT is set to a positive value, but system_status_server_info() returned None. Environment: DYN_SYSTEM_PORT={:?}",
1009                    std::env::var(env_system::DYN_SYSTEM_PORT)
1010                );
1011
1012                // Get the system status server info from DRT - this should never fail now due to above check
1013                let system_info = system_info_opt.unwrap();
1014                let addr = system_info.socket_addr;
1015
1016                // Initially check health - should be not ready
1017                let client = reqwest::Client::new();
1018                let health_url = format!("http://{}/health", addr);
1019
1020                let response = client.get(&health_url).send().await.unwrap();
1021                let status = response.status();
1022                let body = response.text().await.unwrap();
1023
1024                // Health should be not ready (503) initially
1025                assert_eq!(status, 503, "Health should be 503 (not ready) initially, got: {}", status);
1026                assert!(body.contains("\"status\":\"notready\""), "Health should contain status notready");
1027
1028                // Now create a namespace, component, and endpoint to make the system healthy
1029                let namespace = drt.namespace("ns1234").unwrap();
1030                let component = namespace.component("comp1234").unwrap();
1031
1032                // Create a simple test handler
1033                use crate::pipeline::{async_trait, network::Ingress, AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, SingleIn};
1034                use crate::protocols::annotated::Annotated;
1035
1036                struct TestHandler;
1037
1038                #[async_trait]
1039                impl AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, anyhow::Error> for TestHandler {
1040                    async fn generate(&self, input: SingleIn<String>) -> anyhow::Result<ManyOut<Annotated<String>>> {
1041                        let (data, ctx) = input.into_parts();
1042                        let response = Annotated::from_data(format!("You responded: {}", data));
1043                        Ok(crate::pipeline::ResponseStream::new(
1044                            Box::pin(crate::stream::iter(vec![response])),
1045                            ctx.context()
1046                        ))
1047                    }
1048                }
1049
1050                // Create the ingress and start the endpoint service
1051                let ingress = Ingress::for_engine(std::sync::Arc::new(TestHandler)).unwrap();
1052
1053                // Start the service and endpoint with a health check payload
1054                // This will automatically register the endpoint for health monitoring
1055                tokio::spawn(async move {
1056                    let _ = component.endpoint(ENDPOINT_NAME)
1057                        .endpoint_builder()
1058                        .handler(ingress)
1059                        .health_check_payload(serde_json::json!({
1060                            "test": "health_check"
1061                        }))
1062                        .start()
1063                        .await;
1064                });
1065
1066                // Hit health endpoint 200 times to verify consistency
1067                let mut success_count = 0;
1068                let mut failures = Vec::new();
1069
1070                for i in 1..=200 {
1071                    let response = client.get(&health_url).send().await.unwrap();
1072                    let status = response.status();
1073                    let body = response.text().await.unwrap();
1074
1075                    if status == 200 && body.contains("\"status\":\"ready\"") {
1076                        success_count += 1;
1077                    } else {
1078                        failures.push((i, status.as_u16(), body.clone()));
1079                        if failures.len() <= 5 {  // Only log first 5 failures
1080                            tracing::warn!("Request {}: status={}, body={}", i, status, body);
1081                        }
1082                    }
1083                }
1084
1085                tracing::info!("Health endpoint test results: {success_count}/200 requests succeeded");
1086                if !failures.is_empty() {
1087                    tracing::warn!("Failed requests: {}", failures.len());
1088                }
1089
1090                // Expect at least 150 out of 200 requests to be successful
1091                assert!(success_count >= 150, "Expected at least 150 out of 200 requests to succeed, but only {} succeeded", success_count);
1092            },
1093        )
1094        .await;
1095    }
1096
1097    #[tokio::test]
1098    async fn test_spawn_system_status_server_endpoints() {
1099        // use reqwest for HTTP requests
1100        temp_env::async_with_vars(
1101            [
1102                (env_system::DYN_SYSTEM_PORT, Some("0")),
1103                (env_system::DYN_SYSTEM_STARTING_HEALTH_STATUS, Some("ready")),
1104            ],
1105            async {
1106                let drt = Arc::new(create_test_drt_async().await);
1107
1108                // Get system status server info from DRT (instead of manually spawning)
1109                let system_info = drt
1110                    .system_status_server_info()
1111                    .expect("System status server should be started by DRT");
1112                let addr = system_info.socket_addr;
1113                let client = reqwest::Client::new();
1114                for (path, expect_200, expect_body) in [
1115                    ("/health", true, "ready"),
1116                    ("/live", true, "ready"),
1117                    ("/someRandomPathNotFoundHere", false, "Route not found"),
1118                ] {
1119                    println!("[test] Sending request to {}", path);
1120                    let url = format!("http://{}{}", addr, path);
1121                    let response = client.get(&url).send().await.unwrap();
1122                    let status = response.status();
1123                    let body = response.text().await.unwrap();
1124                    println!(
1125                        "[test] Response for {}: status={}, body={:?}",
1126                        path, status, body
1127                    );
1128                    if expect_200 {
1129                        assert_eq!(status, 200, "Response: status={}, body={:?}", status, body);
1130                    } else {
1131                        assert_eq!(status, 404, "Response: status={}, body={:?}", status, body);
1132                    }
1133                    assert!(
1134                        body.contains(expect_body),
1135                        "Response: status={}, body={:?}",
1136                        status,
1137                        body
1138                    );
1139                }
1140                // DRT handles server cleanup automatically
1141            },
1142        )
1143        .await;
1144    }
1145
1146    #[cfg(feature = "integration")]
1147    #[tokio::test]
1148    async fn test_health_check_with_payload_and_timeout() {
1149        // Test the complete health check flow with the new canary-based system:
1150        crate::logging::init();
1151
1152        temp_env::async_with_vars(
1153            [
1154                (env_system::DYN_SYSTEM_PORT, Some("0")),
1155                (
1156                    env_system::DYN_SYSTEM_STARTING_HEALTH_STATUS,
1157                    Some("notready"),
1158                ),
1159                (
1160                    env_system::DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS,
1161                    Some("[\"test.endpoint\"]"),
1162                ),
1163                // Enable health check with short intervals for testing
1164                ("DYN_HEALTH_CHECK_ENABLED", Some("true")),
1165                (env_canary::DYN_CANARY_WAIT_TIME, Some("1")), // Send canary after 1 second of inactivity
1166                ("DYN_HEALTH_CHECK_REQUEST_TIMEOUT", Some("1")), // Immediately timeout to mimic unresponsiveness
1167                ("RUST_LOG", Some("info")),                      // Enable logging for test
1168            ],
1169            async {
1170                let drt = Arc::new(create_test_drt_async().await);
1171
1172                // Get system status server info
1173                let system_info = drt
1174                    .system_status_server_info()
1175                    .expect("System status server should be started");
1176                let addr = system_info.socket_addr;
1177
1178                let client = reqwest::Client::new();
1179                let health_url = format!("http://{}/health", addr);
1180
1181                // Register an endpoint with health check payload
1182                let endpoint = "test.endpoint";
1183                let health_check_payload = serde_json::json!({
1184                    "prompt": "health check test",
1185                    "_health_check": true
1186                });
1187
1188                // Register the endpoint and its health check payload
1189                {
1190                    let system_health = drt.system_health();
1191                    let system_health_lock = system_health.lock();
1192                    system_health_lock.register_health_check_target(
1193                        endpoint,
1194                        crate::component::Instance {
1195                            component: "test_component".to_string(),
1196                            endpoint: "health".to_string(),
1197                            namespace: "test_namespace".to_string(),
1198                            instance_id: 1,
1199                            transport: crate::component::TransportType::Nats(endpoint.to_string()),
1200                            device_type: None,
1201                        },
1202                        health_check_payload.clone(),
1203                    );
1204                }
1205
1206                // Check initial health - should be ready (default state)
1207                let response = client.get(&health_url).send().await.unwrap();
1208                let status = response.status();
1209                let body = response.text().await.unwrap();
1210                assert_eq!(status, 503, "Should be unhealthy initially (default state)");
1211                assert!(
1212                    body.contains("\"status\":\"notready\""),
1213                    "Should show notready status initially"
1214                );
1215
1216                // Set endpoint to healthy state
1217                drt.system_health()
1218                    .lock()
1219                    .set_endpoint_health_status(endpoint, HealthStatus::Ready);
1220
1221                // Check health again - should now be healthy
1222                let response = client.get(&health_url).send().await.unwrap();
1223                let status = response.status();
1224                let body = response.text().await.unwrap();
1225
1226                assert_eq!(status, 200, "Should be healthy due to recent response");
1227                assert!(
1228                    body.contains("\"status\":\"ready\""),
1229                    "Should show ready status after response"
1230                );
1231
1232                // Verify the endpoint status in SystemHealth directly
1233                let endpoint_status = drt
1234                    .system_health()
1235                    .lock()
1236                    .get_endpoint_health_status(endpoint);
1237                assert_eq!(
1238                    endpoint_status,
1239                    Some(HealthStatus::Ready),
1240                    "SystemHealth should show endpoint as Ready after response"
1241                );
1242            },
1243        )
1244        .await;
1245    }
1246}