use crate::AppState;
use crate::api::client_context::logging_pool;
use actix_web::{HttpResponse, get, web};
use futures::future::join_all;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use std::path::Path;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use which::which;
use crate::data::cluster::{
ClusterMirrorRecord, fallback_cluster_mirrors, list_active_cluster_mirrors,
};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterCapabilityStatus {
pub available: bool,
pub detail: String,
}
#[derive(Debug, Clone, Serialize)]
struct RouteDefinition {
path: &'static str,
methods: &'static [&'static str],
summary: &'static str,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterCapabilities {
pub pg_dump: ClusterCapabilityStatus,
pub pg_restore: ClusterCapabilityStatus,
#[serde(default = "unknown_capability")]
pub s3cmd: ClusterCapabilityStatus,
pub s3_access: ClusterCapabilityStatus,
pub backup_create: ClusterCapabilityStatus,
pub backup_restore: ClusterCapabilityStatus,
pub openapi_download: ClusterCapabilityStatus,
pub management_api: ClusterCapabilityStatus,
pub gateway_fetch: ClusterCapabilityStatus,
#[serde(default)]
pub deadpool_experimental: Option<ClusterCapabilityStatus>,
}
fn unknown_capability() -> ClusterCapabilityStatus {
ClusterCapabilityStatus {
available: false,
detail: "Capability not reported by this node".to_string(),
}
}
fn route_supports(root_json: &Value, path: &str, method: &str) -> Option<bool> {
let routes: &Vec<Value> = root_json.get("routes")?.as_array()?;
let method_upper: String = method.to_uppercase();
let found: bool = routes.iter().any(|route| {
let route_path: Option<&str> = route.get("path").and_then(Value::as_str);
let methods: Option<&Vec<Value>> = route.get("methods").and_then(Value::as_array);
route_path == Some(path)
&& methods
.map(|items| {
items.iter().any(|m| {
m.as_str()
.map(|s| s.eq_ignore_ascii_case(&method_upper))
.unwrap_or(false)
})
})
.unwrap_or(false)
});
Some(found)
}
fn env_non_empty(key: &str) -> bool {
std::env::var(key)
.ok()
.map(|v| !v.trim().is_empty())
.unwrap_or(false)
}
fn detect_tool_path(
env_key: &str,
fallback_names: &[&str],
_fallback_windows: &[&str],
) -> Option<String> {
if let Ok(path) = std::env::var(env_key) {
if !path.trim().is_empty() && Path::new(path.trim()).is_file() {
return Some(path);
}
}
for name in fallback_names {
if let Ok(path) = which(name) {
return Some(path.display().to_string());
}
}
#[cfg(target_os = "windows")]
{
for candidate in _fallback_windows {
if Path::new(candidate).is_file() {
return Some((*candidate).to_string());
}
}
}
None
}
fn local_capabilities() -> ClusterCapabilities {
let dump_path: Option<String> = detect_tool_path(
"ATHENA_PG_DUMP_PATH",
&["pg_dump"],
&[r"C:\Program Files\PostgreSQL\18\bin\pg_dump.exe"],
);
let restore_path: Option<String> = detect_tool_path(
"ATHENA_PG_RESTORE_PATH",
&["pg_restore"],
&[r"C:\Program Files\PostgreSQL\18\bin\pg_restore.exe"],
);
let has_dump: bool = dump_path.is_some();
let has_restore: bool = restore_path.is_some();
let s3cmd_path: Option<String> = detect_tool_path("ATHENA_S3CMD_PATH", &["s3cmd"], &[]);
let has_s3cmd: bool = s3cmd_path.is_some();
let has_bucket: bool = env_non_empty("ATHENA_BACKUP_S3_BUCKET");
let has_access_key: bool = env_non_empty("ATHENA_BACKUP_S3_ACCESS_KEY");
let has_secret_key: bool = env_non_empty("ATHENA_BACKUP_S3_SECRET_KEY");
let dump_detail: String = dump_path
.as_ref()
.map(|p| format!("resolved at {}", p))
.unwrap_or_else(|| "pg_dump not found (ATHENA_PG_DUMP_PATH/PATH)".to_string());
let restore_detail: String = restore_path
.as_ref()
.map(|p| format!("resolved at {}", p))
.unwrap_or_else(|| "pg_restore not found (ATHENA_PG_RESTORE_PATH/PATH)".to_string());
let s3cmd_detail: String = s3cmd_path
.as_ref()
.map(|p| format!("resolved at {}", p))
.unwrap_or_else(|| "s3cmd not found (ATHENA_S3CMD_PATH/PATH)".to_string());
let s3_ready: bool =
has_bucket && ((has_access_key && has_secret_key) || (!has_access_key && !has_secret_key));
let s3_detail: String = if !has_bucket {
"ATHENA_BACKUP_S3_BUCKET is not set".to_string()
} else if has_access_key ^ has_secret_key {
"Set both ATHENA_BACKUP_S3_ACCESS_KEY and ATHENA_BACKUP_S3_SECRET_KEY, or neither"
.to_string()
} else {
"S3 backup environment variables are configured".to_string()
};
ClusterCapabilities {
pg_dump: ClusterCapabilityStatus {
available: has_dump,
detail: dump_detail,
},
pg_restore: ClusterCapabilityStatus {
available: has_restore,
detail: restore_detail,
},
s3cmd: ClusterCapabilityStatus {
available: has_s3cmd,
detail: s3cmd_detail,
},
s3_access: ClusterCapabilityStatus {
available: s3_ready,
detail: s3_detail,
},
backup_create: ClusterCapabilityStatus {
available: has_dump && s3_ready,
detail: if has_dump && s3_ready {
"Backup creation dependencies are available".to_string()
} else {
"Requires pg_dump and S3 access".to_string()
},
},
backup_restore: ClusterCapabilityStatus {
available: has_restore && s3_ready,
detail: if has_restore && s3_ready {
"Backup restore dependencies are available".to_string()
} else {
"Requires pg_restore and S3 access".to_string()
},
},
openapi_download: ClusterCapabilityStatus {
available: true,
detail: "OpenAPI route is exposed at /openapi.yaml".to_string(),
},
management_api: ClusterCapabilityStatus {
available: true,
detail: "Management routes are exposed under /management/*".to_string(),
},
gateway_fetch: ClusterCapabilityStatus {
available: true,
detail: "Gateway fetch route is exposed at /gateway/fetch".to_string(),
},
deadpool_experimental: Some(ClusterCapabilityStatus {
available: cfg!(feature = "deadpool_experimental"),
detail: if cfg!(feature = "deadpool_experimental") {
"Experimental deadpool backend compiled in".to_string()
} else {
"Experimental deadpool backend not compiled in".to_string()
},
}),
}
}
fn unavailable_capabilities(reason: &str) -> ClusterCapabilities {
ClusterCapabilities {
pg_dump: ClusterCapabilityStatus {
available: false,
detail: reason.to_string(),
},
pg_restore: ClusterCapabilityStatus {
available: false,
detail: reason.to_string(),
},
s3cmd: ClusterCapabilityStatus {
available: false,
detail: reason.to_string(),
},
s3_access: ClusterCapabilityStatus {
available: false,
detail: reason.to_string(),
},
backup_create: ClusterCapabilityStatus {
available: false,
detail: reason.to_string(),
},
backup_restore: ClusterCapabilityStatus {
available: false,
detail: reason.to_string(),
},
openapi_download: ClusterCapabilityStatus {
available: false,
detail: reason.to_string(),
},
management_api: ClusterCapabilityStatus {
available: false,
detail: reason.to_string(),
},
gateway_fetch: ClusterCapabilityStatus {
available: false,
detail: reason.to_string(),
},
deadpool_experimental: None,
}
}
const CLUSTER_CACHE_KEY: &str = "health:cluster";
const ROUTES: &[RouteDefinition] = &[
RouteDefinition {
path: "/",
methods: &["GET"],
summary: "API root and route listing",
},
RouteDefinition {
path: "/ping",
methods: &["GET"],
summary: "Health check",
},
RouteDefinition {
path: "/health/cluster",
methods: &["GET"],
summary: "Cluster mirror health and version checks",
},
RouteDefinition {
path: "/clients",
methods: &["GET"],
summary: "List Athena clients (protected)",
},
RouteDefinition {
path: "/schema/clients",
methods: &["GET"],
summary: "List Postgres clients",
},
RouteDefinition {
path: "/schema/tables",
methods: &["GET"],
summary: "List tables",
},
RouteDefinition {
path: "/schema/columns",
methods: &["GET"],
summary: "List columns",
},
RouteDefinition {
path: "/schema/migrations",
methods: &["GET"],
summary: "List schema migrations (graceful fallback when table absent)",
},
RouteDefinition {
path: "/router/registry",
methods: &["GET"],
summary: "Athena router registry",
},
RouteDefinition {
path: "/registry",
methods: &["GET"],
summary: "API registry",
},
RouteDefinition {
path: "/metrics",
methods: &["GET"],
summary: "Prometheus metrics endpoint",
},
RouteDefinition {
path: "/gateway/fetch",
methods: &["POST"],
summary: "Fetch data",
},
RouteDefinition {
path: "/gateway/update",
methods: &["POST"],
summary: "Update data",
},
RouteDefinition {
path: "/gateway/insert",
methods: &["PUT"],
summary: "Insert data",
},
RouteDefinition {
path: "/gateway/delete",
methods: &["DELETE"],
summary: "Delete data",
},
RouteDefinition {
path: "/gateway/query",
methods: &["POST"],
summary: "Execute SQL",
},
RouteDefinition {
path: "/gateway/rpc",
methods: &["POST"],
summary: "Execute a Postgres function as an RPC call",
},
RouteDefinition {
path: "/rpc/{function_name}",
methods: &["GET", "POST"],
summary: "Compatibility RPC endpoint for Postgres functions",
},
RouteDefinition {
path: "/public/{route_key}/{op}",
methods: &["POST"],
summary: "Dispatch a public route alias to gateway operations",
},
RouteDefinition {
path: "/query/sql",
methods: &["POST"],
summary: "Execute SQL via driver",
},
RouteDefinition {
path: "/gateway/sql",
methods: &["POST"],
summary: "Execute SQL via driver (gateway alias)",
},
RouteDefinition {
path: "/query/count",
methods: &["POST"],
summary: "Cached row count (COUNT query or table)",
},
RouteDefinition {
path: "/pipelines",
methods: &["POST"],
summary: "Run pipeline",
},
RouteDefinition {
path: "/management/capabilities",
methods: &["GET"],
summary: "List management API capabilities for a client",
},
RouteDefinition {
path: "/admin/public-routes",
methods: &["GET", "POST"],
summary: "List or create public route aliases (admin)",
},
RouteDefinition {
path: "/admin/public-routes/{route_key}",
methods: &["PATCH", "DELETE"],
summary: "Update or delete public route aliases (admin)",
},
RouteDefinition {
path: "/management/tables",
methods: &["POST"],
summary: "Create a managed table",
},
RouteDefinition {
path: "/management/tables/{table_name}",
methods: &["PATCH", "DELETE"],
summary: "Edit or drop a managed table",
},
RouteDefinition {
path: "/management/tables/{table_name}/columns/{column_name}",
methods: &["DELETE"],
summary: "Drop a managed table column",
},
RouteDefinition {
path: "/management/indexes",
methods: &["POST"],
summary: "Create an index",
},
RouteDefinition {
path: "/management/indexes/{index_name}",
methods: &["DELETE"],
summary: "Drop an index",
},
RouteDefinition {
path: "/management/provision/providers/neon",
methods: &["POST"],
summary: "Provision/register a Neon database via management API",
},
RouteDefinition {
path: "/management/provision/providers/railway",
methods: &["POST"],
summary: "Provision/register a Railway database via management API",
},
RouteDefinition {
path: "/management/provision/providers/render",
methods: &["POST"],
summary: "Provision/register a Render database via management API",
},
RouteDefinition {
path: "/management/functions",
methods: &["GET", "PUT", "DELETE"],
summary: "List, upsert, or drop managed Postgres functions",
},
RouteDefinition {
path: "/admin/api-keys",
methods: &["GET", "POST"],
summary: "Manage API keys",
},
RouteDefinition {
path: "/admin/api-keys/{id}",
methods: &["PATCH", "DELETE"],
summary: "Update or delete an API key",
},
RouteDefinition {
path: "/admin/api-key-rights",
methods: &["GET", "POST"],
summary: "Manage API key rights",
},
RouteDefinition {
path: "/admin/api-key-rights/{id}",
methods: &["PATCH", "DELETE"],
summary: "Update or delete an API key right",
},
RouteDefinition {
path: "/admin/api-key-config",
methods: &["GET", "PUT"],
summary: "Manage global API key enforcement",
},
RouteDefinition {
path: "/admin/api-key-clients",
methods: &["GET"],
summary: "List per-client API key enforcement",
},
RouteDefinition {
path: "/admin/api-key-clients/{client_name}",
methods: &["PUT", "DELETE"],
summary: "Manage per-client API key enforcement",
},
RouteDefinition {
path: "/admin/clients",
methods: &["GET", "POST"],
summary: "Manage Athena client catalog",
},
RouteDefinition {
path: "/admin/clients/{client_name}",
methods: &["PATCH", "DELETE"],
summary: "Update or delete an Athena client",
},
RouteDefinition {
path: "/admin/clients/{client_name}/freeze",
methods: &["PUT"],
summary: "Freeze or unfreeze an Athena client",
},
RouteDefinition {
path: "/admin/clients/statistics",
methods: &["GET"],
summary: "List Athena client statistics",
},
RouteDefinition {
path: "/admin/clients/statistics/refresh",
methods: &["POST"],
summary: "Rebuild Athena client statistics from logs",
},
RouteDefinition {
path: "/admin/clients/{client_name}/statistics",
methods: &["GET"],
summary: "Inspect Athena client statistics",
},
RouteDefinition {
path: "/admin/clients/{client_name}/query-optimizations",
methods: &["GET"],
summary: "List query optimization recommendations for a client",
},
RouteDefinition {
path: "/admin/clients/{client_name}/query-optimizations/refresh",
methods: &["POST"],
summary: "Generate or refresh query optimization recommendations for a client",
},
RouteDefinition {
path: "/admin/clients/{client_name}/query-optimizations/runs",
methods: &["GET"],
summary: "List query optimization recommendation runs for a client",
},
RouteDefinition {
path: "/admin/clients/{client_name}/query-optimizations/{recommendation_id}/apply",
methods: &["POST"],
summary: "Apply a query optimization recommendation",
},
RouteDefinition {
path: "/admin/vacuum-health",
methods: &["GET"],
summary: "List latest vacuum health snapshot per Postgres client",
},
RouteDefinition {
path: "/admin/vacuum-health/{client_name}",
methods: &["GET"],
summary: "Latest vacuum health snapshot and per-table stats for one client",
},
RouteDefinition {
path: "/admin/admission-events",
methods: &["GET"],
summary: "List admission limiter events with optional decision and client filters",
},
RouteDefinition {
path: "/admin/provision",
methods: &["POST"],
summary: "Provision a database with the Athena schema",
},
RouteDefinition {
path: "/admin/provision/status",
methods: &["GET"],
summary: "Check database provisioning status",
},
RouteDefinition {
path: "/admin/provision/instances",
methods: &["GET", "POST"],
summary: "List or spin up managed Postgres instances",
},
RouteDefinition {
path: "/admin/provision/instances/{container_name}",
methods: &["GET", "DELETE"],
summary: "Inspect or delete a managed Postgres instance",
},
RouteDefinition {
path: "/admin/provision/instances/{container_name}/start",
methods: &["POST"],
summary: "Start a managed Postgres instance and optionally reconnect runtime client",
},
RouteDefinition {
path: "/admin/provision/instances/{container_name}/stop",
methods: &["POST"],
summary: "Stop a managed Postgres instance and optionally mark runtime client unavailable",
},
RouteDefinition {
path: "/admin/provision/instances/{container_name}/bindings",
methods: &["POST"],
summary: "Bind a managed Postgres instance host port to a named public route mapping",
},
RouteDefinition {
path: "/admin/provision/providers/neon",
methods: &["POST"],
summary: "Provision/register a Neon Postgres database",
},
RouteDefinition {
path: "/admin/provision/providers/railway",
methods: &["POST"],
summary: "Provision/register a Railway Postgres database",
},
RouteDefinition {
path: "/admin/provision/providers/render",
methods: &["POST"],
summary: "Provision/register a Render Postgres database",
},
RouteDefinition {
path: "/admin/backups",
methods: &["GET", "POST"],
summary: "List or create database backups",
},
RouteDefinition {
path: "/admin/backups/jobs/{id}/cancel",
methods: &["POST"],
summary: "Cancel a running or pending backup/restore job",
},
RouteDefinition {
path: "/admin/backups/{key}/restore",
methods: &["POST"],
summary: "Restore a database from an S3 backup",
},
RouteDefinition {
path: "/admin/backups/{key}/download",
methods: &["GET"],
summary: "Download a backup archive from S3",
},
RouteDefinition {
path: "/admin/backups/{key}",
methods: &["DELETE"],
summary: "Delete a backup from S3",
},
RouteDefinition {
path: "/openapi.yaml",
methods: &["GET"],
summary: "OpenAPI spec",
},
RouteDefinition {
path: "/openapi-wss.yaml",
methods: &["GET"],
summary: "WebSocket OpenAPI spec",
},
RouteDefinition {
path: "/wss/info",
methods: &["GET"],
summary: "WebSocket gateway contract",
},
RouteDefinition {
path: "/docs",
methods: &["GET"],
summary: "Documentation redirect",
},
];
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterMirrorHealth {
pub url: String,
pub status: String,
pub latency_ms: Option<f64>,
pub download_bytes_per_sec: Option<f64>,
pub cargo_toml_version: Option<String>,
pub message: Option<String>,
pub capabilities: ClusterCapabilities,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterHealthPayload {
pub message: String,
pub version: String,
pub athena_api: String,
pub athena_deadpool: String,
pub athena_scylladb: String,
pub gateway_auth_store: String,
pub cargo_toml_version: String,
pub capabilities: ClusterCapabilities,
pub mirrors: Vec<ClusterMirrorHealth>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ClusterHealthCacheEntry {
cached_at_epoch_seconds: i64,
payload: ClusterHealthPayload,
}
fn epoch_seconds() -> i64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs() as i64
}
fn athena_deadpool_status(app_state: &AppState) -> &'static str {
if app_state.pg_registry.list_clients().is_empty() {
"offline"
} else {
"online"
}
}
fn gateway_auth_store_status(app_state: &AppState) -> &'static str {
let Some(client_name) = app_state.gateway_auth_client_name.as_ref() else {
return "not_configured";
};
if app_state.pg_registry.get_pool(client_name).is_some() {
"online"
} else {
"unavailable"
}
}
#[get("/")]
pub async fn root(app_state: web::Data<AppState>) -> HttpResponse {
let body: Value = json!({
"message": "athena is online",
"version": env!("CARGO_PKG_VERSION"),
"athena_api": "online",
"athena_deadpool": athena_deadpool_status(app_state.get_ref()),
"athena_scylladb": "online",
"gateway_auth_store": gateway_auth_store_status(app_state.get_ref()),
"cargo_toml_version": env!("CARGO_PKG_VERSION"),
"capabilities": local_capabilities(),
"routes": ROUTES,
});
HttpResponse::Ok()
.content_type("application/json")
.json(body)
}
async fn load_cluster_payload(app_state: &AppState) -> ClusterHealthPayload {
if let Some(cached) = app_state.cache.get(CLUSTER_CACHE_KEY).await
&& let Ok(entry) = serde_json::from_value::<ClusterHealthCacheEntry>(cached)
&& epoch_seconds() - entry.cached_at_epoch_seconds <= 15
{
return entry.payload;
}
let mirrors: Vec<ClusterMirrorRecord> = match logging_pool(app_state) {
Ok(pool) => list_active_cluster_mirrors(&pool)
.await
.ok()
.filter(|rows| !rows.is_empty())
.unwrap_or_else(fallback_cluster_mirrors),
Err(_) => fallback_cluster_mirrors(),
};
let client: reqwest::Client = app_state.client.clone();
let probes: Vec<(String, ClusterMirrorHealth)> = join_all(mirrors.into_iter().map(|mirror| {
let client: Client = client.clone();
async move {
let root_url: String = mirror.url.clone();
let ping_url: String = format!("{}/ping", mirror.url.trim_end_matches('/'));
let openapi_url: String = format!("{}/openapi.yaml", mirror.url.trim_end_matches('/'));
let ping_started: Instant = Instant::now();
let ping_response: Result<reqwest::Response, reqwest::Error> = client
.get(&ping_url)
.timeout(Duration::from_secs(3))
.send()
.await;
let latency_ms = ping_response
.as_ref()
.ok()
.map(|_| ping_started.elapsed().as_secs_f64() * 1000.0);
let root_response = client
.get(&root_url)
.timeout(Duration::from_secs(4))
.send()
.await;
let root_json: Option<Value> = match root_response {
Ok(response) => response.json::<Value>().await.ok(),
Err(_) => None,
};
let openapi_started: Instant = Instant::now();
let download_response: Result<reqwest::Response, reqwest::Error> = client
.get(&openapi_url)
.timeout(Duration::from_secs(5))
.send()
.await;
let download_bytes_per_sec: Option<f64> = match download_response {
Ok(response) => match response.bytes().await {
Ok(bytes) => {
let elapsed = openapi_started.elapsed().as_secs_f64().max(0.001);
Some(bytes.len() as f64 / elapsed)
}
Err(_) => None,
},
Err(_) => None,
};
let openapi_ok: bool = download_bytes_per_sec.is_some();
let status: &str = if root_json.is_some() && latency_ms.is_some() {
"online"
} else {
"offline"
};
let cargo_toml_version: Option<String> = root_json
.as_ref()
.and_then(|json| {
json.get("cargo_toml_version")
.or_else(|| json.get("version"))
})
.and_then(Value::as_str)
.map(str::to_string);
let message: Option<String> = root_json
.as_ref()
.and_then(|json| json.get("message"))
.and_then(Value::as_str)
.map(str::to_string);
let mirror_capabilities: ClusterCapabilities = root_json
.as_ref()
.and_then(|json| json.get("capabilities"))
.and_then(|value| serde_json::from_value::<ClusterCapabilities>(value.clone()).ok())
.map(|caps| ClusterCapabilities {
openapi_download: ClusterCapabilityStatus {
available: openapi_ok,
detail: if openapi_ok {
"Downloaded /openapi.yaml successfully".to_string()
} else {
"Failed to download /openapi.yaml".to_string()
},
},
management_api: ClusterCapabilityStatus {
available: root_json
.as_ref()
.and_then(|json| {
route_supports(json, "/management/capabilities", "GET")
})
.unwrap_or(false),
detail: root_json
.as_ref()
.and_then(|json| {
route_supports(json, "/management/capabilities", "GET")
})
.map(|ok| {
if ok {
"Route /management/capabilities (GET) is advertised".to_string()
} else {
"Route /management/capabilities (GET) not advertised"
.to_string()
}
})
.unwrap_or_else(|| {
"Mirror did not include routes metadata".to_string()
}),
},
gateway_fetch: ClusterCapabilityStatus {
available: root_json
.as_ref()
.and_then(|json| route_supports(json, "/gateway/fetch", "POST"))
.unwrap_or(false),
detail: root_json
.as_ref()
.and_then(|json| route_supports(json, "/gateway/fetch", "POST"))
.map(|ok| {
if ok {
"Route /gateway/fetch (POST) is advertised".to_string()
} else {
"Route /gateway/fetch (POST) not advertised".to_string()
}
})
.unwrap_or_else(|| {
"Mirror did not include routes metadata".to_string()
}),
},
..caps
})
.unwrap_or_else(|| {
let reason = if status == "offline" {
"mirror offline"
} else {
"capabilities not reported by mirror"
};
let mut caps: ClusterCapabilities = unavailable_capabilities(reason);
caps.openapi_download = ClusterCapabilityStatus {
available: openapi_ok,
detail: if openapi_ok {
"Downloaded /openapi.yaml successfully".to_string()
} else {
"Failed to download /openapi.yaml".to_string()
},
};
if let Some(json) = root_json.as_ref() {
let mgmt: bool = route_supports(json, "/management/capabilities", "GET")
.unwrap_or(false);
caps.management_api = ClusterCapabilityStatus {
available: mgmt,
detail: if mgmt {
"Route /management/capabilities (GET) is advertised".to_string()
} else {
"Route /management/capabilities (GET) not advertised".to_string()
},
};
let gw: bool =
route_supports(json, "/gateway/fetch", "POST").unwrap_or(false);
caps.gateway_fetch = ClusterCapabilityStatus {
available: gw,
detail: if gw {
"Route /gateway/fetch (POST) is advertised".to_string()
} else {
"Route /gateway/fetch (POST) not advertised".to_string()
},
};
}
caps
});
(
mirror.url,
ClusterMirrorHealth {
url: root_url,
status: status.to_string(),
latency_ms,
download_bytes_per_sec,
cargo_toml_version,
message,
capabilities: mirror_capabilities,
},
)
}
}))
.await;
let mirrors: Vec<ClusterMirrorHealth> = probes
.into_iter()
.map(|(url, probe)| {
app_state.metrics_state.set_cluster_probe(
&url,
crate::api::metrics::ClusterProbeMetric {
up: probe.status == "online",
latency_ms: probe.latency_ms,
download_bytes_per_sec: probe.download_bytes_per_sec,
},
);
probe
})
.collect::<Vec<_>>();
let payload: ClusterHealthPayload = ClusterHealthPayload {
message: "Athena is online".to_string(),
version: env!("CARGO_PKG_VERSION").to_string(),
athena_api: "online".to_string(),
athena_deadpool: athena_deadpool_status(app_state).to_string(),
athena_scylladb: "online".to_string(),
gateway_auth_store: gateway_auth_store_status(app_state).to_string(),
cargo_toml_version: env!("CARGO_PKG_VERSION").to_string(),
capabilities: local_capabilities(),
mirrors,
};
app_state
.cache
.insert(
CLUSTER_CACHE_KEY.to_string(),
json!(ClusterHealthCacheEntry {
cached_at_epoch_seconds: epoch_seconds(),
payload: payload.clone(),
}),
)
.await;
payload
}
#[get("/health/cluster")]
pub async fn cluster_health(app_state: web::Data<AppState>) -> HttpResponse {
HttpResponse::Ok().json(load_cluster_payload(app_state.get_ref()).await)
}
#[get("/ping")]
pub async fn ping() -> HttpResponse {
HttpResponse::Ok()
.content_type("text/plain; charset=utf-8")
.body("pong")
}