use axum::Json;
use axum::extract::State;
use axum::http::StatusCode;
use chrono::{DateTime, Duration, Utc};
use kanade_shared::kv::{
BUCKET_AGENT_CONFIG, BUCKET_AGENT_GROUPS, BUCKET_AGENTS_STATE, BUCKET_SCHEDULES,
BUCKET_SCRIPT_CURRENT, BUCKET_SCRIPT_STATUS, OBJECT_AGENT_RELEASES, STREAM_AUDIT,
STREAM_EVENTS, STREAM_EXEC, STREAM_INVENTORY, STREAM_RESULTS,
};
use serde::Serialize;
use sqlx::Row;
use tracing::warn;
use super::AppState;
/// Maximum heartbeat age before an agent is counted as stale: `fleet`
/// subtracts this from the current time to obtain the cutoff it passes to
/// `agents_health`.
const STALE_THRESHOLD: Duration = Duration::minutes(2);
/// Look-back window for the execution-result counters; also reported to
/// clients, in whole hours, as `RecentResults::window_hours`.
const RECENT_WINDOW: Duration = Duration::hours(24);
/// Response body produced by the `fleet` handler.
#[derive(Serialize)]
pub struct FleetHealth {
/// Overall verdict derived from the JetStream and agent checks.
pub status: HealthStatus,
/// Agent heartbeat counts.
pub agents: AgentsHealth,
/// Outcome of probing the expected JetStream resources.
pub jetstream: JetstreamHealth,
/// Execution-result counters for the recent window.
pub recent_results: RecentResults,
/// Timestamp taken at the start of the handler; both cutoffs are derived from it.
pub observed_at: DateTime<Utc>,
}
/// Overall fleet verdict; serialized in lowercase (`rename_all = "lowercase"`).
#[derive(Serialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum HealthStatus {
/// Every JetStream lookup succeeded, no agent is stale, and at least one
/// agent is known.
Ok,
/// Nothing is failing, but no agents are known (this is also what the
/// handler reports when the agent query fails, since the counts fall
/// back to zero).
Unknown,
/// At least one JetStream lookup failed or at least one agent is stale;
/// the `fleet` handler answers 503 in this case.
Degraded,
}
/// Agent counts computed by `agents_health` from the `agents` table.
#[derive(Serialize)]
pub struct AgentsHealth {
/// Total number of rows in `agents`.
pub known: i64,
/// Rows whose `last_heartbeat` is at or after the stale cutoff.
pub active: i64,
/// `known - active`, clamped so it is never negative.
pub stale: i64,
}
/// Summary of the JetStream resource probes made by `jetstream_health`.
#[derive(Serialize)]
pub struct JetstreamHealth {
/// `true` when `missing` is empty.
pub all_ok: bool,
/// Number of probed resources whose lookup succeeded (`total - missing.len()`).
pub healthy: usize,
/// Number of resources probed.
pub total: usize,
/// Names of the resources whose lookup returned an error.
pub missing: Vec<String>,
}
/// Execution-result counters computed by `recent_results`.
#[derive(Serialize)]
pub struct RecentResults {
/// Length of the look-back window in whole hours (`RECENT_WINDOW.num_hours()`).
pub window_hours: i64,
/// Rows in `execution_results` recorded at or after the window start.
pub total: i64,
/// Subset of `total` whose `exit_code` is non-zero.
pub failed: i64,
}
/// Axum handler that assembles the fleet health snapshot.
///
/// The agent, JetStream and recent-result checks are awaited one after
/// another, all relative to a single `Utc::now()` reading. The overall status
/// is then derived as follows:
///
/// * `Degraded` – a JetStream lookup failed or at least one agent is stale;
/// * `Ok` – nothing failed and at least one agent is known;
/// * `Unknown` – nothing failed but no agents are known.
///
/// The HTTP status is 503 for `Degraded` and 200 otherwise; the JSON body is
/// returned in both cases.
pub async fn fleet(State(state): State<AppState>) -> (StatusCode, Json<FleetHealth>) {
    let observed_at = Utc::now();
    let stale_cutoff = observed_at - STALE_THRESHOLD;
    let recent_cutoff = observed_at - RECENT_WINDOW;

    let agents = agents_health(&state.pool, stale_cutoff).await;
    let jetstream = jetstream_health(&state.jetstream).await;
    let recent = recent_results(&state.pool, recent_cutoff).await;

    // Failure signals are checked first; the remaining cases only differ in
    // whether any agent is known at all.
    let status = if !jetstream.all_ok || agents.stale > 0 {
        HealthStatus::Degraded
    } else if agents.stale == 0 && agents.known > 0 {
        HealthStatus::Ok
    } else {
        HealthStatus::Unknown
    };

    let code = match status {
        HealthStatus::Degraded => StatusCode::SERVICE_UNAVAILABLE,
        HealthStatus::Ok | HealthStatus::Unknown => StatusCode::OK,
    };

    let body = FleetHealth {
        status,
        agents,
        jetstream,
        recent_results: recent,
        observed_at,
    };
    (code, Json(body))
}
async fn agents_health(pool: &sqlx::SqlitePool, stale_cutoff: DateTime<Utc>) -> AgentsHealth {
let row = sqlx::query(
"SELECT
COUNT(*) AS known,
COALESCE(SUM(CASE WHEN last_heartbeat >= ? THEN 1 ELSE 0 END), 0) AS active
FROM agents",
)
.bind(stale_cutoff)
.fetch_one(pool)
.await;
let (known, active) = match row {
Ok(r) => (
r.try_get::<i64, _>("known").unwrap_or(0),
r.try_get::<i64, _>("active").unwrap_or(0),
),
Err(e) => {
warn!(error = %e, "agents_health query");
(0, 0)
}
};
AgentsHealth {
known,
active,
stale: (known - active).max(0),
}
}
/// Probes every JetStream resource this module expects to exist.
///
/// Streams are looked up first, then key-value buckets, then object stores,
/// one lookup at a time. Any lookup that returns an error puts the resource
/// name into `missing`; `healthy` is the number of lookups that succeeded.
async fn jetstream_health(js: &async_nats::jetstream::Context) -> JetstreamHealth {
    let streams = [
        STREAM_INVENTORY,
        STREAM_RESULTS,
        STREAM_EXEC,
        STREAM_EVENTS,
        STREAM_AUDIT,
    ];
    let buckets = [
        BUCKET_SCRIPT_CURRENT,
        BUCKET_SCRIPT_STATUS,
        BUCKET_AGENTS_STATE,
        BUCKET_AGENT_CONFIG,
        BUCKET_AGENT_GROUPS,
        BUCKET_SCHEDULES,
    ];
    let object_stores = [OBJECT_AGENT_RELEASES];

    // Every listed resource is probed exactly once, so the total is simply
    // the combined length of the three lists.
    let total = streams.len() + buckets.len() + object_stores.len();
    let mut missing = Vec::new();

    for stream in streams {
        if js.get_stream(stream).await.is_err() {
            missing.push(stream.to_string());
        }
    }
    for bucket in buckets {
        if js.get_key_value(bucket).await.is_err() {
            missing.push(bucket.to_string());
        }
    }
    for store in object_stores {
        if js.get_object_store(store).await.is_err() {
            missing.push(store.to_string());
        }
    }

    let healthy = total - missing.len();
    JetstreamHealth {
        all_ok: missing.is_empty(),
        healthy,
        total,
        missing,
    }
}
async fn recent_results(pool: &sqlx::SqlitePool, since: DateTime<Utc>) -> RecentResults {
let row = sqlx::query(
"SELECT
COUNT(*) AS total,
COALESCE(SUM(CASE WHEN exit_code <> 0 THEN 1 ELSE 0 END), 0) AS failed
FROM execution_results
WHERE recorded_at >= ?",
)
.bind(since)
.fetch_one(pool)
.await;
let (total, failed) = match row {
Ok(r) => (
r.try_get::<i64, _>("total").unwrap_or(0),
r.try_get::<i64, _>("failed").unwrap_or(0),
),
Err(e) => {
warn!(error = %e, "recent_results query");
(0, 0)
}
};
RecentResults {
window_hours: RECENT_WINDOW.num_hours(),
total,
failed,
}
}
/// Duration statistics for one job, as returned by `scan_durations`.
/// All `*_ms` fields are in milliseconds.
#[derive(Serialize)]
pub struct ScanDurationStats {
/// Job the statistics belong to.
pub job_id: String,
/// Number of result rows that contributed a duration.
pub count: i64,
/// Shortest observed duration.
pub min_ms: i64,
/// Median, computed by `percentile` with q = 0.50.
pub p50_ms: i64,
/// 95th percentile, computed by `percentile` with q = 0.95.
pub p95_ms: i64,
/// 99th percentile, computed by `percentile` with q = 0.99.
pub p99_ms: i64,
/// Longest observed duration.
pub max_ms: i64,
/// Arithmetic mean using integer division (fraction discarded).
pub mean_ms: i64,
}
/// Query-string parameters accepted by `scan_durations`.
#[derive(serde::Deserialize)]
pub struct ScanDurationParams {
/// Lower bound applied to `finished_at`; when absent the handler uses
/// 24 hours before the current time.
pub since: Option<DateTime<Utc>>,
}
/// Axum handler returning per-job execution-duration statistics.
///
/// Considers rows of `execution_results` whose `finished_at` is at or after
/// `since` (default: 24 hours before now) and whose `finished_at`,
/// `started_at` and `job_id` are all non-NULL. The duration of a row is the
/// difference of the two timestamps in milliseconds, computed in SQL via
/// `julianday`.
///
/// Rows whose `job_id` cannot be decoded are skipped. A duration that cannot
/// be decoded, or that is negative, is counted as 0 ms.
///
/// The result is ordered by `p95_ms` descending; jobs with the same p95 are
/// ordered by `job_id` ascending so the response is stable across requests.
///
/// # Errors
///
/// A failed query is logged and reported as `500 Internal Server Error` with
/// the error text as the body.
pub async fn scan_durations(
    State(state): State<AppState>,
    axum::extract::Query(params): axum::extract::Query<ScanDurationParams>,
) -> Result<Json<Vec<ScanDurationStats>>, (StatusCode, String)> {
    let since = params
        .since
        .unwrap_or_else(|| Utc::now() - Duration::hours(24));
    let rows = sqlx::query(
        "SELECT job_id, \
         CAST((julianday(finished_at) - julianday(started_at)) * 86400000.0 AS INTEGER) AS dur_ms \
         FROM execution_results \
         WHERE finished_at IS NOT NULL \
         AND started_at IS NOT NULL \
         AND job_id IS NOT NULL \
         AND finished_at >= ?",
    )
    .bind(since)
    .fetch_all(&state.pool)
    .await
    .map_err(|e| {
        warn!(error = %e, "scan_durations query");
        (StatusCode::INTERNAL_SERVER_ERROR, e.to_string())
    })?;

    // Group the raw durations by job.
    let mut by_job: std::collections::HashMap<String, Vec<i64>> = std::collections::HashMap::new();
    for r in &rows {
        let Ok(job_id) = r.try_get::<String, _>("job_id") else {
            continue;
        };
        // Negative values (finished before started) are clamped to zero.
        let dur = r.try_get::<i64, _>("dur_ms").unwrap_or(0).max(0);
        by_job.entry(job_id).or_default().push(dur);
    }

    let mut out: Vec<ScanDurationStats> = by_job
        .into_iter()
        .map(|(job_id, mut durs)| {
            // `percentile` expects its input sorted ascending.
            durs.sort_unstable();
            let count = durs.len() as i64;
            let sum: i64 = durs.iter().sum();
            let mean_ms = if count > 0 { sum / count } else { 0 };
            ScanDurationStats {
                job_id,
                count,
                min_ms: *durs.first().unwrap_or(&0),
                p50_ms: percentile(&durs, 0.50),
                p95_ms: percentile(&durs, 0.95),
                p99_ms: percentile(&durs, 0.99),
                max_ms: *durs.last().unwrap_or(&0),
                mean_ms,
            }
        })
        .collect();

    // Slowest jobs first. `HashMap` iteration order is unspecified, so ties
    // on p95 are broken by job id to keep the response order deterministic.
    // Job ids are unique map keys, which makes this a total order and an
    // unstable sort safe.
    out.sort_unstable_by(|a, b| {
        b.p95_ms
            .cmp(&a.p95_ms)
            .then_with(|| a.job_id.cmp(&b.job_id))
    });
    Ok(Json(out))
}
/// Nearest-rank percentile of an ascending-sorted slice.
///
/// `q` is clamped to `[0.0, 1.0]`. The 1-based rank is `ceil(q * len)`; it is
/// converted to a 0-based index and capped at the last element, so `q = 0`
/// yields the first element and `q = 1` the last. An empty slice yields 0.
///
/// The slice is not checked for ordering; callers must sort it first.
fn percentile(sorted: &[i64], q: f64) -> i64 {
    // `checked_sub` fails exactly when the slice is empty.
    let Some(last_index) = sorted.len().checked_sub(1) else {
        return 0;
    };
    let fraction = q.clamp(0.0, 1.0);
    let rank = (fraction * sorted.len() as f64).ceil() as usize;
    sorted[rank.saturating_sub(1).min(last_index)]
}
#[cfg(test)]
mod tests {
    use super::*;

    /// An empty sample reports 0 regardless of the requested quantile.
    #[test]
    fn percentile_empty_is_zero() {
        for q in [0.5, 0.95] {
            assert_eq!(percentile(&[], q), 0);
        }
    }

    /// With a single sample every quantile resolves to that sample.
    #[test]
    fn percentile_single_element() {
        let only: [i64; 1] = [42];
        for q in [0.0, 0.5, 1.0] {
            assert_eq!(percentile(&only, q), 42);
        }
    }

    /// Nearest-rank results on the values 1..=10.
    #[test]
    fn percentile_nearest_rank_on_ten_elements() {
        let values: Vec<i64> = (1..=10).collect();
        let cases: [(f64, i64); 4] = [(0.50, 5), (0.90, 9), (0.95, 10), (0.99, 10)];
        for (q, expected) in cases {
            assert_eq!(percentile(&values, q), expected);
        }
    }

    /// Quantiles outside `[0, 1]` behave like the nearest bound.
    #[test]
    fn percentile_q_clamps_to_unit_interval() {
        let values: Vec<i64> = (1..=10).collect();
        let cases: [(f64, i64); 2] = [(-0.5, 1), (1.5, 10)];
        for (q, expected) in cases {
            assert_eq!(percentile(&values, q), expected);
        }
    }
}