use actix_web::{HttpResponse, get, web};
use futures::future::join_all;
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use std::time::Instant;
use std::time::{SystemTime, UNIX_EPOCH};
use crate::AppState;
use crate::api::client_context::logging_pool;
use crate::data::cluster::{fallback_cluster_mirrors, list_active_cluster_mirrors};
/// Cache key under which `load_cluster_payload` reads and writes the serialized
/// `ClusterHealthCacheEntry` (the cluster health report plus its write timestamp).
const CLUSTER_CACHE_KEY: &str = "health:cluster";
/// One entry of the static route listing that `GET /` returns under `"routes"`.
///
/// Field order is significant: it is the order the fields are serialized in.
#[derive(Clone, Debug, Serialize)]
struct RouteDefinition {
    /// Advertised route path, e.g. `/admin/clients/{client_name}`.
    path: &'static str,
    /// HTTP methods advertised for `path`.
    methods: &'static [&'static str],
    /// Short human-readable description of the route.
    summary: &'static str,
}
/// Static route listing returned by `GET /` under the `"routes"` key.
///
/// NOTE(review): this is a hand-written table; presumably it must be kept in sync
/// with the actual actix route registrations elsewhere — confirm when adding routes.
const ROUTES: &[RouteDefinition] = &[
RouteDefinition {
path: "/",
methods: &["GET"],
summary: "API root and route listing",
},
RouteDefinition {
path: "/ping",
methods: &["GET"],
summary: "Health check",
},
RouteDefinition {
path: "/health/cluster",
methods: &["GET"],
summary: "Cluster mirror health and version checks",
},
RouteDefinition {
path: "/clients",
methods: &["GET"],
summary: "List Athena clients (protected)",
},
RouteDefinition {
path: "/schema/clients",
methods: &["GET"],
summary: "List Postgres clients",
},
RouteDefinition {
path: "/schema/tables",
methods: &["GET"],
summary: "List tables",
},
RouteDefinition {
path: "/schema/columns",
methods: &["GET"],
summary: "List columns",
},
RouteDefinition {
path: "/schema/migrations",
methods: &["GET"],
summary: "List schema migrations (graceful fallback when table absent)",
},
RouteDefinition {
path: "/router/registry",
methods: &["GET"],
summary: "Athena router registry",
},
RouteDefinition {
path: "/registry",
methods: &["GET"],
summary: "API registry",
},
RouteDefinition {
path: "/metrics",
methods: &["GET"],
summary: "Prometheus metrics endpoint",
},
RouteDefinition {
path: "/gateway/fetch",
methods: &["POST"],
summary: "Fetch data",
},
RouteDefinition {
path: "/gateway/update",
methods: &["POST"],
summary: "Update data",
},
RouteDefinition {
path: "/gateway/insert",
methods: &["PUT"],
summary: "Insert data",
},
RouteDefinition {
path: "/gateway/delete",
methods: &["DELETE"],
summary: "Delete data",
},
RouteDefinition {
path: "/gateway/query",
methods: &["POST"],
summary: "Execute SQL",
},
RouteDefinition {
path: "/query/sql",
methods: &["POST"],
summary: "Execute SQL via driver",
},
RouteDefinition {
path: "/pipelines",
methods: &["POST"],
summary: "Run pipeline",
},
RouteDefinition {
path: "/management/capabilities",
methods: &["GET"],
summary: "List management API capabilities for a client",
},
RouteDefinition {
path: "/management/tables",
methods: &["POST"],
summary: "Create a managed table",
},
RouteDefinition {
path: "/management/tables/{table_name}",
methods: &["PATCH", "DELETE"],
summary: "Edit or drop a managed table",
},
RouteDefinition {
path: "/management/tables/{table_name}/columns/{column_name}",
methods: &["DELETE"],
summary: "Drop a managed table column",
},
RouteDefinition {
path: "/management/indexes",
methods: &["POST"],
summary: "Create an index",
},
RouteDefinition {
path: "/management/indexes/{index_name}",
methods: &["DELETE"],
summary: "Drop an index",
},
RouteDefinition {
path: "/admin/api-keys",
methods: &["GET", "POST"],
summary: "Manage API keys",
},
RouteDefinition {
path: "/admin/api-keys/{id}",
methods: &["PATCH", "DELETE"],
summary: "Update or delete an API key",
},
RouteDefinition {
path: "/admin/api-key-rights",
methods: &["GET", "POST"],
summary: "Manage API key rights",
},
RouteDefinition {
path: "/admin/api-key-rights/{id}",
methods: &["PATCH", "DELETE"],
summary: "Update or delete an API key right",
},
RouteDefinition {
path: "/admin/api-key-config",
methods: &["GET", "PUT"],
summary: "Manage global API key enforcement",
},
RouteDefinition {
path: "/admin/api-key-clients",
methods: &["GET"],
summary: "List per-client API key enforcement",
},
RouteDefinition {
path: "/admin/api-key-clients/{client_name}",
methods: &["PUT", "DELETE"],
summary: "Manage per-client API key enforcement",
},
RouteDefinition {
path: "/admin/clients",
methods: &["GET", "POST"],
summary: "Manage Athena client catalog",
},
RouteDefinition {
path: "/admin/clients/{client_name}",
methods: &["PATCH", "DELETE"],
summary: "Update or delete an Athena client",
},
RouteDefinition {
path: "/admin/clients/{client_name}/freeze",
methods: &["PUT"],
summary: "Freeze or unfreeze an Athena client",
},
RouteDefinition {
path: "/admin/clients/statistics",
methods: &["GET"],
summary: "List Athena client statistics",
},
RouteDefinition {
path: "/admin/clients/statistics/refresh",
methods: &["POST"],
summary: "Rebuild Athena client statistics from logs",
},
RouteDefinition {
path: "/admin/clients/{client_name}/statistics",
methods: &["GET"],
summary: "Inspect Athena client statistics",
},
RouteDefinition {
path: "/admin/provision",
methods: &["POST"],
summary: "Provision a database with the Athena schema",
},
RouteDefinition {
path: "/admin/provision/status",
methods: &["GET"],
summary: "Check database provisioning status",
},
RouteDefinition {
path: "/admin/backups",
methods: &["GET", "POST"],
summary: "List or create database backups",
},
RouteDefinition {
path: "/admin/backups/{key}/restore",
methods: &["POST"],
summary: "Restore a database from an S3 backup",
},
RouteDefinition {
path: "/admin/backups/{key}/download",
methods: &["GET"],
summary: "Download a backup archive from S3",
},
RouteDefinition {
path: "/admin/backups/{key}",
methods: &["DELETE"],
summary: "Delete a backup from S3",
},
RouteDefinition {
path: "/openapi.yaml",
methods: &["GET"],
summary: "OpenAPI spec",
},
RouteDefinition {
path: "/openapi-wss.yaml",
methods: &["GET"],
summary: "WebSocket OpenAPI spec",
},
RouteDefinition {
path: "/wss/info",
methods: &["GET"],
summary: "WebSocket gateway contract",
},
RouteDefinition {
path: "/docs",
methods: &["GET"],
summary: "Documentation redirect",
},
];
/// Probe result for a single cluster mirror, one entry of `ClusterHealthPayload::mirrors`.
///
/// Field order is significant: it is the order the fields are serialized in.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ClusterMirrorHealth {
    /// Base URL of the probed mirror.
    pub url: String,
    /// `"online"` or `"offline"`, as decided by `load_cluster_payload`.
    pub status: String,
    /// Round-trip time of the mirror's `/ping` request in milliseconds, if it succeeded.
    pub latency_ms: Option<f64>,
    /// Observed download rate of the mirror's `/openapi.yaml`, if the download succeeded.
    pub download_bytes_per_sec: Option<f64>,
    /// Version string read from the mirror's root JSON (`cargo_toml_version` or `version`).
    pub cargo_toml_version: Option<String>,
    /// `message` string read from the mirror's root JSON.
    pub message: Option<String>,
}
/// Response body of `GET /health/cluster`: this node's own status fields plus the
/// probe result of every cluster mirror.
///
/// Field order is significant: it is the order the fields are serialized in.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ClusterHealthPayload {
    /// Human-readable status line for this node.
    pub message: String,
    /// Version of this build, as populated by `load_cluster_payload`.
    pub version: String,
    /// Status of the API component.
    pub athena_api: String,
    /// `"online"` when the Postgres client registry is non-empty, else `"offline"`.
    pub athena_deadpool: String,
    /// Status of the ScyllaDB component.
    pub athena_scylladb: String,
    /// Crate version of this build, as populated by `load_cluster_payload`.
    pub cargo_toml_version: String,
    /// Per-mirror probe results.
    pub mirrors: Vec<ClusterMirrorHealth>,
}
/// Cache record stored under `CLUSTER_CACHE_KEY`: a health report and when it was written.
#[derive(Clone, Debug, Deserialize, Serialize)]
struct ClusterHealthCacheEntry {
    /// Unix timestamp, in seconds, at which `payload` was cached.
    cached_at_epoch_seconds: i64,
    /// The cached health report.
    payload: ClusterHealthPayload,
}
/// Returns the current Unix timestamp in whole seconds.
///
/// If the system clock reports a time before the Unix epoch, `0` is returned
/// instead of an error.
fn epoch_seconds() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs() as i64,
        Err(_) => 0,
    }
}
/// Reports `"online"` when at least one client is registered in
/// `app_state.pg_registry`, and `"offline"` when the registry is empty.
fn athena_deadpool_status(app_state: &AppState) -> &'static str {
    let registered_clients = app_state.pg_registry.list_clients();
    if !registered_clients.is_empty() {
        "online"
    } else {
        "offline"
    }
}
#[get("/")]
pub async fn root(app_state: web::Data<AppState>) -> HttpResponse {
let body = json!({
"message": "athena is online",
"version": env!("CARGO_PKG_VERSION"),
"athena_api": "online",
"athena_deadpool": athena_deadpool_status(app_state.get_ref()),
"athena_scylladb": "online",
"cargo_toml_version": env!("CARGO_PKG_VERSION"),
"routes": ROUTES,
});
HttpResponse::Ok()
.content_type("application/json")
.json(body)
}
async fn load_cluster_payload(app_state: &AppState) -> ClusterHealthPayload {
if let Some(cached) = app_state.cache.get(CLUSTER_CACHE_KEY).await
&& let Ok(entry) = serde_json::from_value::<ClusterHealthCacheEntry>(cached)
&& epoch_seconds() - entry.cached_at_epoch_seconds <= 15
{
return entry.payload;
}
let mirrors = match logging_pool(app_state) {
Ok(pool) => list_active_cluster_mirrors(&pool)
.await
.ok()
.filter(|rows| !rows.is_empty())
.unwrap_or_else(fallback_cluster_mirrors),
Err(_) => fallback_cluster_mirrors(),
};
let client = app_state.client.clone();
let probes = join_all(mirrors.into_iter().map(|mirror| {
let client = client.clone();
async move {
let root_url = mirror.url.clone();
let ping_url = format!("{}/ping", mirror.url.trim_end_matches('/'));
let openapi_url = format!("{}/openapi.yaml", mirror.url.trim_end_matches('/'));
let ping_started = Instant::now();
let ping_response = client
.get(&ping_url)
.timeout(std::time::Duration::from_secs(3))
.send()
.await;
let latency_ms = ping_response
.as_ref()
.ok()
.map(|_| ping_started.elapsed().as_secs_f64() * 1000.0);
let root_response = client
.get(&root_url)
.timeout(std::time::Duration::from_secs(4))
.send()
.await;
let root_json: Option<Value> = match root_response {
Ok(response) => response.json::<Value>().await.ok(),
Err(_) => None,
};
let openapi_started = Instant::now();
let download_response = client
.get(&openapi_url)
.timeout(std::time::Duration::from_secs(5))
.send()
.await;
let download_bytes_per_sec = match download_response {
Ok(response) => match response.bytes().await {
Ok(bytes) => {
let elapsed = openapi_started.elapsed().as_secs_f64().max(0.001);
Some(bytes.len() as f64 / elapsed)
}
Err(_) => None,
},
Err(_) => None,
};
let status = if root_json.is_some() && latency_ms.is_some() {
"online"
} else {
"offline"
};
let cargo_toml_version = root_json
.as_ref()
.and_then(|json| {
json.get("cargo_toml_version")
.or_else(|| json.get("version"))
})
.and_then(Value::as_str)
.map(str::to_string);
let message = root_json
.as_ref()
.and_then(|json| json.get("message"))
.and_then(Value::as_str)
.map(str::to_string);
(
mirror.url,
ClusterMirrorHealth {
url: root_url,
status: status.to_string(),
latency_ms,
download_bytes_per_sec,
cargo_toml_version,
message,
},
)
}
}))
.await;
let mirrors = probes
.into_iter()
.map(|(url, probe)| {
app_state.metrics_state.set_cluster_probe(
&url,
crate::api::metrics::ClusterProbeMetric {
up: probe.status == "online",
latency_ms: probe.latency_ms,
download_bytes_per_sec: probe.download_bytes_per_sec,
},
);
probe
})
.collect::<Vec<_>>();
let payload = ClusterHealthPayload {
message: "athena is online".to_string(),
version: env!("CARGO_PKG_VERSION").to_string(),
athena_api: "online".to_string(),
athena_deadpool: athena_deadpool_status(app_state).to_string(),
athena_scylladb: "online".to_string(),
cargo_toml_version: env!("CARGO_PKG_VERSION").to_string(),
mirrors,
};
app_state
.cache
.insert(
CLUSTER_CACHE_KEY.to_string(),
json!(ClusterHealthCacheEntry {
cached_at_epoch_seconds: epoch_seconds(),
payload: payload.clone(),
}),
)
.await;
payload
}
/// `GET /health/cluster` — responds with the (possibly cached) cluster health report
/// produced by `load_cluster_payload`, serialized as JSON.
#[get("/health/cluster")]
pub async fn cluster_health(app_state: web::Data<AppState>) -> HttpResponse {
    let state: &AppState = app_state.get_ref();
    let report = load_cluster_payload(state).await;
    HttpResponse::Ok().json(report)
}
/// `GET /ping` — liveness check that answers `200 OK` with the plain-text body `pong`.
#[get("/ping")]
pub async fn ping() -> HttpResponse {
    const PONG: &str = "pong";
    let mut response = HttpResponse::Ok();
    response.content_type("text/plain; charset=utf-8");
    response.body(PONG)
}