use std::collections::HashMap;
use async_nats::jetstream::kv::Config as KvConfig;
use axum::Json;
use axum::extract::{Path, State};
use axum::http::StatusCode;
use futures::TryStreamExt;
use kanade_shared::kv::{
BUCKET_JOBS, BUCKET_SCHEDULES, BUCKET_SCRIPT_STATUS, SCRIPT_STATUS_REVOKED,
};
use kanade_shared::manifest::{Manifest, Schedule};
use kanade_shared::subject;
use serde::Serialize;
use sqlx::{Row, SqlitePool};
use tracing::{info, warn};
use super::AppState;
use crate::audit;
use crate::audit::Caller;
pub async fn kill(
State(state): State<AppState>,
Path(job_id): Path<String>,
) -> Result<StatusCode, (StatusCode, String)> {
let rows = sqlx::query(
"SELECT exec_id FROM executions \
WHERE job_id = ? AND status IN ('pending', 'running')",
)
.bind(&job_id)
.fetch_all(&state.pool)
.await
.map_err(|e| {
warn!(error = %e, job_id, "kill: lookup running execs");
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("lookup running execs: {e}"),
)
})?;
let exec_ids: Vec<String> = rows
.into_iter()
.map(|r| r.try_get::<String, _>("exec_id").unwrap_or_default())
.filter(|s| !s.is_empty())
.collect();
if exec_ids.is_empty() {
info!(
%job_id,
"kill: no running executions for this job (no-op)",
);
return Ok(StatusCode::NO_CONTENT);
}
for exec_id in &exec_ids {
if let Err(e) = state
.nats
.publish(subject::kill(exec_id), bytes::Bytes::new())
.await
{
warn!(error = %e, %job_id, %exec_id, "publish kill failed");
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
format!("publish kill.{exec_id}: {e}"),
));
}
}
let _ = state.nats.flush().await;
info!(
%job_id,
kill_count = exec_ids.len(),
"kill signal fanned out to running execs",
);
Ok(StatusCode::NO_CONTENT)
}
/// Condensed view of a job manifest, returned by [`create`] after an upsert.
///
/// Derives the same comparison/debug traits as [`JobLiveCounts`] so the type
/// can be logged with `{:?}`, cloned, and compared in tests.
#[derive(Serialize, Debug, Clone, PartialEq, Eq)]
pub struct JobSummary {
    /// Job identifier (the manifest's `id`).
    pub id: String,
    /// Manifest version string.
    pub version: String,
    /// Optional human-readable description copied from the manifest.
    pub description: Option<String>,
    /// `true` when the manifest carries an `inventory` section.
    pub inventory: bool,
}
/// Per-job counts of executions that have not finished yet.
///
/// Populated by `fetch_live_counts`; the `Default` value (both counts zero)
/// is what `list` uses for jobs that have no live executions.
#[derive(Serialize, Default, Debug, Clone, PartialEq, Eq)]
pub struct JobLiveCounts {
/// Number of executions whose status is `running`.
pub running: i64,
/// Number of executions whose status is `pending`.
pub pending: i64,
}
/// One entry of the job listing returned by `list`: a job manifest together
/// with its live execution counts.
#[derive(Serialize)]
pub struct JobListRow {
/// The stored manifest. `#[serde(flatten)]` serializes its fields at the
/// top level of the row, next to `live`, instead of under a `manifest` key.
#[serde(flatten)]
pub manifest: Manifest,
/// Running / pending execution counts for this job.
pub live: JobLiveCounts,
}
pub async fn list(
State(s): State<AppState>,
) -> Result<Json<Vec<JobListRow>>, (StatusCode, String)> {
let kv = match s.jetstream.get_key_value(BUCKET_JOBS).await {
Ok(k) => k,
Err(_) => return Ok(Json(Vec::new())),
};
let keys_stream = kv
.keys()
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("kv keys: {e}")))?;
let keys: Vec<String> = keys_stream
.try_collect()
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("kv keys: {e}")))?;
let mut manifests = Vec::with_capacity(keys.len());
for k in keys {
if let Ok(Some(bytes)) = kv.get(&k).await
&& let Ok(job) = serde_json::from_slice::<Manifest>(&bytes)
{
manifests.push(job);
}
}
manifests.sort_by(|a, b| a.id.cmp(&b.id));
let live_counts = fetch_live_counts(&s.pool).await.unwrap_or_else(|e| {
warn!(error = %e, "jobs list: live count aggregation failed; returning zeros");
HashMap::new()
});
let out: Vec<JobListRow> = manifests
.into_iter()
.map(|m| {
let live = live_counts.get(&m.id).cloned().unwrap_or_default();
JobListRow { manifest: m, live }
})
.collect();
Ok(Json(out))
}
/// Aggregates the `executions` table into per-job counts of `running` and
/// `pending` rows.
///
/// Only jobs with at least one running or pending execution appear in the
/// returned map. Rows whose `job_id` cannot be read, or is empty, are
/// skipped; a count column that cannot be read is treated as zero.
///
/// # Errors
/// Returns the `sqlx::Error` from the aggregate query if it fails.
async fn fetch_live_counts(
    pool: &SqlitePool,
) -> Result<HashMap<String, JobLiveCounts>, sqlx::Error> {
    const LIVE_COUNTS_SQL: &str = "SELECT job_id,\n\
        SUM(CASE WHEN status = 'running' THEN 1 ELSE 0 END) AS running,\n\
        SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END) AS pending\n\
        FROM executions\n\
        WHERE status IN ('running', 'pending')\n\
        GROUP BY job_id";

    let aggregated = sqlx::query(LIVE_COUNTS_SQL).fetch_all(pool).await?;

    let per_job: HashMap<String, JobLiveCounts> = aggregated
        .into_iter()
        .filter_map(|row| {
            let job_id = row.try_get::<String, _>("job_id").ok()?;
            if job_id.is_empty() {
                return None;
            }
            let live = JobLiveCounts {
                running: row.try_get("running").unwrap_or(0),
                pending: row.try_get("pending").unwrap_or(0),
            };
            Some((job_id, live))
        })
        .collect();
    Ok(per_job)
}
/// Upserts a job manifest into the jobs KV bucket and records an audit event.
///
/// The bucket is created with a history depth of 5 if needed, the manifest
/// is stored as JSON under its `id`, and a `job_upsert` audit record is
/// emitted on behalf of `caller`.
///
/// # Returns
/// A [`JobSummary`] describing the stored manifest.
///
/// # Errors
/// `500 Internal Server Error` when the bucket cannot be ensured, the
/// manifest cannot be serialized, or the KV write fails.
pub async fn create(
    State(s): State<AppState>,
    caller: Caller,
    Json(job): Json<Manifest>,
) -> Result<Json<JobSummary>, (StatusCode, String)> {
    /// Builds the `500` response used for every failure in this handler.
    fn internal(context: &str, err: impl std::fmt::Display) -> (StatusCode, String) {
        (
            StatusCode::INTERNAL_SERVER_ERROR,
            format!("{context}: {err}"),
        )
    }

    let bucket_config = KvConfig {
        bucket: BUCKET_JOBS.into(),
        history: 5,
        ..Default::default()
    };
    let jobs_kv = s
        .jetstream
        .create_key_value(bucket_config)
        .await
        .map_err(|e| internal("ensure KV", e))?;

    let payload = serde_json::to_vec(&job).map_err(|e| internal("serialize", e))?;
    jobs_kv
        .put(&job.id, payload.into())
        .await
        .map_err(|e| internal("KV put", e))?;

    let has_inventory = job.inventory.is_some();
    let summary = JobSummary {
        id: job.id.clone(),
        version: job.version.clone(),
        description: job.description.clone(),
        inventory: has_inventory,
    };

    info!(job_id = %job.id, version = %job.version, "job upserted");
    let audit_details = serde_json::json!({
        "version": job.version,
        "inventory": has_inventory,
    });
    audit::record(
        &s.nats,
        "operator",
        "job_upsert",
        Some(&job.id),
        Some(&caller),
        audit_details,
    )
    .await;

    Ok(Json(summary))
}
/// Deletes the job `id` from the jobs KV bucket, first marking its script
/// status as revoked.
///
/// Steps, in order:
/// 1. Scan the schedules bucket; if any schedule references `id`, refuse
///    with `409 Conflict`.
/// 2. Open the script-status bucket and the jobs bucket.
/// 3. Write `SCRIPT_STATUS_REVOKED` under `id` in the script-status bucket.
/// 4. Delete `id` from the jobs bucket and emit a `job_delete` audit record.
///
/// # Returns
/// `204 No Content` on success.
///
/// # Errors
/// * `409 Conflict` - a schedule still references the job.
/// * `503 Service Unavailable` - the script-status bucket cannot be opened.
/// * `404 Not Found` - the jobs bucket cannot be opened.
/// * `500 Internal Server Error` - the revoke write or the KV delete failed.
///   When the delete fails the revoke has already been written; a
///   `job_delete_failed_post_revoke` audit record is emitted and the error
///   body carries a recovery hint.
pub async fn delete(
State(s): State<AppState>,
Path(id): Path<String>,
caller: Caller,
) -> Result<StatusCode, (StatusCode, String)> {
// Referential guard. It is best-effort: if the schedules bucket or its key
// listing is unavailable the whole check is skipped, and individual entries
// that fail to load or decode are ignored.
if let Ok(kv) = s.jetstream.get_key_value(BUCKET_SCHEDULES).await
&& let Ok(keys_stream) = kv.keys().await
{
// An error while collecting the key stream yields an empty key list.
let keys: Vec<String> = keys_stream.try_collect().await.unwrap_or_default();
for k in keys {
if let Ok(Some(bytes)) = kv.get(&k).await
&& let Ok(sched) = serde_json::from_slice::<Schedule>(&bytes)
&& sched.job_id == id
{
return Err((
StatusCode::CONFLICT,
format!(
"job '{id}' is referenced by schedule '{}'; remove the schedule first",
sched.id
),
));
}
}
}
// Both buckets are opened before anything is written, so a missing bucket
// aborts the request without having changed any state.
let status_kv = s
.jetstream
.get_key_value(BUCKET_SCRIPT_STATUS)
.await
.map_err(|e| {
warn!(
error = %e,
bucket = BUCKET_SCRIPT_STATUS,
"job_delete cascade revoke: status KV unavailable",
);
(
StatusCode::SERVICE_UNAVAILABLE,
format!("script_status bucket missing: {e}"),
)
})?;
let kv = s.jetstream.get_key_value(BUCKET_JOBS).await.map_err(|e| {
warn!(error = %e, "jobs KV missing on delete");
(StatusCode::NOT_FOUND, "jobs bucket missing".to_string())
})?;
// Cascade revoke: the status entry is marked revoked BEFORE the manifest is
// deleted. If this write fails, the job manifest is left untouched.
status_kv
.put(&id, bytes::Bytes::from(SCRIPT_STATUS_REVOKED))
.await
.map_err(|e| {
warn!(
error = %e,
job_id = %id,
bucket = BUCKET_SCRIPT_STATUS,
"job_delete cascade revoke: status KV put failed",
);
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("script_status put: {e}"),
)
})?;
// Partial-failure path: the revoke above is already in place but the
// manifest delete failed. Audit it under a distinct action and tell the
// caller how to recover.
if let Err(e) = kv.delete(&id).await {
warn!(
error = %e,
job_id = %id,
cascade_revoke = true,
"job_delete failed after cascade revoke",
);
audit::record(
&s.nats,
"operator",
"job_delete_failed_post_revoke",
Some(&id),
Some(&caller),
serde_json::json!({ "cascade_revoke": true, "error": e.to_string() }),
)
.await;
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
format!(
"kv delete: {e}; script_status.{id} is already REVOKED — `kanade unrevoke {id}` to recover"
),
));
}
// Success: both the revoke and the delete went through.
info!(job_id = %id, cascade_revoke = true, "job deleted");
audit::record(
&s.nats,
"operator",
"job_delete",
Some(&id),
Some(&caller),
serde_json::json!({ "cascade_revoke": true }),
)
.await;
Ok(StatusCode::NO_CONTENT)
}
/// Loads the manifest stored under `job_id` in the jobs KV bucket.
///
/// # Returns
/// `Ok(None)` when the jobs bucket cannot be opened or the key has no value;
/// `Ok(Some(manifest))` when an entry exists and decodes.
///
/// # Errors
/// Propagates the KV read error, or the JSON error when the stored bytes do
/// not decode as a [`Manifest`].
pub async fn fetch(
    js: &async_nats::jetstream::Context,
    job_id: &str,
) -> anyhow::Result<Option<Manifest>> {
    // An unavailable bucket is treated the same as "no such job".
    let Ok(jobs_kv) = js.get_key_value(BUCKET_JOBS).await else {
        return Ok(None);
    };

    let manifest = jobs_kv
        .get(job_id)
        .await?
        .map(|raw| serde_json::from_slice::<Manifest>(&raw))
        .transpose()?;
    Ok(manifest)
}
#[cfg(test)]
mod tests {
    use super::*;
    use sqlx::sqlite::SqlitePoolOptions;

    /// Opens a single-connection in-memory SQLite pool and applies the
    /// migrations from `./migrations`.
    async fn fresh_pool() -> SqlitePool {
        let options = SqlitePoolOptions::new().max_connections(1);
        let pool = options
            .connect("sqlite::memory:")
            .await
            .expect("open sqlite memory");
        sqlx::migrate!("./migrations")
            .run(&pool)
            .await
            .expect("run migrations");
        pool
    }

    /// Inserts one `executions` row with fixed version / initiator values.
    async fn insert_exec(
        pool: &SqlitePool,
        exec_id: &str,
        job_id: &str,
        status: &str,
        target: i64,
    ) {
        const INSERT_SQL: &str = "INSERT INTO executions\n\
            (exec_id, job_id, version, initiated_by, target_count, status)\n\
            VALUES (?, ?, '1.0.0', 'tester', ?, ?)";
        let statement = sqlx::query(INSERT_SQL)
            .bind(exec_id)
            .bind(job_id)
            .bind(target)
            .bind(status);
        statement.execute(pool).await.unwrap();
    }

    /// Running and pending rows are tallied separately for each job.
    #[tokio::test]
    async fn fetch_live_counts_groups_by_job_id() {
        let pool = fresh_pool().await;
        let seed = [
            ("e1", "inv-hw", "running", 10),
            ("e2", "inv-hw", "running", 10),
            ("e3", "inv-hw", "pending", 10),
            ("e4", "patch-x", "running", 5),
        ];
        for (exec_id, job_id, status, target) in seed {
            insert_exec(&pool, exec_id, job_id, status, target).await;
        }

        let counts = fetch_live_counts(&pool).await.unwrap();

        let expected_inv_hw = JobLiveCounts {
            running: 2,
            pending: 1,
        };
        let expected_patch_x = JobLiveCounts {
            running: 1,
            pending: 0,
        };
        assert_eq!(counts.get("inv-hw"), Some(&expected_inv_hw));
        assert_eq!(counts.get("patch-x"), Some(&expected_patch_x));
    }

    /// Completed executions do not contribute to either count.
    #[tokio::test]
    async fn fetch_live_counts_excludes_completed() {
        let pool = fresh_pool().await;
        insert_exec(&pool, "e1", "j", "completed", 5).await;
        insert_exec(&pool, "e2", "j", "running", 5).await;

        let counts = fetch_live_counts(&pool).await.unwrap();

        let live = counts.get("j").expect("j has at least one exec");
        assert_eq!((live.running, live.pending), (1, 0));
    }

    /// With no execution rows at all the map is empty.
    #[tokio::test]
    async fn fetch_live_counts_empty_when_no_executions() {
        let pool = fresh_pool().await;
        let counts = fetch_live_counts(&pool).await.unwrap();
        assert_eq!(counts.len(), 0);
    }
}