use async_nats::jetstream::kv::Config as KvConfig;
use axum::Json;
use axum::extract::{Path, Query, State};
use axum::http::StatusCode;
use axum::http::header::HeaderMap;
use futures::TryStreamExt;
use kanade_shared::kv::{
BUCKET_SCHEDULES, BUCKET_SCHEDULES_YAML, BUCKET_SCRIPT_STATUS, SCRIPT_STATUS_REVOKED,
};
use kanade_shared::manifest::{RunsOn, Schedule};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use tracing::{info, warn};
use crate::api::AppState;
use crate::api::yaml_body::{YamlOrJson, mirror_yaml, yaml_headers};
use crate::audit;
use crate::audit::Caller;
/// Compact acknowledgement returned by `create` after a schedule is upserted.
#[derive(Serialize)]
pub struct ScheduleSummary {
/// Schedule id; also the key the schedule is stored under in `BUCKET_SCHEDULES`.
pub id: String,
/// Display form of the schedule's `when` cadence.
pub when: String,
/// Whether the schedule is enabled.
pub enabled: bool,
/// Id of the registered job this schedule refers to.
pub job_id: String,
}
pub async fn list(State(s): State<AppState>) -> Result<Json<Vec<Schedule>>, (StatusCode, String)> {
let kv = match s.jetstream.get_key_value(BUCKET_SCHEDULES).await {
Ok(k) => k,
Err(_) => return Ok(Json(Vec::new())),
};
let keys_stream = kv
.keys()
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("kv keys: {e}")))?;
let keys: Vec<String> = keys_stream
.try_collect()
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("kv keys: {e}")))?;
let mut out = Vec::with_capacity(keys.len());
for k in keys {
if let Ok(Some(bytes)) = kv.get(&k).await
&& let Ok(sched) = serde_json::from_slice::<Schedule>(&bytes)
{
out.push(sched);
}
}
Ok(Json(out))
}
/// Query parameters accepted by `preview`.
#[derive(Deserialize, Debug)]
pub struct PreviewQuery {
/// Number of upcoming fire times to compute. Defaults to 5 when omitted;
/// the handler clamps it to `1..=50`.
#[serde(default = "default_preview_count")]
pub count: usize,
}
/// Serde default for `PreviewQuery::count` when the query string omits it.
fn default_preview_count() -> usize {
5
}
/// Response body of `preview`: upcoming fire times for one schedule.
#[derive(Serialize)]
pub struct PreviewResponse {
/// Schedule id.
pub id: String,
/// Display form of the schedule's `when` cadence.
pub when: String,
/// Name of the schedule's time zone.
pub tz: String,
/// Whether the schedule is enabled.
pub enabled: bool,
/// Upcoming fire times, RFC 3339 formatted.
pub fires: Vec<String>,
/// Explanation when `fires` is empty; omitted from the JSON otherwise.
#[serde(skip_serializing_if = "Option::is_none")]
pub note: Option<String>,
}
/// Preview the next `count` fire times of one schedule.
///
/// `count` is clamped to `1..=50`. When nothing can be previewed, `note`
/// says why. A calendar schedule has no upcoming fire (a past one-shot, or a
/// window-excluded time). Any other cadence is reconcile-driven and has no
/// discrete fire times at all.
///
/// # Errors
/// - `503` if the schedules bucket is missing.
/// - `404` for an unknown id.
/// - `500` on a KV read or decode failure.
pub async fn preview(
    State(s): State<AppState>,
    Path(id): Path<String>,
    Query(q): Query<PreviewQuery>,
) -> Result<Json<PreviewResponse>, (StatusCode, String)> {
    let bucket = s.jetstream.get_key_value(BUCKET_SCHEDULES).await.map_err(|e| {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            format!("schedules bucket missing: {e}"),
        )
    })?;
    let raw = match bucket.get(&id).await {
        Ok(Some(raw)) => raw,
        Ok(None) => return Err((StatusCode::NOT_FOUND, format!("schedule '{id}' not found"))),
        Err(e) => return Err((StatusCode::INTERNAL_SERVER_ERROR, format!("KV get: {e}"))),
    };
    let schedule = serde_json::from_slice::<Schedule>(&raw).map_err(|e| {
        (
            StatusCode::INTERNAL_SERVER_ERROR,
            format!("deserialize stored schedule: {e}"),
        )
    })?;

    let upcoming = schedule.preview_fires(chrono::Utc::now(), q.count.clamp(1, 50));
    let note = match (upcoming.is_empty(), &schedule.when) {
        (false, _) => None,
        (true, kanade_shared::manifest::When::Calendar(_)) => Some(
            "no upcoming fires — a past one-shot, or the fire time is excluded by the \
             active window / constraints.window"
                .to_string(),
        ),
        (true, cadence) => Some(format!(
            "reconcile cadence ({cadence}) polls every minute gated by cooldown — no discrete \
             fire times to preview"
        )),
    };

    Ok(Json(PreviewResponse {
        id: schedule.id.clone(),
        when: schedule.when.to_string(),
        tz: schedule.tz.as_str().to_string(),
        enabled: schedule.enabled,
        fires: upcoming.iter().map(chrono::DateTime::to_rfc3339).collect(),
        note,
    }))
}
/// Trailing window, in hours, over which `status` tallies ok/fail runs.
const STATUS_WINDOW_HOURS: i64 = 24;
/// Most recently recorded `execution_results` row for a schedule's job.
/// It may still be in flight, in which case `exit_code`/`finished_at` are `None`.
#[derive(Serialize, Default)]
pub struct LastRun {
/// PC the run executed on.
pub pc_id: String,
/// Process exit code; `None` while running or when not reported.
pub exit_code: Option<i64>,
/// Start timestamp as stored in the database.
pub started_at: Option<String>,
/// Finish timestamp as stored in the database; `None` while in flight.
pub finished_at: Option<String>,
}
/// Successful / failed finished-run tallies over a trailing window.
#[derive(Serialize, Default)]
pub struct RecentCounts {
/// Size of the trailing window, in hours.
pub window_hours: i64,
/// Finished runs with exit code 0.
pub ok: i64,
/// Finished runs classified as failed.
pub fail: i64,
}
/// Response body of `status`: schedule definition summary plus run history.
#[derive(Serialize)]
pub struct StatusResponse {
/// Schedule id.
pub id: String,
/// Display form of the schedule's `when` cadence.
pub when: String,
/// Name of the schedule's time zone.
pub tz: String,
/// Whether the schedule is enabled.
pub enabled: bool,
/// Next computable fire time (RFC 3339), if any.
pub next_run: Option<String>,
/// Latest recorded run of the schedule's job, if any.
pub last_run: Option<LastRun>,
/// Ok/fail counts over the trailing status window.
pub recent: RecentCounts,
}
/// Summarise the execution history of `job_id` for the status endpoint.
///
/// Returns two things:
/// - The most recently recorded run, ordered by `recorded_at`, whether it
///   has finished or is still in flight.
/// - Ok/fail tallies over finished runs recorded at or after `since`.
///
/// A finished run counts as `fail` when its exit code is non-zero **or
/// missing**. This matches how `coverage_for` classifies a finished run with
/// a NULL exit code, so runs that ended without reporting a code (e.g.
/// killed) are not silently dropped from the tally.
///
/// Column decode problems are tolerated: they fall back to defaults / `None`
/// rather than failing the request.
///
/// # Errors
/// Propagates any `sqlx::Error` raised by the two queries.
async fn schedule_run_stats(
    pool: &sqlx::SqlitePool,
    job_id: &str,
    since: chrono::DateTime<chrono::Utc>,
) -> Result<(Option<LastRun>, RecentCounts), sqlx::Error> {
    use sqlx::Row;
    let last = sqlx::query(
        "SELECT pc_id, exit_code, started_at, finished_at
FROM execution_results
WHERE job_id = ?
ORDER BY recorded_at DESC
LIMIT 1",
    )
    .bind(job_id)
    .fetch_optional(pool)
    .await?
    .map(|r| LastRun {
        pc_id: r.try_get("pc_id").unwrap_or_default(),
        exit_code: r.try_get::<Option<i64>, _>("exit_code").unwrap_or(None),
        started_at: r.try_get("started_at").ok(),
        finished_at: r
            .try_get::<Option<String>, _>("finished_at")
            .unwrap_or(None),
    });
    // `exit_code <> 0` alone is NULL (not true) for a NULL exit code, so the
    // IS NULL arm is what counts finished-without-exit-code runs as failures.
    let counts = sqlx::query(
        "SELECT
COALESCE(SUM(CASE WHEN exit_code = 0 THEN 1 ELSE 0 END), 0) AS ok,
COALESCE(SUM(CASE WHEN exit_code IS NULL OR exit_code <> 0 THEN 1 ELSE 0 END), 0) AS fail
FROM execution_results
WHERE job_id = ? AND finished_at IS NOT NULL AND recorded_at >= ?",
    )
    .bind(job_id)
    .bind(since)
    .fetch_one(pool)
    .await?;
    let recent = RecentCounts {
        window_hours: STATUS_WINDOW_HOURS,
        ok: counts.try_get("ok").unwrap_or(0),
        fail: counts.try_get("fail").unwrap_or(0),
    };
    Ok((last, recent))
}
/// Status card for one schedule.
///
/// It reports the next computable fire time, the last recorded run of the
/// schedule's job, and ok/fail counts over the trailing
/// `STATUS_WINDOW_HOURS`.
///
/// # Errors
/// - `503` if the schedules bucket is missing.
/// - `404` for an unknown id.
/// - `500` on a KV read, decode, or database failure.
pub async fn status(
    State(s): State<AppState>,
    Path(id): Path<String>,
) -> Result<Json<StatusResponse>, (StatusCode, String)> {
    let bucket = s.jetstream.get_key_value(BUCKET_SCHEDULES).await.map_err(|e| {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            format!("schedules bucket missing: {e}"),
        )
    })?;
    let raw = match bucket.get(&id).await {
        Ok(Some(raw)) => raw,
        Ok(None) => return Err((StatusCode::NOT_FOUND, format!("schedule '{id}' not found"))),
        Err(e) => return Err((StatusCode::INTERNAL_SERVER_ERROR, format!("KV get: {e}"))),
    };
    let schedule = serde_json::from_slice::<Schedule>(&raw).map_err(|e| {
        (
            StatusCode::INTERNAL_SERVER_ERROR,
            format!("deserialize stored schedule: {e}"),
        )
    })?;

    // One clock reading drives both the next-fire lookup and the window start.
    let now = chrono::Utc::now();
    let window_start = now - chrono::Duration::hours(STATUS_WINDOW_HOURS);
    let next_run = schedule
        .preview_fires(now, 1)
        .into_iter()
        .next()
        .map(|fire| fire.to_rfc3339());

    let (last_run, recent) =
        match schedule_run_stats(&s.pool, &schedule.job_id, window_start).await {
            Ok(stats) => stats,
            Err(e) => return Err((StatusCode::INTERNAL_SERVER_ERROR, format!("run stats: {e}"))),
        };

    Ok(Json(StatusResponse {
        id: schedule.id.clone(),
        when: schedule.when.to_string(),
        tz: schedule.tz.as_str().to_string(),
        enabled: schedule.enabled,
        next_run,
        last_run,
        recent,
    }))
}
/// Coverage state of one roster PC for a schedule's job.
#[derive(Serialize)]
pub struct AgentRun {
/// PC id from the resolved roster.
pub pc_id: String,
/// One of `"running"`, `"ok"`, `"fail"` or `"pending"` (see `coverage_for`).
pub state: &'static str,
/// Version recorded on the PC's latest finished run; omitted when absent.
#[serde(skip_serializing_if = "Option::is_none")]
pub version: Option<String>,
/// Finish timestamp of the PC's latest finished run; omitted when absent.
#[serde(skip_serializing_if = "Option::is_none")]
pub finished_at: Option<String>,
}
/// Response body of `coverage`: per-PC run state across a schedule's roster.
#[derive(Serialize, Default)]
pub struct CoverageResponse {
/// Schedule id.
pub id: String,
/// Display form of the schedule's `when` cadence.
pub when: String,
/// Job whose execution results are classified.
pub job_id: String,
/// `"backend"` or `"agent"` (see `runs_on_str`).
pub runs_on: String,
/// Roster size; equals `ok + fail + running + pending`.
pub total: usize,
/// PCs whose latest finished run exited 0.
pub ok: usize,
/// PCs whose latest finished run exited non-zero or with no exit code.
pub fail: usize,
/// PCs with an unfinished run.
pub running: usize,
/// PCs with no run recorded for the job.
pub pending: usize,
/// Per-PC detail, in roster order.
pub agents: Vec<AgentRun>,
}
/// Per-schedule coverage counts returned by `coverage_summary`
/// (same classification as `CoverageResponse`, without per-PC detail).
#[derive(Serialize)]
pub struct CoverageSummary {
/// Schedule id.
pub id: String,
/// Roster size (0 if the roster could not be resolved).
pub total: usize,
/// PCs whose latest finished run exited 0.
pub ok: usize,
/// PCs whose latest finished run exited non-zero or with no exit code.
pub fail: usize,
/// PCs with an unfinished run.
pub running: usize,
/// PCs with no run recorded for the job.
pub pending: usize,
}
/// Lower-case name of a `RunsOn` variant, as reported in `CoverageResponse::runs_on`.
fn runs_on_str(r: RunsOn) -> &'static str {
match r {
RunsOn::Backend => "backend",
RunsOn::Agent => "agent",
}
}
/// Latest finished run for one PC: `(exit_code, version, finished_at)`.
type FinishedRun = (Option<i64>, Option<String>, Option<String>);
/// Latest finished run keyed by `pc_id`, for a single job.
type FinishedMap = HashMap<String, FinishedRun>;
/// Classify every roster PC against a job's execution rows.
///
/// Precedence per PC:
/// 1. `"running"` if it has an unfinished run (even if an older run finished).
/// 2. Otherwise, if it has a latest finished run: `"ok"` for exit code 0, or
///    `"fail"` for any other code, including a missing one.
/// 3. `"pending"` if it has no run at all.
///
/// Runs for PCs outside `roster` are ignored.
///
/// Returns `(agents, ok, fail, running, pending)` with `agents` in roster order.
fn coverage_for(
    roster: &[String],
    inflight: &HashSet<String>,
    finished: &FinishedMap,
) -> (Vec<AgentRun>, usize, usize, usize, usize) {
    let (mut ok, mut fail, mut running, mut pending) = (0usize, 0usize, 0usize, 0usize);
    let agents: Vec<AgentRun> = roster
        .iter()
        .map(|pc| {
            let (state, version, finished_at) = match (inflight.contains(pc), finished.get(pc)) {
                (true, _) => {
                    running += 1;
                    ("running", None, None)
                }
                (false, Some((Some(0), version, at))) => {
                    ok += 1;
                    ("ok", version.clone(), at.clone())
                }
                (false, Some((_, version, at))) => {
                    fail += 1;
                    ("fail", version.clone(), at.clone())
                }
                (false, None) => {
                    pending += 1;
                    ("pending", None, None)
                }
            };
            AgentRun {
                pc_id: pc.clone(),
                state,
                version,
                finished_at,
            }
        })
        .collect();
    (agents, ok, fail, running, pending)
}
async fn coverage_rows(
pool: &sqlx::SqlitePool,
job_id: &str,
) -> Result<(HashSet<String>, FinishedMap), sqlx::Error> {
use sqlx::Row;
let inflight: HashSet<String> = sqlx::query(
"SELECT DISTINCT pc_id FROM execution_results
WHERE job_id = ? AND finished_at IS NULL",
)
.bind(job_id)
.fetch_all(pool)
.await?
.into_iter()
.filter_map(|r| r.try_get::<String, _>("pc_id").ok())
.collect();
let mut finished = HashMap::new();
let rows = sqlx::query(
"SELECT pc_id, exit_code, finished_at, version FROM (
SELECT pc_id, exit_code, finished_at, version,
ROW_NUMBER() OVER (
PARTITION BY pc_id
ORDER BY finished_at DESC, result_id DESC
) AS rn
FROM execution_results
WHERE job_id = ? AND finished_at IS NOT NULL
) WHERE rn = 1",
)
.bind(job_id)
.fetch_all(pool)
.await?;
for r in rows {
let pc: String = r.try_get("pc_id").unwrap_or_default();
if pc.is_empty() {
continue;
}
let exit = r.try_get::<Option<i64>, _>("exit_code").unwrap_or(None);
let version = r.try_get::<Option<String>, _>("version").unwrap_or(None);
let finished_at = r
.try_get::<Option<String>, _>("finished_at")
.unwrap_or(None);
finished.insert(pc, (exit, version, finished_at));
}
Ok((inflight, finished))
}
/// Per-PC coverage of one schedule.
///
/// The schedule's target is resolved to a roster. Each roster PC is then
/// classified against the job's execution rows (see `coverage_for`).
///
/// # Errors
/// - `503` if the schedules bucket is missing.
/// - `404` for an unknown id.
/// - `500` on a KV read, decode, roster, or database failure.
pub async fn coverage(
    State(s): State<AppState>,
    Path(id): Path<String>,
) -> Result<Json<CoverageResponse>, (StatusCode, String)> {
    let bucket = s.jetstream.get_key_value(BUCKET_SCHEDULES).await.map_err(|e| {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            format!("schedules bucket missing: {e}"),
        )
    })?;
    let raw = match bucket.get(&id).await {
        Ok(Some(raw)) => raw,
        Ok(None) => return Err((StatusCode::NOT_FOUND, format!("schedule '{id}' not found"))),
        Err(e) => return Err((StatusCode::INTERNAL_SERVER_ERROR, format!("KV get: {e}"))),
    };
    let schedule = serde_json::from_slice::<Schedule>(&raw).map_err(|e| {
        (
            StatusCode::INTERNAL_SERVER_ERROR,
            format!("deserialize stored schedule: {e}"),
        )
    })?;

    let roster = match crate::scheduler::resolve_roster(&s, &schedule.plan.target, false).await {
        Ok(roster) => roster,
        Err(e) => return Err((StatusCode::INTERNAL_SERVER_ERROR, format!("roster: {e}"))),
    };
    let (inflight, finished) = match coverage_rows(&s.pool, &schedule.job_id).await {
        Ok(rows) => rows,
        Err(e) => return Err((StatusCode::INTERNAL_SERVER_ERROR, format!("coverage: {e}"))),
    };

    let (agents, ok, fail, running, pending) = coverage_for(&roster, &inflight, &finished);
    let total = roster.len();
    Ok(Json(CoverageResponse {
        id: schedule.id.clone(),
        when: schedule.when.to_string(),
        job_id: schedule.job_id.clone(),
        runs_on: runs_on_str(schedule.runs_on).to_string(),
        total,
        ok,
        fail,
        running,
        pending,
        agents,
    }))
}
pub async fn coverage_summary(
State(s): State<AppState>,
) -> Result<Json<Vec<CoverageSummary>>, (StatusCode, String)> {
use futures::StreamExt;
use sqlx::Row;
let kv = match s.jetstream.get_key_value(BUCKET_SCHEDULES).await {
Ok(k) => k,
Err(_) => return Ok(Json(Vec::new())),
};
let keys: Vec<String> = kv
.keys()
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("kv keys: {e}")))?
.try_collect()
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("kv keys: {e}")))?;
let schedules: Vec<Schedule> = futures::stream::iter(keys)
.map(|k| {
let kv = kv.clone();
async move {
kv.get(&k)
.await
.ok()
.flatten()
.and_then(|bytes| serde_json::from_slice::<Schedule>(&bytes).ok())
}
})
.buffer_unordered(16)
.filter_map(|s| async move { s })
.collect()
.await;
let job_ids: Vec<String> = schedules
.iter()
.map(|s| s.job_id.clone())
.collect::<HashSet<_>>()
.into_iter()
.collect();
let mut inflight: HashMap<String, HashSet<String>> = HashMap::new();
let mut finished: HashMap<String, FinishedMap> = HashMap::new();
if !job_ids.is_empty() {
let placeholders = vec!["?"; job_ids.len()].join(",");
let inflight_sql = format!(
"SELECT DISTINCT job_id, pc_id FROM execution_results
WHERE job_id IN ({placeholders}) AND finished_at IS NULL"
);
let mut q = sqlx::query(sqlx::AssertSqlSafe(inflight_sql));
for jid in &job_ids {
q = q.bind(jid);
}
for r in q
.fetch_all(&s.pool)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("inflight: {e}")))?
{
let (Ok(jid), Ok(pc)) = (
r.try_get::<String, _>("job_id"),
r.try_get::<String, _>("pc_id"),
) else {
continue;
};
inflight.entry(jid).or_default().insert(pc);
}
let finished_sql = format!(
"SELECT job_id, pc_id, exit_code, finished_at, version FROM (
SELECT job_id, pc_id, exit_code, finished_at, version,
ROW_NUMBER() OVER (
PARTITION BY job_id, pc_id
ORDER BY finished_at DESC, result_id DESC
) AS rn
FROM execution_results
WHERE job_id IN ({placeholders}) AND finished_at IS NOT NULL
) WHERE rn = 1"
);
let mut q = sqlx::query(sqlx::AssertSqlSafe(finished_sql));
for jid in &job_ids {
q = q.bind(jid);
}
for r in q
.fetch_all(&s.pool)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("finished: {e}")))?
{
let (Ok(jid), Ok(pc)) = (
r.try_get::<String, _>("job_id"),
r.try_get::<String, _>("pc_id"),
) else {
continue;
};
if pc.is_empty() {
continue;
}
let exit = r.try_get::<Option<i64>, _>("exit_code").unwrap_or(None);
let version = r.try_get::<Option<String>, _>("version").unwrap_or(None);
let finished_at = r
.try_get::<Option<String>, _>("finished_at")
.unwrap_or(None);
finished
.entry(jid)
.or_default()
.insert(pc, (exit, version, finished_at));
}
}
let mut distinct: HashMap<String, kanade_shared::manifest::Target> = HashMap::new();
for sched in &schedules {
let key = serde_json::to_string(&sched.plan.target).unwrap_or_default();
distinct
.entry(key)
.or_insert_with(|| sched.plan.target.clone());
}
let rosters: HashMap<String, Vec<String>> = futures::stream::iter(distinct)
.map(|(key, target)| {
let s = s.clone();
async move {
let roster = crate::scheduler::resolve_roster(&s, &target, false).await;
(key, roster)
}
})
.buffer_unordered(8)
.filter_map(|(k, r)| async move { r.ok().map(|v| (k, v)) })
.collect()
.await;
let empty_set: HashSet<String> = HashSet::new();
let empty_map: FinishedMap = HashMap::new();
let out: Vec<CoverageSummary> = schedules
.iter()
.map(|sched| {
let key = serde_json::to_string(&sched.plan.target).unwrap_or_default();
let roster = rosters.get(&key).cloned().unwrap_or_default();
let inf = inflight.get(&sched.job_id).unwrap_or(&empty_set);
let fin = finished.get(&sched.job_id).unwrap_or(&empty_map);
let (_, ok, fail, running, pending) = coverage_for(&roster, inf, fin);
CoverageSummary {
id: sched.id.clone(),
total: roster.len(),
ok,
fail,
running,
pending,
}
})
.collect();
Ok(Json(out))
}
/// Create or replace (upsert) a schedule.
///
/// The body may be YAML or JSON (`YamlOrJson`). The schedule must pass
/// `Schedule::validate` and reference a `job_id` that
/// `crate::api::jobs::fetch` finds in the job catalog. The handler then:
/// 1. Writes the JSON form to `BUCKET_SCHEDULES` under the schedule id. The
///    bucket is ensured with `history: 5`.
/// 2. Writes a YAML mirror to `BUCKET_SCHEDULES_YAML`, best-effort.
/// 3. Records a `schedule_upsert` audit event.
///
/// # Errors
/// - `400` for an invalid schedule or an unknown `job_id`.
/// - `500` for a catalog-lookup, KV or serialization failure.
///
/// A failed YAML mirror write is only logged.
pub async fn create(
State(s): State<AppState>,
caller: Caller,
body: YamlOrJson<Schedule>,
) -> Result<Json<ScheduleSummary>, (StatusCode, String)> {
// `raw_yaml` carries the operator's original YAML text, when the body was YAML.
let YamlOrJson {
value: schedule,
raw_yaml,
} = body;
if let Err(e) = schedule.validate() {
return Err((StatusCode::BAD_REQUEST, format!("invalid schedule: {e}")));
}
// Refuse schedules that point at a job the catalog doesn't know about.
match crate::api::jobs::fetch(&s.jetstream, &schedule.job_id).await {
Ok(Some(_)) => {}
Ok(None) => {
return Err((
StatusCode::BAD_REQUEST,
format!(
"unknown job_id '{}' — register the job first (kanade job create)",
schedule.job_id
),
));
}
Err(e) => {
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
format!("job catalog lookup: {e}"),
));
}
}
// Used as "ensure the bucket exists" (see the error message) before the put.
let kv = s
.jetstream
.create_key_value(KvConfig {
bucket: BUCKET_SCHEDULES.into(),
history: 5,
..Default::default()
})
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("ensure KV: {e}")))?;
let body_bytes = serde_json::to_vec(&schedule)
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("serialize: {e}")))?;
kv.put(&schedule.id, body_bytes.into())
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("KV put: {e}")))?;
// Prefer the operator's own YAML text for the mirror. Otherwise re-render
// the parsed schedule, or store a placeholder comment if even that fails.
let yaml_source = raw_yaml.unwrap_or_else(|| {
serde_yaml::to_string(&schedule)
.unwrap_or_else(|_| String::from("# YAML mirror unavailable for this entry"))
});
// Non-fatal: the JSON catalog write above already succeeded.
if let Err(e) = mirror_yaml(&s, BUCKET_SCHEDULES_YAML, &schedule.id, &yaml_source).await {
warn!(
error = %e,
schedule_id = %schedule.id,
"schedules: YAML mirror put failed; JSON catalog is current",
);
}
info!(
schedule_id = %schedule.id,
when = %schedule.when,
job_id = %schedule.job_id,
"schedule upserted",
);
audit::record(
&s.nats,
"operator",
"schedule_upsert",
Some(&schedule.id),
Some(&caller),
serde_json::json!({
"when": schedule.when.to_string(),
"job_id": schedule.job_id,
"enabled": schedule.enabled,
}),
)
.await;
Ok(Json(ScheduleSummary {
id: schedule.id.clone(),
when: schedule.when.to_string(),
enabled: schedule.enabled,
job_id: schedule.job_id.clone(),
}))
}
/// Return a schedule as YAML.
///
/// The JSON catalog (`BUCKET_SCHEDULES`) is the source of truth. The schedule
/// must exist there, otherwise `404` is returned, even if a YAML mirror entry
/// is still present (e.g. one left behind by an earlier delete).
///
/// For a schedule that exists, the mirror text in `BUCKET_SCHEDULES_YAML` is
/// preferred; it is the YAML stored at create time. If the mirror is missing
/// or not UTF-8, the stored JSON is re-rendered with `serde_yaml`.
///
/// # Errors
/// - `404` when the catalog bucket or entry is missing.
/// - `500` on a KV read, decode or YAML encode failure.
pub async fn get_yaml(
    State(s): State<AppState>,
    Path(id): Path<String>,
) -> Result<(StatusCode, HeaderMap, String), (StatusCode, String)> {
    // Existence check against the authoritative catalog first, so a stale
    // mirror entry can never resurrect a deleted schedule.
    let kv = s
        .jetstream
        .get_key_value(BUCKET_SCHEDULES)
        .await
        .map_err(|_| (StatusCode::NOT_FOUND, format!("schedule '{id}' not found")))?;
    let bytes = kv
        .get(&id)
        .await
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("KV get: {e}")))?
        .ok_or_else(|| (StatusCode::NOT_FOUND, format!("schedule '{id}' not found")))?;
    if let Ok(mirror) = s.jetstream.get_key_value(BUCKET_SCHEDULES_YAML).await
        && let Ok(Some(raw)) = mirror.get(&id).await
        && let Ok(text) = String::from_utf8(raw.to_vec())
    {
        return Ok((StatusCode::OK, yaml_headers(), text));
    }
    let schedule: Schedule = serde_json::from_slice(&bytes)
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("decode: {e}")))?;
    let yaml = serde_yaml::to_string(&schedule).map_err(|e| {
        (
            StatusCode::INTERNAL_SERVER_ERROR,
            format!("encode YAML: {e}"),
        )
    })?;
    Ok((StatusCode::OK, yaml_headers(), yaml))
}
/// Rewrite the top-level `enabled:` line of a YAML document without touching
/// anything else. Comments and the formatting of every other line survive.
///
/// - Only the first line that starts at column 0 with `enabled:` is replaced.
///   Indented (nested) keys and later duplicates are left alone.
/// - The replaced line loses any inline comment it carried.
/// - If no such line exists, `enabled: <value>` is appended.
/// - Output lines are always `\n`-terminated, including the last one.
fn patch_yaml_enabled(yaml: &str, enabled: bool) -> String {
    let replacement = format!("enabled: {enabled}");
    let mut patched = false;
    let mut rendered: Vec<&str> = yaml
        .lines()
        .map(|line| {
            if !patched && line.starts_with("enabled:") {
                patched = true;
                replacement.as_str()
            } else {
                line
            }
        })
        .collect();
    if !patched {
        rendered.push(replacement.as_str());
    }
    let mut out = rendered.join("\n");
    out.push('\n');
    out
}
/// Best-effort: flip the `enabled:` flag inside the stored YAML mirror so it
/// agrees with the JSON catalog after an enable/disable.
///
/// The function silently does nothing when the mirror bucket or entry is
/// absent or unreadable. It logs a warning when the entry is not UTF-8 or the
/// write fails. It never reports an error, because the JSON catalog is
/// authoritative.
async fn sync_yaml_mirror_enabled(s: &AppState, id: &str, enabled: bool) {
    let mirror = match s.jetstream.get_key_value(BUCKET_SCHEDULES_YAML).await {
        Ok(kv) => kv,
        Err(_) => return,
    };
    let raw = match mirror.get(id).await {
        Ok(Some(bytes)) => bytes,
        _ => return,
    };
    let text = match String::from_utf8(raw.to_vec()) {
        Ok(text) => text,
        Err(_) => {
            warn!(schedule_id = %id, "YAML mirror is not UTF-8; skipping enabled sync");
            return;
        }
    };
    let rewritten = patch_yaml_enabled(&text, enabled).into_bytes();
    if let Err(e) = mirror.put(id, rewritten.into()).await {
        warn!(
            error = %e,
            schedule_id = %id,
            enabled,
            "YAML mirror enabled-flag sync failed; JSON catalog is current",
        );
    }
}
/// Query flags accepted by `disable`; both default to `false`.
#[derive(Deserialize, Debug, Default)]
pub struct DisableQuery {
/// Also mark the schedule's job as revoked in `BUCKET_SCRIPT_STATUS`.
#[serde(default)]
pub cascade: bool,
/// Also publish a kill signal to every in-flight execution of the job.
#[serde(default)]
pub cascade_kill: bool,
}
/// Disable a schedule. Optionally, also revoke its job (`cascade`) and signal
/// the job's in-flight executions to stop (`cascade_kill`).
///
/// Disabling an already-disabled schedule is not an error. The catalog is
/// left untouched, but the cascade options still run.
///
/// Kill signals and the YAML mirror sync are best-effort: their failures are
/// logged and do not fail the request. A `schedule_disable` audit event is
/// always recorded on success.
///
/// # Errors
/// - `503` if the schedules bucket, or the script-status bucket when
///   `cascade` is set, is missing.
/// - `404` for an unknown id.
/// - `500` on KV or serialization failures.
pub async fn disable(
State(s): State<AppState>,
Path(id): Path<String>,
Query(q): Query<DisableQuery>,
caller: Caller,
) -> Result<StatusCode, (StatusCode, String)> {
let schedules_kv = s
.jetstream
.get_key_value(BUCKET_SCHEDULES)
.await
.map_err(|e| {
warn!(error = %e, "schedules KV missing on disable");
(
StatusCode::SERVICE_UNAVAILABLE,
format!("schedules bucket missing: {e}"),
)
})?;
// Read the full entry (not just the value) to keep its revision for the
// revision-checked `update` below.
let entry = schedules_kv
.entry(&id)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("KV entry: {e}")))?
.ok_or_else(|| (StatusCode::NOT_FOUND, format!("schedule '{id}' not found")))?;
let mut schedule: Schedule = serde_json::from_slice(&entry.value).map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("deserialize stored schedule: {e}"),
)
})?;
if schedule.enabled {
schedule.enabled = false;
let body = serde_json::to_vec(&schedule).map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("serialize schedule: {e}"),
)
})?;
// Presumably compare-and-set against `entry.revision`, so a concurrent
// write makes this fail instead of being overwritten (async-nats KV).
schedules_kv
.update(&id, body.into(), entry.revision)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("KV update: {e}")))?;
sync_yaml_mirror_enabled(&s, &id, false).await;
} else {
info!(schedule_id = %id, "schedule already disabled; revoke-only path");
}
// `cascade`: write SCRIPT_STATUS_REVOKED for the job id in the
// script-status bucket.
let cascade_applied = if q.cascade {
let status_kv = s
.jetstream
.get_key_value(BUCKET_SCRIPT_STATUS)
.await
.map_err(|e| {
warn!(
error = %e,
bucket = BUCKET_SCRIPT_STATUS,
"schedule_disable cascade: status KV unavailable",
);
(
StatusCode::SERVICE_UNAVAILABLE,
format!("script_status bucket missing: {e}"),
)
})?;
status_kv
.put(&schedule.job_id, bytes::Bytes::from(SCRIPT_STATUS_REVOKED))
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("script_status put: {e}"),
)
})?;
info!(
schedule_id = %id,
job_id = %schedule.job_id,
"schedule disabled with cascade revoke",
);
true
} else {
info!(schedule_id = %id, "schedule soft-disabled");
false
};
// `cascade_kill`: publish an empty message on the kill subject of every
// unfinished execution of the job. This evaluates to the number of execs
// signalled, or 0 if the lookup query failed.
let killed_execs = if q.cascade_kill {
match sqlx::query_scalar::<_, String>(
"SELECT DISTINCT exec_id FROM execution_results \
WHERE job_id = ? AND finished_at IS NULL AND exec_id IS NOT NULL",
)
.bind(&schedule.job_id)
.fetch_all(&s.pool)
.await
{
Ok(exec_ids) => {
// Publish all kills concurrently; individual failures are only logged.
futures::future::join_all(exec_ids.iter().map(|eid| {
let nats = s.nats.clone();
let eid = eid.clone();
async move {
if let Err(e) = nats
.publish(kanade_shared::subject::kill(&eid), bytes::Bytes::new())
.await
{
warn!(error = %e, exec_id = %eid, "schedule_disable cascade-kill: publish failed");
}
}
}))
.await;
// Flush so the kill messages leave the client before the response is sent.
if let Err(e) = s.nats.flush().await {
warn!(error = %e, "schedule_disable cascade-kill: flush failed");
}
info!(
schedule_id = %id,
job_id = %schedule.job_id,
count = exec_ids.len(),
"schedule disabled with cascade kill (in-flight execs signalled)",
);
exec_ids.len()
}
Err(e) => {
warn!(error = %e, job_id = %schedule.job_id, "schedule_disable cascade-kill: in-flight exec query failed; no kills sent");
0
}
}
} else {
0
};
audit::record(
&s.nats,
"operator",
"schedule_disable",
Some(&id),
Some(&caller),
serde_json::json!({
"cascade": cascade_applied,
"cascade_kill": q.cascade_kill,
"killed_execs": killed_execs,
"job_id": schedule.job_id,
}),
)
.await;
Ok(StatusCode::NO_CONTENT)
}
/// Re-enable a schedule.
///
/// This is a no-op (still `204`, with no audit event) if the schedule is
/// already enabled. Otherwise it flips `enabled` in the catalog with a
/// revision-checked update, syncs the YAML mirror best-effort, and records a
/// `schedule_enable` audit event.
///
/// # Errors
/// - `503` if the schedules bucket is missing.
/// - `404` for an unknown id.
/// - `500` on KV or serialization failures.
pub async fn enable(
State(s): State<AppState>,
Path(id): Path<String>,
caller: Caller,
) -> Result<StatusCode, (StatusCode, String)> {
let kv = s
.jetstream
.get_key_value(BUCKET_SCHEDULES)
.await
.map_err(|e| {
warn!(error = %e, "schedules KV missing on enable");
(
StatusCode::SERVICE_UNAVAILABLE,
format!("schedules bucket missing: {e}"),
)
})?;
// Keep the entry's revision for the revision-checked `update` below.
let entry = kv
.entry(&id)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("KV entry: {e}")))?
.ok_or_else(|| (StatusCode::NOT_FOUND, format!("schedule '{id}' not found")))?;
let mut schedule: Schedule = serde_json::from_slice(&entry.value).map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("deserialize stored schedule: {e}"),
)
})?;
if schedule.enabled {
info!(schedule_id = %id, "schedule already enabled; no-op");
return Ok(StatusCode::NO_CONTENT);
}
schedule.enabled = true;
let body = serde_json::to_vec(&schedule).map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("serialize schedule: {e}"),
)
})?;
kv.update(&id, body.into(), entry.revision)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("KV update: {e}")))?;
sync_yaml_mirror_enabled(&s, &id, true).await;
info!(schedule_id = %id, "schedule enabled");
audit::record(
&s.nats,
"operator",
"schedule_enable",
Some(&id),
Some(&caller),
serde_json::json!({
"job_id": schedule.job_id,
}),
)
.await;
Ok(StatusCode::NO_CONTENT)
}
pub async fn delete(
State(s): State<AppState>,
Path(id): Path<String>,
caller: Caller,
) -> Result<StatusCode, (StatusCode, String)> {
let kv = match s.jetstream.get_key_value(BUCKET_SCHEDULES).await {
Ok(k) => k,
Err(e) => {
warn!(error = %e, "schedules KV missing on delete");
return Err((StatusCode::NOT_FOUND, "schedules bucket missing".into()));
}
};
kv.delete(&id)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("kv delete: {e}")))?;
info!(schedule_id = %id, "schedule deleted");
audit::record(
&s.nats,
"operator",
"schedule_delete",
Some(&id),
Some(&caller),
serde_json::json!({}),
)
.await;
Ok(StatusCode::NO_CONTENT)
}
#[cfg(test)]
mod tests {
    // Two groups of tests:
    // - Pure helpers (`patch_yaml_enabled`, `coverage_for`) are exercised
    //   directly.
    // - The SQL helpers (`schedule_run_stats`, `coverage_rows`) run against a
    //   fresh in-memory SQLite database with the crate's migrations applied.
    use super::{
        FinishedMap, coverage_for, coverage_rows, patch_yaml_enabled, schedule_run_stats,
    };
    use chrono::{Duration, Utc};
    use sqlx::SqlitePool;
    use std::collections::{HashMap, HashSet};

    #[test]
    fn flips_existing_flag_and_preserves_comments() {
        let yaml = "# nightly inventory sweep\nid: inv-hw\ncron: \"0 3 * * *\"\nenabled: false\njob_id: inventory-hw\n";
        let out = patch_yaml_enabled(yaml, true);
        assert!(out.contains("enabled: true\n"));
        assert!(!out.contains("enabled: false"));
        assert!(out.starts_with("# nightly inventory sweep\n"));
        assert!(out.contains("cron: \"0 3 * * *\"\n"));
    }

    #[test]
    fn drops_inline_comment_on_the_flipped_line_only() {
        let yaml = "id: s1\nenabled: true # stopped during incident\ncron: \"* * * * *\"\n";
        let out = patch_yaml_enabled(yaml, false);
        assert!(out.contains("enabled: false\n"));
        assert!(!out.contains("stopped during incident"));
        assert!(out.contains("cron: \"* * * * *\"\n"));
    }

    #[test]
    fn ignores_indented_enabled_in_nested_maps() {
        let yaml = "id: s1\ntarget:\n enabled: false\nenabled: false\n";
        let out = patch_yaml_enabled(yaml, true);
        assert!(out.contains("\nenabled: true\n"));
        assert!(out.contains(" enabled: false\n"));
    }

    #[test]
    fn appends_when_missing() {
        let yaml = "id: s1\ncron: \"* * * * *\"\n";
        let out = patch_yaml_enabled(yaml, false);
        assert!(out.ends_with("enabled: false\n"));
        assert!(out.starts_with("id: s1\n"));
    }

    #[test]
    fn only_first_top_level_occurrence_is_patched() {
        let yaml = "enabled: false\nenabled: false\n";
        let out = patch_yaml_enabled(yaml, true);
        assert_eq!(out.matches("enabled: true").count(), 1);
    }

    /// In-memory SQLite pool with `./migrations` applied.
    async fn fresh_pool() -> SqlitePool {
        let pool = sqlx::sqlite::SqlitePoolOptions::new()
            .connect("sqlite::memory:")
            .await
            .unwrap();
        sqlx::migrate!("./migrations").run(&pool).await.unwrap();
        pool
    }

    /// Insert one `execution_results` row for `pc-1`, recorded
    /// `recorded_ago_min` minutes ago. `finished` decides whether
    /// `finished_at` is set (to the recorded time) or left NULL.
    async fn insert_exec(
        pool: &SqlitePool,
        result_id: &str,
        job_id: &str,
        exit_code: Option<i64>,
        finished: bool,
        recorded_ago_min: i64,
    ) {
        let now = Utc::now();
        let recorded = now - Duration::minutes(recorded_ago_min);
        let started = recorded - Duration::minutes(1);
        let finished_at = finished.then_some(recorded);
        sqlx::query(
            "INSERT INTO execution_results
(result_id, request_id, pc_id, exit_code, stdout, stderr,
started_at, finished_at, recorded_at, job_id)
VALUES (?, 'req', 'pc-1', ?, '', '', ?, ?, ?, ?)",
        )
        .bind(result_id)
        .bind(exit_code)
        .bind(started)
        .bind(finished_at)
        .bind(recorded)
        .bind(job_id)
        .execute(pool)
        .await
        .unwrap();
    }

    #[tokio::test]
    async fn run_stats_tallies_recent_and_picks_latest() {
        let pool = fresh_pool().await;
        // 30 hours ago: outside the 24h window, must not be counted.
        insert_exec(&pool, "old", "j1", Some(0), true, 60 * 30).await;
        insert_exec(&pool, "fail", "j1", Some(1), true, 120).await;
        insert_exec(&pool, "ok", "j1", Some(0), true, 60).await;
        // Newest j1 row and still in flight: it is the "last run".
        insert_exec(&pool, "running", "j1", None, false, 10).await;
        // Different job: must be ignored.
        insert_exec(&pool, "other", "j2", Some(1), true, 60).await;
        let since = Utc::now() - Duration::hours(24);
        let (last, recent) = schedule_run_stats(&pool, "j1", since).await.unwrap();
        let last = last.expect("j1 has runs");
        assert_eq!(last.exit_code, None);
        assert!(last.finished_at.is_none());
        assert_eq!(recent.ok, 1);
        assert_eq!(recent.fail, 1);
        assert_eq!(recent.window_hours, 24);
    }

    /// Like `insert_exec`, but with an explicit `pc_id` and `version`.
    #[allow(clippy::too_many_arguments)]
    async fn insert_run(
        pool: &SqlitePool,
        result_id: &str,
        job_id: &str,
        pc_id: &str,
        exit_code: Option<i64>,
        finished: bool,
        version: &str,
        recorded_ago_min: i64,
    ) {
        let now = Utc::now();
        let recorded = now - Duration::minutes(recorded_ago_min);
        let started = recorded - Duration::minutes(1);
        let finished_at = finished.then_some(recorded);
        sqlx::query(
            "INSERT INTO execution_results
(result_id, request_id, pc_id, exit_code, stdout, stderr,
started_at, finished_at, recorded_at, job_id, version)
VALUES (?, 'req', ?, ?, '', '', ?, ?, ?, ?, ?)",
        )
        .bind(result_id)
        .bind(pc_id)
        .bind(exit_code)
        .bind(started)
        .bind(finished_at)
        .bind(recorded)
        .bind(job_id)
        .bind(version)
        .execute(pool)
        .await
        .unwrap();
    }

    #[tokio::test]
    async fn coverage_rows_picks_latest_finished_per_pc() {
        let pool = fresh_pool().await;
        insert_run(&pool, "a1", "j1", "pc-a", Some(0), true, "v1", 120).await;
        insert_run(&pool, "a2", "j1", "pc-a", Some(1), true, "v2", 30).await;
        insert_run(&pool, "b1", "j1", "pc-b", Some(0), true, "v3", 60).await;
        insert_run(&pool, "b2", "j1", "pc-b", None, false, "v3", 5).await;
        insert_run(&pool, "x1", "j2", "pc-a", Some(0), true, "v9", 10).await;
        let (inflight, finished) = coverage_rows(&pool, "j1").await.unwrap();
        assert!(inflight.contains("pc-b"));
        assert!(!inflight.contains("pc-a"));
        assert_eq!(finished.get("pc-a").unwrap().0, Some(1));
        assert_eq!(finished.get("pc-a").unwrap().1.as_deref(), Some("v2"));
        assert_eq!(finished.get("pc-b").unwrap().0, Some(0));
        assert_eq!(finished.get("pc-b").unwrap().1.as_deref(), Some("v3"));
        assert_eq!(finished.len(), 2);
    }

    #[tokio::test]
    async fn run_stats_empty_for_unknown_job() {
        let pool = fresh_pool().await;
        insert_exec(&pool, "x", "j1", Some(0), true, 10).await;
        let since = Utc::now() - Duration::hours(24);
        let (last, recent) = schedule_run_stats(&pool, "no-such-job", since)
            .await
            .unwrap();
        assert!(last.is_none());
        assert_eq!((recent.ok, recent.fail), (0, 0));
    }

    /// Owned PC-id roster from string literals.
    fn pcs(v: &[&str]) -> Vec<String> {
        v.iter().map(|s| s.to_string()).collect()
    }

    #[test]
    fn coverage_classifies_each_roster_pc() {
        let roster = pcs(&["pc-ok", "pc-fail", "pc-run", "pc-pend", "pc-off"]);
        let inflight: HashSet<String> = ["pc-run"].iter().map(|s| s.to_string()).collect();
        let mut finished: FinishedMap = HashMap::new();
        finished.insert(
            "pc-ok".into(),
            (
                Some(0),
                Some("v1.4.3".into()),
                Some("2026-06-12T00:00:00Z".into()),
            ),
        );
        finished.insert(
            "pc-fail".into(),
            (
                Some(1),
                Some("v1.4.2".into()),
                Some("2026-06-12T00:00:00Z".into()),
            ),
        );
        // In flight wins over an older finished run.
        finished.insert("pc-run".into(), (Some(0), None, None));
        let (agents, ok, fail, running, pending) = coverage_for(&roster, &inflight, &finished);
        assert_eq!((ok, fail, running, pending), (1, 1, 1, 2));
        assert_eq!(agents.len(), 5);
        let state = |pc: &str| agents.iter().find(|a| a.pc_id == pc).unwrap().state;
        assert_eq!(state("pc-ok"), "ok");
        assert_eq!(state("pc-fail"), "fail");
        assert_eq!(state("pc-run"), "running");
        assert_eq!(state("pc-pend"), "pending");
        assert_eq!(state("pc-off"), "pending");
        let ver = |pc: &str| {
            agents
                .iter()
                .find(|a| a.pc_id == pc)
                .unwrap()
                .version
                .clone()
        };
        assert_eq!(ver("pc-ok").as_deref(), Some("v1.4.3"));
        assert_eq!(ver("pc-run"), None);
        assert_eq!(ver("pc-pend"), None);
    }

    #[test]
    fn coverage_all_pending_when_no_runs() {
        let roster = pcs(&["a", "b", "c"]);
        let (agents, ok, fail, running, pending) =
            coverage_for(&roster, &HashSet::new(), &HashMap::new());
        assert_eq!((ok, fail, running, pending), (0, 0, 0, 3));
        assert!(agents.iter().all(|a| a.state == "pending"));
    }

    #[test]
    fn coverage_finished_with_null_exit_is_fail() {
        let roster = pcs(&["x"]);
        let mut finished: FinishedMap = HashMap::new();
        finished.insert(
            "x".into(),
            (None, None, Some("2026-06-12T00:00:00Z".into())),
        );
        let (_, ok, fail, _, _) = coverage_for(&roster, &HashSet::new(), &finished);
        assert_eq!((ok, fail), (0, 1));
    }

    #[test]
    fn coverage_ignores_runs_for_pcs_outside_roster() {
        let roster = pcs(&["in"]);
        let mut finished: FinishedMap = HashMap::new();
        finished.insert("in".into(), (Some(0), None, None));
        finished.insert("gone".into(), (Some(0), None, None));
        let (agents, ok, _, _, _) = coverage_for(&roster, &HashSet::new(), &finished);
        assert_eq!(ok, 1);
        assert_eq!(agents.len(), 1);
        assert_eq!(agents[0].pc_id, "in");
    }
}