use crate::config::HealthStatus;
use crate::config::environment_names::logging as env_logging;
use crate::config::environment_names::runtime::canary as env_canary;
use crate::config::environment_names::runtime::system as env_system;
use crate::logging::make_request_span;
use crate::metrics::MetricsHierarchy;
use crate::traits::DistributedRuntimeProvider;
use axum::{
Router,
body::Bytes,
extract::{Json, Path, State},
http::StatusCode,
response::IntoResponse,
routing::{any, delete, get, post},
};
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::collections::HashMap;
use std::sync::{Arc, OnceLock};
use std::time::Instant;
use tokio::{net::TcpListener, task::JoinHandle};
use tokio_util::sync::CancellationToken;
use tower_http::trace::TraceLayer;
#[derive(Debug)]
pub struct SystemStatusServerInfo {
pub socket_addr: std::net::SocketAddr,
pub handle: Option<Arc<JoinHandle<()>>>,
}
impl SystemStatusServerInfo {
pub fn new(socket_addr: std::net::SocketAddr, handle: Option<JoinHandle<()>>) -> Self {
Self {
socket_addr,
handle: handle.map(Arc::new),
}
}
pub fn address(&self) -> String {
self.socket_addr.to_string()
}
pub fn hostname(&self) -> String {
self.socket_addr.ip().to_string()
}
pub fn port(&self) -> u16 {
self.socket_addr.port()
}
}
impl Clone for SystemStatusServerInfo {
fn clone(&self) -> Self {
Self {
socket_addr: self.socket_addr,
handle: self.handle.clone(),
}
}
}
pub struct SystemStatusState {
root_drt: Arc<crate::DistributedRuntime>,
discovery_metadata: Option<Arc<tokio::sync::RwLock<crate::discovery::DiscoveryMetadata>>>,
}
impl SystemStatusState {
pub fn new(
drt: Arc<crate::DistributedRuntime>,
discovery_metadata: Option<Arc<tokio::sync::RwLock<crate::discovery::DiscoveryMetadata>>>,
) -> anyhow::Result<Self> {
Ok(Self {
root_drt: drt,
discovery_metadata,
})
}
pub fn drt(&self) -> &crate::DistributedRuntime {
&self.root_drt
}
pub fn discovery_metadata(
&self,
) -> Option<&Arc<tokio::sync::RwLock<crate::discovery::DiscoveryMetadata>>> {
self.discovery_metadata.as_ref()
}
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct LoadLoraRequest {
pub lora_name: String,
pub source: LoraSource,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct LoraSource {
pub uri: String,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct LoraResponse {
pub status: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub message: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub lora_name: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub lora_id: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub loras: Option<serde_json::Value>,
#[serde(skip_serializing_if = "Option::is_none")]
pub count: Option<usize>,
}
pub async fn spawn_system_status_server(
host: &str,
port: u16,
cancel_token: CancellationToken,
drt: Arc<crate::DistributedRuntime>,
discovery_metadata: Option<Arc<tokio::sync::RwLock<crate::discovery::DiscoveryMetadata>>>,
) -> anyhow::Result<(std::net::SocketAddr, tokio::task::JoinHandle<()>)> {
let server_state = Arc::new(SystemStatusState::new(drt, discovery_metadata)?);
let health_path = server_state
.drt()
.system_health()
.lock()
.health_path()
.to_string();
let live_path = server_state
.drt()
.system_health()
.lock()
.live_path()
.to_string();
let lora_enabled = std::env::var(crate::config::environment_names::llm::DYN_LORA_ENABLED)
.map(|v| v.to_lowercase() == "true")
.unwrap_or(false);
let mut app = Router::new()
.route(
&health_path,
get({
let state = Arc::clone(&server_state);
move || health_handler(state)
}),
)
.route(
&live_path,
get({
let state = Arc::clone(&server_state);
move || health_handler(state)
}),
)
.route(
"/metrics",
get({
let state = Arc::clone(&server_state);
move || metrics_handler(state)
}),
)
.route(
"/metadata",
get({
let state = Arc::clone(&server_state);
move || metadata_handler(state)
}),
)
.route(
"/engine/{*path}",
any({
let state = Arc::clone(&server_state);
move |path, body| engine_route_handler(state, path, body)
}),
);
if lora_enabled {
app = app
.route(
"/v1/loras",
get({
let state = Arc::clone(&server_state);
move || list_loras_handler(State(state))
})
.post({
let state = Arc::clone(&server_state);
move |body| load_lora_handler(State(state), body)
}),
)
.route(
"/v1/loras/{*lora_name}",
delete({
let state = Arc::clone(&server_state);
move |path| unload_lora_handler(State(state), path)
}),
);
}
let app = app
.fallback(|| async {
tracing::info!("[fallback handler] called");
(StatusCode::NOT_FOUND, "Route not found").into_response()
})
.layer(TraceLayer::new_for_http().make_span_with(make_request_span));
let address = format!("{}:{}", host, port);
tracing::info!("[spawn_system_status_server] binding to: {}", address);
let listener = match TcpListener::bind(&address).await {
Ok(listener) => {
let actual_address = listener.local_addr()?;
tracing::info!(
"[spawn_system_status_server] system status server bound to: {}",
actual_address
);
(listener, actual_address)
}
Err(e) => {
tracing::error!("Failed to bind to address {}: {}", address, e);
return Err(anyhow::anyhow!("Failed to bind to address: {}", e));
}
};
let (listener, actual_address) = listener;
let observer = cancel_token.child_token();
let handle = tokio::spawn(async move {
if let Err(e) = axum::serve(listener, app)
.with_graceful_shutdown(observer.cancelled_owned())
.await
{
tracing::error!("System status server error: {}", e);
}
});
Ok((actual_address, handle))
}
#[tracing::instrument(skip_all, level = "trace")]
async fn health_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
let system_health = state.drt().system_health();
let system_health_lock = system_health.lock();
let (healthy, endpoints) = system_health_lock.get_health_status();
let uptime = Some(system_health_lock.uptime());
drop(system_health_lock);
let healthy_string = if healthy { "ready" } else { "notready" };
let status_code = if healthy {
StatusCode::OK
} else {
StatusCode::SERVICE_UNAVAILABLE
};
let response = json!({
"status": healthy_string,
"uptime": uptime,
"endpoints": endpoints,
});
tracing::trace!("Response {}", response.to_string());
(status_code, response.to_string())
}
#[tracing::instrument(skip_all, level = "trace")]
async fn metrics_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
let response = match state.drt().metrics().prometheus_expfmt() {
Ok(r) => r,
Err(e) => {
tracing::error!("Failed to get metrics from registry: {}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
"Failed to get metrics".to_string(),
);
}
};
(StatusCode::OK, response)
}
#[tracing::instrument(skip_all, level = "trace")]
async fn metadata_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
let metadata = match state.discovery_metadata() {
Some(metadata) => metadata,
None => {
tracing::debug!("Metadata endpoint called but no discovery metadata available");
return (
StatusCode::NOT_FOUND,
"Discovery metadata not available".to_string(),
)
.into_response();
}
};
let metadata_guard = metadata.read().await;
match serde_json::to_string(&*metadata_guard) {
Ok(json) => {
tracing::trace!("Returning metadata: {} bytes", json.len());
(StatusCode::OK, json).into_response()
}
Err(e) => {
tracing::error!("Failed to serialize metadata: {}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
"Failed to serialize metadata".to_string(),
)
.into_response()
}
}
}
#[tracing::instrument(skip_all, level = "debug")]
async fn load_lora_handler(
State(state): State<Arc<SystemStatusState>>,
Json(request): Json<LoadLoraRequest>,
) -> impl IntoResponse {
tracing::info!("Loading LoRA: {}", request.lora_name);
match call_lora_endpoint(
state.drt(),
"load_lora",
json!({
"lora_name": request.lora_name,
"source": {
"uri": request.source.uri
},
}),
)
.await
{
Ok(response) => {
if response.status == "error" {
tracing::error!(
"Failed to load LoRA {}: {}",
request.lora_name,
response.message.as_deref().unwrap_or("Unknown error")
);
(StatusCode::INTERNAL_SERVER_ERROR, Json(response))
} else {
tracing::info!("LoRA loaded successfully: {}", request.lora_name);
(StatusCode::OK, Json(response))
}
}
Err(e) => {
tracing::error!("Failed to load LoRA {}: {}", request.lora_name, e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(LoraResponse {
status: "error".to_string(),
message: Some(e.to_string()),
lora_name: Some(request.lora_name),
lora_id: None,
loras: None,
count: None,
}),
)
}
}
}
#[tracing::instrument(skip_all, level = "debug")]
async fn unload_lora_handler(
State(state): State<Arc<SystemStatusState>>,
Path(lora_name): Path<String>,
) -> impl IntoResponse {
let lora_name = lora_name
.strip_prefix('/')
.unwrap_or(&lora_name)
.to_string();
tracing::info!("Unloading LoRA: {}", lora_name);
match call_lora_endpoint(
state.drt(),
"unload_lora",
json!({
"lora_name": lora_name.clone(),
}),
)
.await
{
Ok(response) => {
if response.status == "error" {
tracing::error!(
"Failed to unload LoRA {}: {}",
lora_name,
response.message.as_deref().unwrap_or("Unknown error")
);
(StatusCode::INTERNAL_SERVER_ERROR, Json(response))
} else {
tracing::info!("LoRA unloaded successfully: {}", lora_name);
(StatusCode::OK, Json(response))
}
}
Err(e) => {
tracing::error!("Failed to unload LoRA {}: {}", lora_name, e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(LoraResponse {
status: "error".to_string(),
message: Some(e.to_string()),
lora_name: Some(lora_name),
lora_id: None,
loras: None,
count: None,
}),
)
}
}
}
#[tracing::instrument(skip_all, level = "debug")]
async fn list_loras_handler(State(state): State<Arc<SystemStatusState>>) -> impl IntoResponse {
tracing::info!("Listing all LoRAs");
match call_lora_endpoint(state.drt(), "list_loras", json!({})).await {
Ok(response) => {
tracing::info!("Successfully retrieved LoRA list");
(StatusCode::OK, Json(response))
}
Err(e) => {
tracing::error!("Failed to list LoRAs: {}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(LoraResponse {
status: "error".to_string(),
message: Some(e.to_string()),
lora_name: None,
lora_id: None,
loras: None,
count: None,
}),
)
}
}
}
async fn call_lora_endpoint(
drt: &crate::DistributedRuntime,
endpoint_name: &str,
request_body: serde_json::Value,
) -> anyhow::Result<LoraResponse> {
use crate::engine::AsyncEngine;
tracing::debug!("Calling local endpoint: '{}'", endpoint_name);
let local_registry = drt.local_endpoint_registry();
let engine = local_registry
.get(endpoint_name)
.ok_or_else(|| {
anyhow::anyhow!(
"Endpoint '{}' not found in local registry. Make sure it's registered with .register_local_engine()",
endpoint_name
)
})?;
tracing::debug!(
"Found endpoint '{}' in local registry, calling directly",
endpoint_name
);
let request = crate::pipeline::SingleIn::new(request_body);
let mut stream = engine.generate(request).await?;
if let Some(response) = stream.next().await {
let response_data = response.data.unwrap_or_default();
let lora_response = serde_json::from_value::<LoraResponse>(response_data.clone())
.unwrap_or_else(|_| parse_lora_response(&response_data));
return Ok(lora_response);
}
anyhow::bail!("No response received from endpoint '{}'", endpoint_name)
}
fn parse_lora_response(response_data: &serde_json::Value) -> LoraResponse {
LoraResponse {
status: response_data
.get("status")
.and_then(|s| s.as_str())
.unwrap_or("success")
.to_string(),
message: response_data
.get("message")
.and_then(|m| m.as_str())
.map(|s| s.to_string()),
lora_name: response_data
.get("lora_name")
.and_then(|n| n.as_str())
.map(|s| s.to_string()),
lora_id: response_data.get("lora_id").and_then(|id| id.as_u64()),
loras: response_data.get("loras").cloned(),
count: response_data
.get("count")
.and_then(|c| c.as_u64())
.map(|c| c as usize),
}
}
#[tracing::instrument(skip_all, level = "trace", fields(path = %path))]
async fn engine_route_handler(
state: Arc<SystemStatusState>,
Path(path): Path<String>,
body: Bytes,
) -> impl IntoResponse {
tracing::trace!("Engine route request to /engine/{}", path);
let body_json: serde_json::Value = if body.is_empty() {
serde_json::json!({})
} else {
match serde_json::from_slice(&body) {
Ok(json) => json,
Err(e) => {
tracing::warn!("Invalid JSON in request body: {}", e);
return (
StatusCode::BAD_REQUEST,
json!({
"error": "Invalid JSON",
"message": format!("{}", e)
})
.to_string(),
)
.into_response();
}
}
};
let callback = match state.drt().engine_routes().get(&path) {
Some(cb) => cb,
None => {
tracing::debug!("Route /engine/{} not found", path);
return (
StatusCode::NOT_FOUND,
json!({
"error": "Route not found",
"message": format!("Route /engine/{} not found", path)
})
.to_string(),
)
.into_response();
}
};
match callback(body_json).await {
Ok(response) => {
tracing::trace!("Engine route handler succeeded for /engine/{}", path);
(StatusCode::OK, response.to_string()).into_response()
}
Err(e) => {
tracing::error!("Engine route handler error for /engine/{}: {}", path, e);
(
StatusCode::INTERNAL_SERVER_ERROR,
json!({
"error": "Handler error",
"message": format!("{}", e)
})
.to_string(),
)
.into_response()
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use tokio::time::Duration;
#[tokio::test]
async fn test_http_server_lifecycle() {
let cancel_token = CancellationToken::new();
let cancel_token_for_server = cancel_token.clone();
let app = Router::new().route("/test", get(|| async { (StatusCode::OK, "test") }));
let server_handle = tokio::spawn(async move {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let _ = axum::serve(listener, app)
.with_graceful_shutdown(cancel_token_for_server.cancelled_owned())
.await;
});
cancel_token.cancel();
let result = tokio::time::timeout(Duration::from_secs(5), server_handle).await;
assert!(
result.is_ok(),
"HTTP server should shut down when cancel token is cancelled"
);
}
}
#[cfg(all(test, feature = "integration"))]
mod integration_tests {
use super::*;
use crate::config::environment_names::logging as env_logging;
use crate::config::environment_names::runtime::canary as env_canary;
use crate::distributed::distributed_test_utils::create_test_drt_async;
use crate::metrics::MetricsHierarchy;
use anyhow::Result;
use rstest::rstest;
use std::sync::Arc;
use tokio::time::Duration;
#[tokio::test]
async fn test_uptime_from_system_health() {
temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, None::<&str>)], async {
let drt = create_test_drt_async().await;
let uptime = drt.system_health().lock().uptime();
assert!(uptime.as_nanos() > 0 || uptime.is_zero());
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let uptime_after = drt.system_health().lock().uptime();
assert!(uptime_after > uptime);
})
.await;
}
#[tokio::test]
async fn test_runtime_metrics_initialization_and_namespace() {
temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, None::<&str>)], async {
let drt = create_test_drt_async().await;
let response = drt.metrics().prometheus_expfmt().unwrap();
println!("Full metrics response:\n{}", response);
assert!(
response.contains("# HELP dynamo_component_uptime_seconds"),
"Should contain uptime_seconds help text"
);
assert!(
response.contains("# TYPE dynamo_component_uptime_seconds gauge"),
"Should contain uptime_seconds type"
);
assert!(
response.contains("dynamo_component_uptime_seconds"),
"Should contain uptime_seconds metric with correct namespace"
);
})
.await;
}
#[tokio::test]
async fn test_uptime_gauge_updates() {
temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, None::<&str>)], async {
let drt = create_test_drt_async().await;
let initial_uptime = drt.system_health().lock().uptime();
drt.system_health().lock().update_uptime_gauge();
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let uptime_after_sleep = drt.system_health().lock().uptime();
drt.system_health().lock().update_uptime_gauge();
let elapsed = uptime_after_sleep - initial_uptime;
assert!(
elapsed >= std::time::Duration::from_millis(100),
"Uptime should have increased by at least 100ms after sleep, but only increased by {:?}",
elapsed
);
})
.await;
}
#[tokio::test]
async fn test_http_requests_fail_when_system_disabled() {
temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, None::<&str>)], async {
let drt = create_test_drt_async().await;
let system_info = drt.system_status_server_info();
assert!(
system_info.is_none(),
"System status server should not be running when disabled"
);
println!("✓ System status server correctly disabled when not enabled");
})
.await;
}
#[rstest]
#[case("ready", 200, "ready", None, None, 3)]
#[case("notready", 503, "notready", None, None, 3)]
#[case("ready", 200, "ready", Some("/custom/health"), Some("/custom/live"), 5)]
#[case(
"notready",
503,
"notready",
Some("/custom/health"),
Some("/custom/live"),
5
)]
#[tokio::test]
#[cfg(feature = "integration")]
async fn test_health_endpoints(
#[case] starting_health_status: &'static str,
#[case] expected_status: u16,
#[case] expected_body: &'static str,
#[case] custom_health_path: Option<&'static str>,
#[case] custom_live_path: Option<&'static str>,
#[case] expected_num_tests: usize,
) {
use std::sync::Arc;
crate::logging::init();
#[allow(clippy::redundant_closure_call)]
temp_env::async_with_vars(
[
(env_system::DYN_SYSTEM_PORT, Some("0")),
(
env_system::DYN_SYSTEM_STARTING_HEALTH_STATUS,
Some(starting_health_status),
),
(env_system::DYN_SYSTEM_HEALTH_PATH, custom_health_path),
(env_system::DYN_SYSTEM_LIVE_PATH, custom_live_path),
],
(async || {
let drt = Arc::new(create_test_drt_async().await);
let system_info = drt
.system_status_server_info()
.expect("System status server should be started by DRT");
let addr = system_info.socket_addr;
let client = reqwest::Client::new();
let mut test_cases = vec![];
match custom_health_path {
None => {
test_cases.push(("/health", expected_status, expected_body));
}
Some(chp) => {
test_cases.push(("/health", 404, "Route not found"));
test_cases.push((chp, expected_status, expected_body));
}
}
match custom_live_path {
None => {
test_cases.push(("/live", expected_status, expected_body));
}
Some(clp) => {
test_cases.push(("/live", 404, "Route not found"));
test_cases.push((clp, expected_status, expected_body));
}
}
test_cases.push(("/someRandomPathNotFoundHere", 404, "Route not found"));
assert_eq!(test_cases.len(), expected_num_tests);
for (path, expect_status, expect_body) in test_cases {
println!("[test] Sending request to {}", path);
let url = format!("http://{}{}", addr, path);
let response = client.get(&url).send().await.unwrap();
let status = response.status();
let body = response.text().await.unwrap();
println!(
"[test] Response for {}: status={}, body={:?}",
path, status, body
);
assert_eq!(
status, expect_status,
"Response: status={}, body={:?}",
status, body
);
assert!(
body.contains(expect_body),
"Response: status={}, body={:?}",
status,
body
);
}
})(),
)
.await;
}
#[tokio::test]
async fn test_health_endpoint_tracing() -> Result<()> {
use std::sync::Arc;
#[allow(clippy::redundant_closure_call)]
let _ = temp_env::async_with_vars(
[
(env_system::DYN_SYSTEM_PORT, Some("0")),
(env_system::DYN_SYSTEM_STARTING_HEALTH_STATUS, Some("ready")),
(env_logging::DYN_LOGGING_JSONL, Some("1")),
(env_logging::DYN_LOG, Some("trace")),
],
(async || {
crate::logging::init();
let drt = Arc::new(create_test_drt_async().await);
let system_info = drt
.system_status_server_info()
.expect("System status server should be started by DRT");
let addr = system_info.socket_addr;
let client = reqwest::Client::new();
for path in [("/health"), ("/live"), ("/someRandomPathNotFoundHere")] {
let traceparent_value =
"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
let tracestate_value = "vendor1=opaqueValue1,vendor2=opaqueValue2";
let mut headers = reqwest::header::HeaderMap::new();
headers.insert(
reqwest::header::HeaderName::from_static("traceparent"),
reqwest::header::HeaderValue::from_str(traceparent_value)?,
);
headers.insert(
reqwest::header::HeaderName::from_static("tracestate"),
reqwest::header::HeaderValue::from_str(tracestate_value)?,
);
let url = format!("http://{}{}", addr, path);
let response = client.get(&url).headers(headers).send().await.unwrap();
let status = response.status();
let body = response.text().await.unwrap();
tracing::info!(body = body, status = status.to_string());
}
Ok::<(), anyhow::Error>(())
})(),
)
.await;
Ok(())
}
#[tokio::test]
async fn test_health_endpoint_with_changing_health_status() {
const ENDPOINT_NAME: &str = "generate";
const ENDPOINT_HEALTH_CONFIG: &str = "[\"generate\"]";
temp_env::async_with_vars(
[
(env_system::DYN_SYSTEM_PORT, Some("0")),
(env_system::DYN_SYSTEM_STARTING_HEALTH_STATUS, Some("notready")),
(env_system::DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS, Some(ENDPOINT_HEALTH_CONFIG)),
],
async {
let drt = Arc::new(create_test_drt_async().await);
let system_info_opt = drt.system_status_server_info();
assert!(
system_info_opt.is_some(),
"System status server was not spawned by DRT. Expected DRT to spawn server when DYN_SYSTEM_PORT is set to a positive value, but system_status_server_info() returned None. Environment: DYN_SYSTEM_PORT={:?}",
std::env::var(env_system::DYN_SYSTEM_PORT)
);
let system_info = system_info_opt.unwrap();
let addr = system_info.socket_addr;
let client = reqwest::Client::new();
let health_url = format!("http://{}/health", addr);
let response = client.get(&health_url).send().await.unwrap();
let status = response.status();
let body = response.text().await.unwrap();
assert_eq!(status, 503, "Health should be 503 (not ready) initially, got: {}", status);
assert!(body.contains("\"status\":\"notready\""), "Health should contain status notready");
let namespace = drt.namespace("ns1234").unwrap();
let component = namespace.component("comp1234").unwrap();
use crate::pipeline::{async_trait, network::Ingress, AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, SingleIn};
use crate::protocols::annotated::Annotated;
struct TestHandler;
#[async_trait]
impl AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, anyhow::Error> for TestHandler {
async fn generate(&self, input: SingleIn<String>) -> anyhow::Result<ManyOut<Annotated<String>>> {
let (data, ctx) = input.into_parts();
let response = Annotated::from_data(format!("You responded: {}", data));
Ok(crate::pipeline::ResponseStream::new(
Box::pin(crate::stream::iter(vec![response])),
ctx.context()
))
}
}
let ingress = Ingress::for_engine(std::sync::Arc::new(TestHandler)).unwrap();
tokio::spawn(async move {
let _ = component.endpoint(ENDPOINT_NAME)
.endpoint_builder()
.handler(ingress)
.health_check_payload(serde_json::json!({
"test": "health_check"
}))
.start()
.await;
});
let mut success_count = 0;
let mut failures = Vec::new();
for i in 1..=200 {
let response = client.get(&health_url).send().await.unwrap();
let status = response.status();
let body = response.text().await.unwrap();
if status == 200 && body.contains("\"status\":\"ready\"") {
success_count += 1;
} else {
failures.push((i, status.as_u16(), body.clone()));
if failures.len() <= 5 { tracing::warn!("Request {}: status={}, body={}", i, status, body);
}
}
}
tracing::info!("Health endpoint test results: {}/200 requests succeeded", success_count);
if !failures.is_empty() {
tracing::warn!("Failed requests: {}", failures.len());
}
assert!(success_count >= 150, "Expected at least 150 out of 200 requests to succeed, but only {} succeeded", success_count);
},
)
.await;
}
#[tokio::test]
async fn test_spawn_system_status_server_endpoints() {
temp_env::async_with_vars(
[
(env_system::DYN_SYSTEM_PORT, Some("0")),
(env_system::DYN_SYSTEM_STARTING_HEALTH_STATUS, Some("ready")),
],
async {
let drt = Arc::new(create_test_drt_async().await);
let system_info = drt
.system_status_server_info()
.expect("System status server should be started by DRT");
let addr = system_info.socket_addr;
let client = reqwest::Client::new();
for (path, expect_200, expect_body) in [
("/health", true, "ready"),
("/live", true, "ready"),
("/someRandomPathNotFoundHere", false, "Route not found"),
] {
println!("[test] Sending request to {}", path);
let url = format!("http://{}{}", addr, path);
let response = client.get(&url).send().await.unwrap();
let status = response.status();
let body = response.text().await.unwrap();
println!(
"[test] Response for {}: status={}, body={:?}",
path, status, body
);
if expect_200 {
assert_eq!(status, 200, "Response: status={}, body={:?}", status, body);
} else {
assert_eq!(status, 404, "Response: status={}, body={:?}", status, body);
}
assert!(
body.contains(expect_body),
"Response: status={}, body={:?}",
status,
body
);
}
},
)
.await;
}
#[cfg(feature = "integration")]
#[tokio::test]
async fn test_health_check_with_payload_and_timeout() {
crate::logging::init();
temp_env::async_with_vars(
[
(env_system::DYN_SYSTEM_PORT, Some("0")),
(
env_system::DYN_SYSTEM_STARTING_HEALTH_STATUS,
Some("notready"),
),
(
env_system::DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS,
Some("[\"test.endpoint\"]"),
),
("DYN_HEALTH_CHECK_ENABLED", Some("true")),
(env_canary::DYN_CANARY_WAIT_TIME, Some("1")), ("DYN_HEALTH_CHECK_REQUEST_TIMEOUT", Some("1")), ("RUST_LOG", Some("info")), ],
async {
let drt = Arc::new(create_test_drt_async().await);
let system_info = drt
.system_status_server_info()
.expect("System status server should be started");
let addr = system_info.socket_addr;
let client = reqwest::Client::new();
let health_url = format!("http://{}/health", addr);
let endpoint = "test.endpoint";
let health_check_payload = serde_json::json!({
"prompt": "health check test",
"_health_check": true
});
{
let system_health = drt.system_health();
let system_health_lock = system_health.lock();
system_health_lock.register_health_check_target(
endpoint,
crate::component::Instance {
component: "test_component".to_string(),
endpoint: "health".to_string(),
namespace: "test_namespace".to_string(),
instance_id: 1,
transport: crate::component::TransportType::Nats(endpoint.to_string()),
},
health_check_payload.clone(),
);
}
let response = client.get(&health_url).send().await.unwrap();
let status = response.status();
let body = response.text().await.unwrap();
assert_eq!(status, 503, "Should be unhealthy initially (default state)");
assert!(
body.contains("\"status\":\"notready\""),
"Should show notready status initially"
);
drt.system_health()
.lock()
.set_endpoint_health_status(endpoint, HealthStatus::Ready);
let response = client.get(&health_url).send().await.unwrap();
let status = response.status();
let body = response.text().await.unwrap();
assert_eq!(status, 200, "Should be healthy due to recent response");
assert!(
body.contains("\"status\":\"ready\""),
"Should show ready status after response"
);
let endpoint_status = drt
.system_health()
.lock()
.get_endpoint_health_status(endpoint);
assert_eq!(
endpoint_status,
Some(HealthStatus::Ready),
"SystemHealth should show endpoint as Ready after response"
);
},
)
.await;
}
}