use crate::config::HealthStatus;
use crate::logging::make_request_span;
use crate::metrics::MetricsHierarchy;
use crate::metrics::prometheus_names::{nats_client, nats_service};
use crate::traits::DistributedRuntimeProvider;
use axum::{Router, http::StatusCode, response::IntoResponse, routing::get};
use serde_json::json;
use std::sync::Arc;
use tokio::{net::TcpListener, task::JoinHandle};
use tokio_util::sync::CancellationToken;
use tower_http::trace::TraceLayer;
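
/// Address and task handle for a running system status server.
///
/// The handle is wrapped in an `Arc` so the info can be cloned and shared
/// while still referring to the single underlying server task.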
#[derive(Clone, Debug)]
pub struct SystemStatusServerInfo {
    pub socket_addr: std::net::SocketAddr,
    pub handle: Option<Arc<JoinHandle<()>>>,
}

impl SystemStatusServerInfo {
    pub fn new(socket_addr: std::net::SocketAddr, handle: Option<JoinHandle<()>>) -> Self {
        Self {
            socket_addr,
            handle: handle.map(Arc::new),
        }
    }

    pub fn address(&self) -> String {
        self.socket_addr.to_string()
    }

    pub fn hostname(&self) -> String {
        self.socket_addr.ip().to_string()
    }

    pub fn port(&self) -> u16 {
        self.socket_addr.port()
    }
}
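
/// Shared state for the status server's request handlers: the root
/// `DistributedRuntime`, plus the optional discovery metadata served at
/// `/metadata`.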
pub struct SystemStatusState {
    root_drt: Arc<crate::DistributedRuntime>,
    discovery_metadata: Option<Arc<tokio::sync::RwLock<crate::discovery::DiscoveryMetadata>>>,
}

impl SystemStatusState {
    pub fn new(
        drt: Arc<crate::DistributedRuntime>,
        discovery_metadata: Option<Arc<tokio::sync::RwLock<crate::discovery::DiscoveryMetadata>>>,
    ) -> anyhow::Result<Self> {
        Ok(Self {
            root_drt: drt,
            discovery_metadata,
        })
    }

    pub fn drt(&self) -> &crate::DistributedRuntime {
        &self.root_drt
    }

    pub fn discovery_metadata(
        &self,
    ) -> Option<&Arc<tokio::sync::RwLock<crate::discovery::DiscoveryMetadata>>> {
        self.discovery_metadata.as_ref()
    }
}
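
/// Binds `host:port` and serves the health, liveness, `/metrics`, and
/// `/metadata` routes until `cancel_token` is cancelled. Returns the bound
/// address (useful when `port` is `0`) and the server task's join handle.
///
/// A minimal usage sketch, assuming a `DistributedRuntime` is already in
/// hand as `drt` (how it is constructed is elided here):
///
/// ```ignore
/// let cancel = CancellationToken::new();
/// let (addr, handle) =
///     spawn_system_status_server("127.0.0.1", 0, cancel.clone(), drt, None).await?;
/// tracing::info!("system status server listening on {addr}");
/// // ... later, trigger a graceful shutdown and wait for the task:
/// cancel.cancel();
/// handle.await?;
/// ```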
pub async fn spawn_system_status_server(
    host: &str,
    port: u16,
    cancel_token: CancellationToken,
    drt: Arc<crate::DistributedRuntime>,
    discovery_metadata: Option<Arc<tokio::sync::RwLock<crate::discovery::DiscoveryMetadata>>>,
) -> anyhow::Result<(std::net::SocketAddr, tokio::task::JoinHandle<()>)> {
    let server_state = Arc::new(SystemStatusState::new(drt, discovery_metadata)?);
    let (health_path, live_path) = {
        let system_health = server_state.drt().system_health();
        let guard = system_health.lock();
        (guard.health_path().to_string(), guard.live_path().to_string())
    };
    let app = Router::new()
        .route(
            &health_path,
            get({
                let state = Arc::clone(&server_state);
                move || health_handler(state)
            }),
        )
        .route(
            &live_path,
            get({
                let state = Arc::clone(&server_state);
                move || health_handler(state)
            }),
        )
        .route(
            "/metrics",
            get({
                let state = Arc::clone(&server_state);
                move || metrics_handler(state)
            }),
        )
        .route(
            "/metadata",
            get({
                let state = Arc::clone(&server_state);
                move || metadata_handler(state)
            }),
        )
        .fallback(|| async {
            tracing::info!("[fallback handler] called");
            (StatusCode::NOT_FOUND, "Route not found").into_response()
        })
        .layer(TraceLayer::new_for_http().make_span_with(make_request_span));
    let address = format!("{}:{}", host, port);
    tracing::info!("[spawn_system_status_server] binding to: {}", address);
    let (listener, actual_address) = match TcpListener::bind(&address).await {
        Ok(listener) => {
            let actual_address = listener.local_addr()?;
            tracing::info!(
                "[spawn_system_status_server] system status server bound to: {}",
                actual_address
            );
            (listener, actual_address)
        }
        Err(e) => {
            tracing::error!("Failed to bind to address {}: {}", address, e);
            return Err(anyhow::anyhow!("Failed to bind to address: {}", e));
        }
    };
    let observer = cancel_token.child_token();
    let handle = tokio::spawn(async move {
        if let Err(e) = axum::serve(listener, app)
            .with_graceful_shutdown(observer.cancelled_owned())
            .await
        {
            tracing::error!("System status server error: {}", e);
        }
    });
    Ok((actual_address, handle))
}
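
/// Handles the health and liveness routes. Returns `200 OK` with status
/// "ready" when the runtime reports healthy, and `503 Service Unavailable`
/// with status "notready" otherwise. The JSON body also carries the process
/// uptime and per-endpoint health; a sketch of the shape (values are
/// illustrative):
///
/// ```text
/// { "status": "ready", "uptime": { "secs": 42, "nanos": 0 }, "endpoints": ... }
/// ```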
#[tracing::instrument(skip_all, level = "trace")]
async fn health_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
    let system_health = state.drt().system_health();
    let system_health_lock = system_health.lock();
    let (healthy, endpoints) = system_health_lock.get_health_status();
    let uptime = Some(system_health_lock.uptime());
    drop(system_health_lock);

    let healthy_string = if healthy { "ready" } else { "notready" };
    let status_code = if healthy {
        StatusCode::OK
    } else {
        StatusCode::SERVICE_UNAVAILABLE
    };
    let response = json!({
        "status": healthy_string,
        "uptime": uptime,
        "endpoints": endpoints,
    })
    .to_string();
    tracing::trace!("Response {}", response);
    (status_code, response)
}
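
/// Serves runtime metrics in Prometheus exposition format, refreshing the
/// uptime gauge on each scrape.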
#[tracing::instrument(skip_all, level = "trace")]
async fn metrics_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
    state.drt().system_health().lock().update_uptime_gauge();
    let response = match state.drt().metrics().prometheus_expfmt() {
        Ok(r) => r,
        Err(e) => {
            tracing::error!("Failed to get metrics from registry: {}", e);
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                "Failed to get metrics".to_string(),
            );
        }
    };
    (StatusCode::OK, response)
}
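
/// Serves the discovery metadata as JSON, or `404 Not Found` when the
/// server was started without discovery metadata.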
#[tracing::instrument(skip_all, level = "trace")]
async fn metadata_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
    let metadata = match state.discovery_metadata() {
        Some(metadata) => metadata,
        None => {
            tracing::debug!("Metadata endpoint called but no discovery metadata available");
            return (
                StatusCode::NOT_FOUND,
                "Discovery metadata not available".to_string(),
            )
                .into_response();
        }
    };
    let metadata_guard = metadata.read().await;
    match serde_json::to_string(&*metadata_guard) {
        Ok(json) => {
            tracing::trace!("Returning metadata: {} bytes", json.len());
            (StatusCode::OK, json).into_response()
        }
        Err(e) => {
            tracing::error!("Failed to serialize metadata: {}", e);
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                "Failed to serialize metadata".to_string(),
            )
                .into_response()
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::Duration;

    #[tokio::test]
    async fn test_http_server_lifecycle() {
        let cancel_token = CancellationToken::new();
        let cancel_token_for_server = cancel_token.clone();
        let app = Router::new().route("/test", get(|| async { (StatusCode::OK, "test") }));
        let server_handle = tokio::spawn(async move {
            let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
            let _ = axum::serve(listener, app)
                .with_graceful_shutdown(cancel_token_for_server.cancelled_owned())
                .await;
        });
        cancel_token.cancel();
        let result = tokio::time::timeout(Duration::from_secs(5), server_handle).await;
        assert!(
            result.is_ok(),
            "HTTP server should shut down when cancel token is cancelled"
        );
    }
}
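
// Integration tests: these spin up a real DistributedRuntime via
// create_test_drt_async() and drive the status server over HTTP, so they
// are gated behind the "integration" feature and use temp_env to control
// the DYN_SYSTEM_* configuration.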
#[cfg(all(test, feature = "integration"))]
mod integration_tests {
use super::*;
use crate::distributed::distributed_test_utils::create_test_drt_async;
use crate::metrics::MetricsHierarchy;
use anyhow::Result;
use rstest::rstest;
use std::sync::Arc;
use tokio::time::Duration;
    #[tokio::test]
    async fn test_uptime_from_system_health() {
        temp_env::async_with_vars([("DYN_SYSTEM_PORT", None::<&str>)], async {
            let drt = create_test_drt_async().await;
            let uptime = drt.system_health().lock().uptime();
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
            let uptime_after = drt.system_health().lock().uptime();
            assert!(
                uptime_after > uptime,
                "uptime should increase monotonically while the runtime is alive"
            );
        })
        .await;
    }
    #[tokio::test]
    async fn test_runtime_metrics_initialization_and_namespace() {
        temp_env::async_with_vars([("DYN_SYSTEM_PORT", None::<&str>)], async {
            let drt = create_test_drt_async().await;
            let response = drt.metrics().prometheus_expfmt().unwrap();
            println!("Full metrics response:\n{}", response);
            let filtered_response: String = response
                .lines()
                .filter(|line| {
                    !line.contains(nats_client::PREFIX) && !line.contains(nats_service::PREFIX)
                })
                .collect::<Vec<_>>()
                .join("\n");
            assert!(
                filtered_response.contains("# HELP dynamo_component_uptime_seconds"),
                "Should contain uptime_seconds help text"
            );
            assert!(
                filtered_response.contains("# TYPE dynamo_component_uptime_seconds gauge"),
                "Should contain uptime_seconds type"
            );
            assert!(
                filtered_response.contains("dynamo_component_uptime_seconds"),
                "Should contain uptime_seconds metric with correct namespace"
            );
        })
        .await;
    }
    #[tokio::test]
    async fn test_uptime_gauge_updates() {
        temp_env::async_with_vars([("DYN_SYSTEM_PORT", None::<&str>)], async {
            let drt = create_test_drt_async().await;
            let initial_uptime = drt.system_health().lock().uptime();
            drt.system_health().lock().update_uptime_gauge();
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
            let uptime_after_sleep = drt.system_health().lock().uptime();
            drt.system_health().lock().update_uptime_gauge();
            let elapsed = uptime_after_sleep - initial_uptime;
            assert!(
                elapsed >= std::time::Duration::from_millis(100),
                "Uptime should have increased by at least 100ms after sleep, but only increased by {:?}",
                elapsed
            );
        })
        .await;
    }
    #[tokio::test]
    async fn test_http_requests_fail_when_system_disabled() {
        temp_env::async_with_vars([("DYN_SYSTEM_PORT", None::<&str>)], async {
            let drt = create_test_drt_async().await;
            let system_info = drt.system_status_server_info();
            assert!(
                system_info.is_none(),
                "System status server should not be running when disabled"
            );
            println!("✓ System status server correctly disabled when not enabled");
        })
        .await;
    }
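
    // Parameterized health-endpoint test: covers ready/notready starting
    // states with both the default (/health, /live) and custom paths; the
    // final case value is the expected number of request/response checks.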
    #[rstest]
    #[case("ready", 200, "ready", None, None, 3)]
    #[case("notready", 503, "notready", None, None, 3)]
    #[case("ready", 200, "ready", Some("/custom/health"), Some("/custom/live"), 5)]
    #[case("notready", 503, "notready", Some("/custom/health"), Some("/custom/live"), 5)]
    #[tokio::test]
    #[cfg(feature = "integration")]
    async fn test_health_endpoints(
        #[case] starting_health_status: &'static str,
        #[case] expected_status: u16,
        #[case] expected_body: &'static str,
        #[case] custom_health_path: Option<&'static str>,
        #[case] custom_live_path: Option<&'static str>,
        #[case] expected_num_tests: usize,
    ) {
        use std::sync::Arc;
        crate::logging::init();
        #[allow(clippy::redundant_closure_call)]
        temp_env::async_with_vars(
            [
                ("DYN_SYSTEM_PORT", Some("0")),
                (
                    "DYN_SYSTEM_STARTING_HEALTH_STATUS",
                    Some(starting_health_status),
                ),
                ("DYN_SYSTEM_HEALTH_PATH", custom_health_path),
                ("DYN_SYSTEM_LIVE_PATH", custom_live_path),
            ],
            (async || {
                let drt = Arc::new(create_test_drt_async().await);
                let system_info = drt
                    .system_status_server_info()
                    .expect("System status server should be started by DRT");
                let addr = system_info.socket_addr;
                let client = reqwest::Client::new();
                let mut test_cases = vec![];
                match custom_health_path {
                    None => {
                        test_cases.push(("/health", expected_status, expected_body));
                    }
                    Some(chp) => {
                        test_cases.push(("/health", 404, "Route not found"));
                        test_cases.push((chp, expected_status, expected_body));
                    }
                }
                match custom_live_path {
                    None => {
                        test_cases.push(("/live", expected_status, expected_body));
                    }
                    Some(clp) => {
                        test_cases.push(("/live", 404, "Route not found"));
                        test_cases.push((clp, expected_status, expected_body));
                    }
                }
                test_cases.push(("/someRandomPathNotFoundHere", 404, "Route not found"));
                assert_eq!(test_cases.len(), expected_num_tests);
                for (path, expect_status, expect_body) in test_cases {
                    println!("[test] Sending request to {}", path);
                    let url = format!("http://{}{}", addr, path);
                    let response = client.get(&url).send().await.unwrap();
                    let status = response.status();
                    let body = response.text().await.unwrap();
                    println!(
                        "[test] Response for {}: status={}, body={:?}",
                        path, status, body
                    );
                    assert_eq!(
                        status, expect_status,
                        "Response: status={}, body={:?}",
                        status, body
                    );
                    assert!(
                        body.contains(expect_body),
                        "Response: status={}, body={:?}",
                        status,
                        body
                    );
                }
            })(),
        )
        .await;
    }
    #[tokio::test]
    async fn test_health_endpoint_tracing() -> Result<()> {
        use std::sync::Arc;
        #[allow(clippy::redundant_closure_call)]
        let _ = temp_env::async_with_vars(
            [
                ("DYN_SYSTEM_PORT", Some("0")),
                ("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("ready")),
                ("DYN_LOGGING_JSONL", Some("1")),
                ("DYN_LOG", Some("trace")),
            ],
            (async || {
                crate::logging::init();
                let drt = Arc::new(create_test_drt_async().await);
                let system_info = drt
                    .system_status_server_info()
                    .expect("System status server should be started by DRT");
                let addr = system_info.socket_addr;
                let client = reqwest::Client::new();
                for path in ["/health", "/live", "/someRandomPathNotFoundHere"] {
                    let traceparent_value =
                        "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
                    let tracestate_value = "vendor1=opaqueValue1,vendor2=opaqueValue2";
                    let mut headers = reqwest::header::HeaderMap::new();
                    headers.insert(
                        reqwest::header::HeaderName::from_static("traceparent"),
                        reqwest::header::HeaderValue::from_str(traceparent_value)?,
                    );
                    headers.insert(
                        reqwest::header::HeaderName::from_static("tracestate"),
                        reqwest::header::HeaderValue::from_str(tracestate_value)?,
                    );
                    let url = format!("http://{}{}", addr, path);
                    let response = client.get(&url).headers(headers).send().await.unwrap();
                    let status = response.status();
                    let body = response.text().await.unwrap();
                    tracing::info!(body = body, status = status.to_string());
                }
                Ok::<(), anyhow::Error>(())
            })(),
        )
        .await;
        Ok(())
    }
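
    // End-to-end readiness transition: the server starts notready, a
    // "generate" endpoint with a health-check payload is brought up in the
    // background, and /health is polled until the endpoint reports ready.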
    #[tokio::test]
    async fn test_health_endpoint_with_changing_health_status() {
        const ENDPOINT_NAME: &str = "generate";
        const ENDPOINT_HEALTH_CONFIG: &str = "[\"generate\"]";
        temp_env::async_with_vars(
            [
                ("DYN_SYSTEM_PORT", Some("0")),
                ("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("notready")),
                (
                    "DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS",
                    Some(ENDPOINT_HEALTH_CONFIG),
                ),
            ],
            async {
                let drt = Arc::new(create_test_drt_async().await);
                let system_info_opt = drt.system_status_server_info();
                assert!(
                    system_info_opt.is_some(),
                    "System status server was not spawned by DRT. Expected DRT to spawn the server when DYN_SYSTEM_PORT is set, but system_status_server_info() returned None. Environment: DYN_SYSTEM_PORT={:?}",
                    std::env::var("DYN_SYSTEM_PORT")
                );
                let system_info = system_info_opt.unwrap();
                let addr = system_info.socket_addr;
                let client = reqwest::Client::new();
                let health_url = format!("http://{}/health", addr);
                let response = client.get(&health_url).send().await.unwrap();
                let status = response.status();
                let body = response.text().await.unwrap();
                assert_eq!(
                    status, 503,
                    "Health should be 503 (not ready) initially, got: {}",
                    status
                );
                assert!(
                    body.contains("\"status\":\"notready\""),
                    "Health should contain status notready"
                );
                let namespace = drt.namespace("ns1234").unwrap();
                let mut component = namespace.component("comp1234").unwrap();
                use crate::pipeline::{
                    async_trait, network::Ingress, AsyncEngine, AsyncEngineContextProvider,
                    ManyOut, SingleIn,
                };
                use crate::protocols::annotated::Annotated;

                struct TestHandler;

                #[async_trait]
                impl AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, anyhow::Error>
                    for TestHandler
                {
                    async fn generate(
                        &self,
                        input: SingleIn<String>,
                    ) -> anyhow::Result<ManyOut<Annotated<String>>> {
                        let (data, ctx) = input.into_parts();
                        let response = Annotated::from_data(format!("You responded: {}", data));
                        Ok(crate::pipeline::ResponseStream::new(
                            Box::pin(crate::stream::iter(vec![response])),
                            ctx.context(),
                        ))
                    }
                }

                let ingress = Ingress::for_engine(std::sync::Arc::new(TestHandler)).unwrap();
                tokio::spawn(async move {
                    component.add_stats_service().await.unwrap();
                    let _ = component
                        .endpoint(ENDPOINT_NAME)
                        .endpoint_builder()
                        .handler(ingress)
                        .health_check_payload(serde_json::json!({
                            "test": "health_check"
                        }))
                        .start()
                        .await;
                });
                let mut success_count = 0;
                let mut failures = Vec::new();
                for i in 1..=200 {
                    let response = client.get(&health_url).send().await.unwrap();
                    let status = response.status();
                    let body = response.text().await.unwrap();
                    if status == 200 && body.contains("\"status\":\"ready\"") {
                        success_count += 1;
                    } else {
                        failures.push((i, status.as_u16(), body.clone()));
                        if failures.len() <= 5 {
                            tracing::warn!("Request {}: status={}, body={}", i, status, body);
                        }
                    }
                }
                tracing::info!(
                    "Health endpoint test results: {}/200 requests succeeded",
                    success_count
                );
                if !failures.is_empty() {
                    tracing::warn!("Failed requests: {}", failures.len());
                }
                assert!(
                    success_count >= 150,
                    "Expected at least 150 out of 200 requests to succeed, but only {} succeeded",
                    success_count
                );
            },
        )
        .await;
    }
    #[tokio::test]
    async fn test_spawn_system_status_server_endpoints() {
        temp_env::async_with_vars(
            [
                ("DYN_SYSTEM_PORT", Some("0")),
                ("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("ready")),
            ],
            async {
                let drt = Arc::new(create_test_drt_async().await);
                let system_info = drt
                    .system_status_server_info()
                    .expect("System status server should be started by DRT");
                let addr = system_info.socket_addr;
                let client = reqwest::Client::new();
                for (path, expect_200, expect_body) in [
                    ("/health", true, "ready"),
                    ("/live", true, "ready"),
                    ("/someRandomPathNotFoundHere", false, "Route not found"),
                ] {
                    println!("[test] Sending request to {}", path);
                    let url = format!("http://{}{}", addr, path);
                    let response = client.get(&url).send().await.unwrap();
                    let status = response.status();
                    let body = response.text().await.unwrap();
                    println!(
                        "[test] Response for {}: status={}, body={:?}",
                        path, status, body
                    );
                    if expect_200 {
                        assert_eq!(status, 200, "Response: status={}, body={:?}", status, body);
                    } else {
                        assert_eq!(status, 404, "Response: status={}, body={:?}", status, body);
                    }
                    assert!(
                        body.contains(expect_body),
                        "Response: status={}, body={:?}",
                        status,
                        body
                    );
                }
            },
        )
        .await;
    }
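
    // Health-check target registration: registers a target for
    // "test.endpoint", then flips its status to Ready and verifies that the
    // /health endpoint and SystemHealth agree at each step.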
#[cfg(feature = "integration")]
#[tokio::test]
async fn test_health_check_with_payload_and_timeout() {
crate::logging::init();
temp_env::async_with_vars(
[
("DYN_SYSTEM_PORT", Some("0")),
("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("notready")),
(
"DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS",
Some("[\"test.endpoint\"]"),
),
("DYN_HEALTH_CHECK_ENABLED", Some("true")),
("DYN_CANARY_WAIT_TIME", Some("1")), ("DYN_HEALTH_CHECK_REQUEST_TIMEOUT", Some("1")), ("RUST_LOG", Some("info")), ],
async {
let drt = Arc::new(create_test_drt_async().await);
let system_info = drt
.system_status_server_info()
.expect("System status server should be started");
let addr = system_info.socket_addr;
let client = reqwest::Client::new();
let health_url = format!("http://{}/health", addr);
let endpoint = "test.endpoint";
let health_check_payload = serde_json::json!({
"prompt": "health check test",
"_health_check": true
});
{
let system_health = drt.system_health();
let system_health_lock = system_health.lock();
system_health_lock.register_health_check_target(
endpoint,
crate::component::Instance {
component: "test_component".to_string(),
endpoint: "health".to_string(),
namespace: "test_namespace".to_string(),
instance_id: 1,
transport: crate::component::TransportType::Nats(endpoint.to_string()),
},
health_check_payload.clone(),
);
}
let response = client.get(&health_url).send().await.unwrap();
let status = response.status();
let body = response.text().await.unwrap();
assert_eq!(status, 503, "Should be unhealthy initially (default state)");
assert!(
body.contains("\"status\":\"notready\""),
"Should show notready status initially"
);
drt.system_health()
.lock()
.set_endpoint_health_status(endpoint, HealthStatus::Ready);
let response = client.get(&health_url).send().await.unwrap();
let status = response.status();
let body = response.text().await.unwrap();
assert_eq!(status, 200, "Should be healthy due to recent response");
assert!(
body.contains("\"status\":\"ready\""),
"Should show ready status after response"
);
let endpoint_status = drt
.system_health()
.lock()
.get_endpoint_health_status(endpoint);
assert_eq!(
endpoint_status,
Some(HealthStatus::Ready),
"SystemHealth should show endpoint as Ready after response"
);
},
)
.await;
}
}