use async_nats::jetstream::kv::Config as KvConfig;
use axum::Json;
use axum::extract::{Path, Query, State};
use axum::http::StatusCode;
use axum::http::header::HeaderMap;
use futures::TryStreamExt;
use kanade_shared::kv::{
BUCKET_SCHEDULES, BUCKET_SCHEDULES_YAML, BUCKET_SCRIPT_STATUS, SCRIPT_STATUS_REVOKED,
};
use kanade_shared::manifest::Schedule;
use serde::{Deserialize, Serialize};
use tracing::{info, warn};
use crate::api::AppState;
use crate::api::yaml_body::{YamlOrJson, mirror_yaml, yaml_headers};
use crate::audit;
use crate::audit::Caller;
/// Compact description of a schedule, returned by `create` after a successful upsert.
#[derive(Serialize)]
pub struct ScheduleSummary {
/// Schedule identifier; also the key the schedule is stored under in the schedules KV bucket.
pub id: String,
/// The schedule's `when` value rendered as text.
pub when: String,
/// Whether the schedule is enabled.
pub enabled: bool,
/// Identifier of the job the schedule references.
pub job_id: String,
}
pub async fn list(State(s): State<AppState>) -> Result<Json<Vec<Schedule>>, (StatusCode, String)> {
let kv = match s.jetstream.get_key_value(BUCKET_SCHEDULES).await {
Ok(k) => k,
Err(_) => return Ok(Json(Vec::new())),
};
let keys_stream = kv
.keys()
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("kv keys: {e}")))?;
let keys: Vec<String> = keys_stream
.try_collect()
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("kv keys: {e}")))?;
let mut out = Vec::with_capacity(keys.len());
for k in keys {
if let Ok(Some(bytes)) = kv.get(&k).await
&& let Ok(sched) = serde_json::from_slice::<Schedule>(&bytes)
{
out.push(sched);
}
}
Ok(Json(out))
}
/// Query-string parameters accepted by `preview`.
#[derive(Deserialize, Debug)]
pub struct PreviewQuery {
/// Number of fire times requested. Falls back to `default_preview_count()` when
/// the parameter is absent; `preview` clamps the value to 1..=50 before using it.
#[serde(default = "default_preview_count")]
pub count: usize,
}
/// Serde default for the `count` query parameter of the preview endpoint.
fn default_preview_count() -> usize {
    const DEFAULT_PREVIEW_COUNT: usize = 5;
    DEFAULT_PREVIEW_COUNT
}
/// Response body of `preview`: the stored schedule's headline fields plus its
/// upcoming fire times.
#[derive(Serialize)]
pub struct PreviewResponse {
/// Schedule identifier.
pub id: String,
/// The schedule's `when` value rendered as text.
pub when: String,
/// The schedule's time zone, as returned by `schedule.tz.as_str()`.
pub tz: String,
/// Whether the schedule is enabled.
pub enabled: bool,
/// Upcoming fire times formatted as RFC 3339 strings; may be empty.
pub fires: Vec<String>,
/// Explanation filled in by `preview` when `fires` is empty; left out of the
/// serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub note: Option<String>,
}
/// Previews the upcoming fire times of the stored schedule `id`.
///
/// The requested `count` is clamped to 1..=50 and fires are computed from the
/// current UTC time. When no fires come back, the response carries a `note`
/// whose wording depends on whether the schedule's `when` is a calendar entry.
///
/// # Errors
///
/// - `503 Service Unavailable` when the schedules bucket cannot be opened.
/// - `500 Internal Server Error` when the KV read fails or the stored value
///   cannot be decoded.
/// - `404 Not Found` when no entry exists under `id`.
pub async fn preview(
    State(s): State<AppState>,
    Path(id): Path<String>,
    Query(q): Query<PreviewQuery>,
) -> Result<Json<PreviewResponse>, (StatusCode, String)> {
    let bucket = match s.jetstream.get_key_value(BUCKET_SCHEDULES).await {
        Ok(bucket) => bucket,
        Err(e) => {
            return Err((
                StatusCode::SERVICE_UNAVAILABLE,
                format!("schedules bucket missing: {e}"),
            ));
        }
    };
    let stored = match bucket.get(&id).await {
        Ok(Some(raw)) => raw,
        Ok(None) => {
            return Err((StatusCode::NOT_FOUND, format!("schedule '{id}' not found")));
        }
        Err(e) => {
            return Err((StatusCode::INTERNAL_SERVER_ERROR, format!("KV get: {e}")));
        }
    };
    let schedule = match serde_json::from_slice::<Schedule>(&stored) {
        Ok(schedule) => schedule,
        Err(e) => {
            return Err((
                StatusCode::INTERNAL_SERVER_ERROR,
                format!("deserialize stored schedule: {e}"),
            ));
        }
    };

    let wanted = q.count.clamp(1, 50);
    let fires = schedule.preview_fires(chrono::Utc::now(), wanted);

    // Only an empty preview needs an explanation.
    let note = if fires.is_empty() {
        if matches!(schedule.when, kanade_shared::manifest::When::Calendar(_)) {
            Some(
                "no upcoming fires — a past one-shot, or the fire time is excluded by the \
                 active window / constraints.window"
                    .to_string(),
            )
        } else {
            Some(format!(
                "reconcile cadence ({}) polls every minute gated by cooldown — no discrete \
                 fire times to preview",
                schedule.when
            ))
        }
    } else {
        None
    };

    let response = PreviewResponse {
        id: schedule.id.clone(),
        when: schedule.when.to_string(),
        tz: schedule.tz.as_str().to_string(),
        enabled: schedule.enabled,
        fires: fires.iter().map(chrono::DateTime::to_rfc3339).collect(),
        note,
    };
    Ok(Json(response))
}
/// Look-back window, in hours, over which `status` tallies recent run outcomes;
/// also echoed to clients as `RecentCounts::window_hours`.
const STATUS_WINDOW_HOURS: i64 = 24;
/// The most recently recorded `execution_results` row for a job, as selected by
/// `schedule_run_stats` (ordered by `recorded_at`, newest first).
#[derive(Serialize, Default)]
pub struct LastRun {
/// `pc_id` column of the row; empty when the column cannot be read.
pub pc_id: String,
/// `exit_code` column of the row; `None` when it is NULL or cannot be read.
pub exit_code: Option<i64>,
/// `started_at` column of the row; `None` when it cannot be read as text.
pub started_at: Option<String>,
/// `finished_at` column of the row; `None` when it is NULL or cannot be read.
pub finished_at: Option<String>,
}
/// Outcome tally over finished `execution_results` rows of a job that were
/// recorded inside the look-back window, as computed by `schedule_run_stats`.
#[derive(Serialize, Default)]
pub struct RecentCounts {
/// Length of the look-back window in hours (`STATUS_WINDOW_HOURS`).
pub window_hours: i64,
/// Number of counted rows whose `exit_code` is 0.
pub ok: i64,
/// Number of counted rows whose `exit_code` is non-NULL and non-zero.
pub fail: i64,
}
/// Response body of `status`: the stored schedule's headline fields together
/// with its next fire time and run statistics for its job.
#[derive(Serialize)]
pub struct StatusResponse {
/// Schedule identifier.
pub id: String,
/// The schedule's `when` value rendered as text.
pub when: String,
/// The schedule's time zone, as returned by `schedule.tz.as_str()`.
pub tz: String,
/// Whether the schedule is enabled.
pub enabled: bool,
/// First upcoming fire time as an RFC 3339 string; `None` when the schedule
/// reports no upcoming fire.
pub next_run: Option<String>,
/// Most recently recorded run of the schedule's job, if any row exists.
pub last_run: Option<LastRun>,
/// Success/failure tally for the schedule's job over the look-back window.
pub recent: RecentCounts,
}
/// Collects run statistics for `job_id` from the `execution_results` table.
///
/// Returns the most recently recorded row for the job (if any) and a tally of
/// finished rows recorded at or after `since`, split into exit code 0 (`ok`)
/// and non-zero exit code (`fail`). Columns that cannot be read fall back to
/// an empty string, `None`, or `0` instead of failing the whole call.
///
/// # Errors
///
/// Propagates the `sqlx::Error` of either query.
async fn schedule_run_stats(
    pool: &sqlx::SqlitePool,
    job_id: &str,
    since: chrono::DateTime<chrono::Utc>,
) -> Result<(Option<LastRun>, RecentCounts), sqlx::Error> {
    use sqlx::Row;

    const LATEST_RUN_SQL: &str = "SELECT pc_id, exit_code, started_at, finished_at\n\
        FROM execution_results\n\
        WHERE job_id = ?\n\
        ORDER BY recorded_at DESC\n\
        LIMIT 1";
    const RECENT_COUNTS_SQL: &str = "SELECT\n\
        COALESCE(SUM(CASE WHEN exit_code = 0 THEN 1 ELSE 0 END), 0) AS ok,\n\
        COALESCE(SUM(CASE WHEN exit_code IS NOT NULL AND exit_code <> 0 THEN 1 ELSE 0 END), 0) AS fail\n\
        FROM execution_results\n\
        WHERE job_id = ? AND finished_at IS NOT NULL AND recorded_at >= ?";

    // Newest row for the job, whether or not it has finished.
    let newest_row = sqlx::query(LATEST_RUN_SQL)
        .bind(job_id)
        .fetch_optional(pool)
        .await?;
    let last = newest_row.map(|row| LastRun {
        pc_id: row.try_get::<String, _>("pc_id").unwrap_or_default(),
        exit_code: row.try_get::<Option<i64>, _>("exit_code").ok().flatten(),
        started_at: row.try_get::<String, _>("started_at").ok(),
        finished_at: row
            .try_get::<Option<String>, _>("finished_at")
            .ok()
            .flatten(),
    });

    // Aggregate query: always yields exactly one row thanks to COALESCE/SUM.
    let tally = sqlx::query(RECENT_COUNTS_SQL)
        .bind(job_id)
        .bind(since)
        .fetch_one(pool)
        .await?;
    let ok = tally.try_get::<i64, _>("ok").unwrap_or(0);
    let fail = tally.try_get::<i64, _>("fail").unwrap_or(0);

    Ok((
        last,
        RecentCounts {
            window_hours: STATUS_WINDOW_HOURS,
            ok,
            fail,
        },
    ))
}
/// Reports the state of the stored schedule `id`: its headline fields, its
/// next fire time, and run statistics for its job over the last
/// `STATUS_WINDOW_HOURS` hours.
///
/// # Errors
///
/// - `503 Service Unavailable` when the schedules bucket cannot be opened.
/// - `500 Internal Server Error` when the KV read fails, the stored value
///   cannot be decoded, or the run-statistics queries fail.
/// - `404 Not Found` when no entry exists under `id`.
pub async fn status(
    State(s): State<AppState>,
    Path(id): Path<String>,
) -> Result<Json<StatusResponse>, (StatusCode, String)> {
    let bucket = match s.jetstream.get_key_value(BUCKET_SCHEDULES).await {
        Ok(bucket) => bucket,
        Err(e) => {
            return Err((
                StatusCode::SERVICE_UNAVAILABLE,
                format!("schedules bucket missing: {e}"),
            ));
        }
    };
    let stored = match bucket.get(&id).await {
        Ok(Some(raw)) => raw,
        Ok(None) => {
            return Err((StatusCode::NOT_FOUND, format!("schedule '{id}' not found")));
        }
        Err(e) => {
            return Err((StatusCode::INTERNAL_SERVER_ERROR, format!("KV get: {e}")));
        }
    };
    let schedule = match serde_json::from_slice::<Schedule>(&stored) {
        Ok(schedule) => schedule,
        Err(e) => {
            return Err((
                StatusCode::INTERNAL_SERVER_ERROR,
                format!("deserialize stored schedule: {e}"),
            ));
        }
    };

    // One timestamp drives both the next-fire lookup and the stats window.
    let now = chrono::Utc::now();
    let upcoming = schedule.preview_fires(now, 1);
    let next_run = upcoming.first().map(|fire| fire.to_rfc3339());

    let window_start = now - chrono::Duration::hours(STATUS_WINDOW_HOURS);
    let (last_run, recent) =
        match schedule_run_stats(&s.pool, &schedule.job_id, window_start).await {
            Ok(stats) => stats,
            Err(e) => {
                return Err((StatusCode::INTERNAL_SERVER_ERROR, format!("run stats: {e}")));
            }
        };

    let response = StatusResponse {
        id: schedule.id.clone(),
        when: schedule.when.to_string(),
        tz: schedule.tz.as_str().to_string(),
        enabled: schedule.enabled,
        next_run,
        last_run,
        recent,
    };
    Ok(Json(response))
}
/// Creates or replaces a schedule (upsert keyed by the schedule's `id`).
///
/// Steps, in order: validate the schedule, confirm its `job_id` exists in the
/// job catalog, ensure the schedules bucket exists, store the schedule as JSON,
/// mirror its YAML form (the submitted YAML when available, otherwise a
/// re-serialization), log, and record an audit event. A failed YAML mirror
/// write is only logged; the JSON entry has already been written by then.
///
/// # Errors
///
/// - `400 Bad Request` when validation fails or `job_id` is unknown.
/// - `500 Internal Server Error` when the job lookup, bucket creation,
///   serialization, or KV write fails.
pub async fn create(
    State(s): State<AppState>,
    caller: Caller,
    body: YamlOrJson<Schedule>,
) -> Result<Json<ScheduleSummary>, (StatusCode, String)> {
    let YamlOrJson {
        value: schedule,
        raw_yaml,
    } = body;

    schedule
        .validate()
        .map_err(|e| (StatusCode::BAD_REQUEST, format!("invalid schedule: {e}")))?;

    // A schedule may only point at a job that is already registered.
    let known_job = crate::api::jobs::fetch(&s.jetstream, &schedule.job_id)
        .await
        .map_err(|e| {
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                format!("job catalog lookup: {e}"),
            )
        })?;
    if known_job.is_none() {
        return Err((
            StatusCode::BAD_REQUEST,
            format!(
                "unknown job_id '{}' — register the job first (kanade job create)",
                schedule.job_id
            ),
        ));
    }

    let bucket_config = KvConfig {
        bucket: BUCKET_SCHEDULES.into(),
        history: 5,
        ..Default::default()
    };
    let bucket = match s.jetstream.create_key_value(bucket_config).await {
        Ok(bucket) => bucket,
        Err(e) => {
            return Err((StatusCode::INTERNAL_SERVER_ERROR, format!("ensure KV: {e}")));
        }
    };

    let encoded = match serde_json::to_vec(&schedule) {
        Ok(encoded) => encoded,
        Err(e) => {
            return Err((StatusCode::INTERNAL_SERVER_ERROR, format!("serialize: {e}")));
        }
    };
    if let Err(e) = bucket.put(&schedule.id, encoded.into()).await {
        return Err((StatusCode::INTERNAL_SERVER_ERROR, format!("KV put: {e}")));
    }

    // Prefer the text the operator submitted; fall back to a re-serialization.
    let yaml_source = match raw_yaml {
        Some(submitted) => submitted,
        None => serde_yaml::to_string(&schedule)
            .unwrap_or_else(|_| String::from("# YAML mirror unavailable for this entry")),
    };
    if let Err(e) = mirror_yaml(&s, BUCKET_SCHEDULES_YAML, &schedule.id, &yaml_source).await {
        warn!(
            error = %e,
            schedule_id = %schedule.id,
            "schedules: YAML mirror put failed; JSON catalog is current",
        );
    }

    info!(
        schedule_id = %schedule.id,
        when = %schedule.when,
        job_id = %schedule.job_id,
        "schedule upserted",
    );

    let audit_details = serde_json::json!({
        "when": schedule.when.to_string(),
        "job_id": schedule.job_id,
        "enabled": schedule.enabled,
    });
    audit::record(
        &s.nats,
        "operator",
        "schedule_upsert",
        Some(&schedule.id),
        Some(&caller),
        audit_details,
    )
    .await;

    let summary = ScheduleSummary {
        id: schedule.id.clone(),
        when: schedule.when.to_string(),
        enabled: schedule.enabled,
        job_id: schedule.job_id.clone(),
    };
    Ok(Json(summary))
}
/// Returns the YAML form of the schedule `id`.
///
/// The YAML mirror bucket is consulted first; any failure there (bucket
/// missing, key missing, read error, non-UTF-8 content) falls through to
/// re-serializing the JSON catalog entry as YAML.
///
/// # Errors
///
/// - `404 Not Found` when the schedules bucket cannot be opened or holds no
///   entry under `id`.
/// - `500 Internal Server Error` when the KV read, JSON decode, or YAML encode
///   fails.
pub async fn get_yaml(
    State(s): State<AppState>,
    Path(id): Path<String>,
) -> Result<(StatusCode, HeaderMap, String), (StatusCode, String)> {
    // Best-effort read of the mirrored text; every failure collapses to `None`.
    let mirrored: Option<String> = async {
        let mirror = s
            .jetstream
            .get_key_value(BUCKET_SCHEDULES_YAML)
            .await
            .ok()?;
        let raw = mirror.get(&id).await.ok()??;
        String::from_utf8(raw.to_vec()).ok()
    }
    .await;
    if let Some(text) = mirrored {
        return Ok((StatusCode::OK, yaml_headers(), text));
    }

    let catalog = match s.jetstream.get_key_value(BUCKET_SCHEDULES).await {
        Ok(catalog) => catalog,
        Err(_) => {
            return Err((StatusCode::NOT_FOUND, format!("schedule '{id}' not found")));
        }
    };
    let stored = match catalog.get(&id).await {
        Ok(Some(raw)) => raw,
        Ok(None) => {
            return Err((StatusCode::NOT_FOUND, format!("schedule '{id}' not found")));
        }
        Err(e) => {
            return Err((StatusCode::INTERNAL_SERVER_ERROR, format!("KV get: {e}")));
        }
    };
    let schedule = match serde_json::from_slice::<Schedule>(&stored) {
        Ok(schedule) => schedule,
        Err(e) => {
            return Err((StatusCode::INTERNAL_SERVER_ERROR, format!("decode: {e}")));
        }
    };
    match serde_yaml::to_string(&schedule) {
        Ok(yaml) => Ok((StatusCode::OK, yaml_headers(), yaml)),
        Err(e) => Err((
            StatusCode::INTERNAL_SERVER_ERROR,
            format!("encode YAML: {e}"),
        )),
    }
}
/// Rewrites the top-level `enabled:` line of a YAML document.
///
/// The first line that starts with `enabled:` at column 0 is replaced by
/// `enabled: <value>`, dropping anything else on that line (including an
/// inline comment). Indented `enabled:` keys and later top-level duplicates are
/// left alone. When no such line exists, one is appended. Every output line is
/// terminated with `\n`, so `\r\n` endings and a missing final newline are
/// normalized.
fn patch_yaml_enabled(yaml: &str, enabled: bool) -> String {
    let replacement = if enabled {
        "enabled: true"
    } else {
        "enabled: false"
    };
    let mut patched = String::with_capacity(yaml.len() + 16);
    let mut replaced = false;
    for line in yaml.lines() {
        let is_target = !replaced && line.starts_with("enabled:");
        patched.push_str(if is_target { replacement } else { line });
        patched.push('\n');
        replaced |= is_target;
    }
    if !replaced {
        patched.push_str(replacement);
        patched.push('\n');
    }
    patched
}
/// Best-effort update of the `enabled` flag inside the mirrored YAML of
/// schedule `id`.
///
/// Does nothing when the mirror bucket or the entry is unavailable. Non-UTF-8
/// mirror content and a failed write are logged and otherwise ignored; the
/// function never reports an error to its caller.
async fn sync_yaml_mirror_enabled(s: &AppState, id: &str, enabled: bool) {
    let mirror = match s.jetstream.get_key_value(BUCKET_SCHEDULES_YAML).await {
        Ok(mirror) => mirror,
        Err(_) => return,
    };
    let raw = match mirror.get(id).await {
        Ok(Some(raw)) => raw,
        _ => return,
    };
    let text = match String::from_utf8(raw.to_vec()) {
        Ok(text) => text,
        Err(_) => {
            warn!(schedule_id = %id, "YAML mirror is not UTF-8; skipping enabled sync");
            return;
        }
    };

    let updated = patch_yaml_enabled(&text, enabled).into_bytes();
    match mirror.put(id, updated.into()).await {
        Ok(_) => {}
        Err(e) => {
            warn!(
                error = %e,
                schedule_id = %id,
                enabled,
                "YAML mirror enabled-flag sync failed; JSON catalog is current",
            );
        }
    }
}
/// Query-string parameters accepted by `disable`. Both flags default to `false`.
#[derive(Deserialize, Debug, Default)]
pub struct DisableQuery {
/// When set, `disable` also writes `SCRIPT_STATUS_REVOKED` under the
/// schedule's `job_id` in the script-status bucket.
#[serde(default)]
pub cascade: bool,
/// When set, `disable` also publishes a kill message for every distinct
/// `exec_id` of the job whose `execution_results` row has no `finished_at`.
#[serde(default)]
pub cascade_kill: bool,
}
pub async fn disable(
State(s): State<AppState>,
Path(id): Path<String>,
Query(q): Query<DisableQuery>,
caller: Caller,
) -> Result<StatusCode, (StatusCode, String)> {
let schedules_kv = s
.jetstream
.get_key_value(BUCKET_SCHEDULES)
.await
.map_err(|e| {
warn!(error = %e, "schedules KV missing on disable");
(
StatusCode::SERVICE_UNAVAILABLE,
format!("schedules bucket missing: {e}"),
)
})?;
let entry = schedules_kv
.entry(&id)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("KV entry: {e}")))?
.ok_or_else(|| (StatusCode::NOT_FOUND, format!("schedule '{id}' not found")))?;
let mut schedule: Schedule = serde_json::from_slice(&entry.value).map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("deserialize stored schedule: {e}"),
)
})?;
if schedule.enabled {
schedule.enabled = false;
let body = serde_json::to_vec(&schedule).map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("serialize schedule: {e}"),
)
})?;
schedules_kv
.update(&id, body.into(), entry.revision)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("KV update: {e}")))?;
sync_yaml_mirror_enabled(&s, &id, false).await;
} else {
info!(schedule_id = %id, "schedule already disabled; revoke-only path");
}
let cascade_applied = if q.cascade {
let status_kv = s
.jetstream
.get_key_value(BUCKET_SCRIPT_STATUS)
.await
.map_err(|e| {
warn!(
error = %e,
bucket = BUCKET_SCRIPT_STATUS,
"schedule_disable cascade: status KV unavailable",
);
(
StatusCode::SERVICE_UNAVAILABLE,
format!("script_status bucket missing: {e}"),
)
})?;
status_kv
.put(&schedule.job_id, bytes::Bytes::from(SCRIPT_STATUS_REVOKED))
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("script_status put: {e}"),
)
})?;
info!(
schedule_id = %id,
job_id = %schedule.job_id,
"schedule disabled with cascade revoke",
);
true
} else {
info!(schedule_id = %id, "schedule soft-disabled");
false
};
let killed_execs = if q.cascade_kill {
match sqlx::query_scalar::<_, String>(
"SELECT DISTINCT exec_id FROM execution_results \
WHERE job_id = ? AND finished_at IS NULL AND exec_id IS NOT NULL",
)
.bind(&schedule.job_id)
.fetch_all(&s.pool)
.await
{
Ok(exec_ids) => {
futures::future::join_all(exec_ids.iter().map(|eid| {
let nats = s.nats.clone();
let eid = eid.clone();
async move {
if let Err(e) = nats
.publish(kanade_shared::subject::kill(&eid), bytes::Bytes::new())
.await
{
warn!(error = %e, exec_id = %eid, "schedule_disable cascade-kill: publish failed");
}
}
}))
.await;
if let Err(e) = s.nats.flush().await {
warn!(error = %e, "schedule_disable cascade-kill: flush failed");
}
info!(
schedule_id = %id,
job_id = %schedule.job_id,
count = exec_ids.len(),
"schedule disabled with cascade kill (in-flight execs signalled)",
);
exec_ids.len()
}
Err(e) => {
warn!(error = %e, job_id = %schedule.job_id, "schedule_disable cascade-kill: in-flight exec query failed; no kills sent");
0
}
}
} else {
0
};
audit::record(
&s.nats,
"operator",
"schedule_disable",
Some(&id),
Some(&caller),
serde_json::json!({
"cascade": cascade_applied,
"cascade_kill": q.cascade_kill,
"killed_execs": killed_execs,
"job_id": schedule.job_id,
}),
)
.await;
Ok(StatusCode::NO_CONTENT)
}
/// Re-enables the schedule `id`.
///
/// An already enabled schedule is a no-op: nothing is written and no audit
/// event is recorded. Otherwise the schedule is stored with `enabled = true`
/// through a revision-checked update, the YAML mirror is synced, and an audit
/// event is recorded.
///
/// # Errors
///
/// - `503 Service Unavailable` when the schedules bucket cannot be opened.
/// - `404 Not Found` when no entry exists under `id`.
/// - `500 Internal Server Error` when the KV read or update fails or the
///   stored schedule cannot be decoded or re-encoded.
pub async fn enable(
    State(s): State<AppState>,
    Path(id): Path<String>,
    caller: Caller,
) -> Result<StatusCode, (StatusCode, String)> {
    let bucket = match s.jetstream.get_key_value(BUCKET_SCHEDULES).await {
        Ok(bucket) => bucket,
        Err(e) => {
            warn!(error = %e, "schedules KV missing on enable");
            return Err((
                StatusCode::SERVICE_UNAVAILABLE,
                format!("schedules bucket missing: {e}"),
            ));
        }
    };
    let current = match bucket.entry(&id).await {
        Ok(Some(current)) => current,
        Ok(None) => {
            return Err((StatusCode::NOT_FOUND, format!("schedule '{id}' not found")));
        }
        Err(e) => {
            return Err((StatusCode::INTERNAL_SERVER_ERROR, format!("KV entry: {e}")));
        }
    };
    let mut schedule = match serde_json::from_slice::<Schedule>(&current.value) {
        Ok(schedule) => schedule,
        Err(e) => {
            return Err((
                StatusCode::INTERNAL_SERVER_ERROR,
                format!("deserialize stored schedule: {e}"),
            ));
        }
    };

    if schedule.enabled {
        info!(schedule_id = %id, "schedule already enabled; no-op");
        return Ok(StatusCode::NO_CONTENT);
    }

    schedule.enabled = true;
    let payload = match serde_json::to_vec(&schedule) {
        Ok(payload) => payload,
        Err(e) => {
            return Err((
                StatusCode::INTERNAL_SERVER_ERROR,
                format!("serialize schedule: {e}"),
            ));
        }
    };
    // Revision-checked write: fails if the entry changed since it was read.
    if let Err(e) = bucket.update(&id, payload.into(), current.revision).await {
        return Err((StatusCode::INTERNAL_SERVER_ERROR, format!("KV update: {e}")));
    }
    sync_yaml_mirror_enabled(&s, &id, true).await;
    info!(schedule_id = %id, "schedule enabled");

    let audit_details = serde_json::json!({
        "job_id": schedule.job_id,
    });
    audit::record(
        &s.nats,
        "operator",
        "schedule_enable",
        Some(&id),
        Some(&caller),
        audit_details,
    )
    .await;
    Ok(StatusCode::NO_CONTENT)
}
pub async fn delete(
State(s): State<AppState>,
Path(id): Path<String>,
caller: Caller,
) -> Result<StatusCode, (StatusCode, String)> {
let kv = match s.jetstream.get_key_value(BUCKET_SCHEDULES).await {
Ok(k) => k,
Err(e) => {
warn!(error = %e, "schedules KV missing on delete");
return Err((StatusCode::NOT_FOUND, "schedules bucket missing".into()));
}
};
kv.delete(&id)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("kv delete: {e}")))?;
info!(schedule_id = %id, "schedule deleted");
audit::record(
&s.nats,
"operator",
"schedule_delete",
Some(&id),
Some(&caller),
serde_json::json!({}),
)
.await;
Ok(StatusCode::NO_CONTENT)
}
#[cfg(test)]
mod tests {
    use super::{patch_yaml_enabled, schedule_run_stats};
    use chrono::{Duration, Utc};
    use sqlx::SqlitePool;

    // ---- patch_yaml_enabled -------------------------------------------------

    /// An existing top-level flag is flipped; comments and other keys survive.
    #[test]
    fn flips_existing_flag_and_preserves_comments() {
        let source = "# nightly inventory sweep\nid: inv-hw\ncron: \"0 3 * * *\"\nenabled: false\njob_id: inventory-hw\n";
        let patched = patch_yaml_enabled(source, true);
        assert!(patched.contains("enabled: true\n"));
        assert!(!patched.contains("enabled: false"));
        assert!(patched.starts_with("# nightly inventory sweep\n"));
        assert!(patched.contains("cron: \"0 3 * * *\"\n"));
    }

    /// The rewritten line loses its inline comment; other lines are untouched.
    #[test]
    fn drops_inline_comment_on_the_flipped_line_only() {
        let source = "id: s1\nenabled: true # stopped during incident\ncron: \"* * * * *\"\n";
        let patched = patch_yaml_enabled(source, false);
        assert!(patched.contains("enabled: false\n"));
        assert!(!patched.contains("stopped during incident"));
        assert!(patched.contains("cron: \"* * * * *\"\n"));
    }

    /// Only a column-0 `enabled:` counts; nested keys keep their value.
    #[test]
    fn ignores_indented_enabled_in_nested_maps() {
        let source = "id: s1\ntarget:\n enabled: false\nenabled: false\n";
        let patched = patch_yaml_enabled(source, true);
        assert!(patched.contains("\nenabled: true\n"));
        assert!(patched.contains(" enabled: false\n"));
    }

    /// A document without the flag gets one appended at the end.
    #[test]
    fn appends_when_missing() {
        let source = "id: s1\ncron: \"* * * * *\"\n";
        let patched = patch_yaml_enabled(source, false);
        assert!(patched.ends_with("enabled: false\n"));
        assert!(patched.starts_with("id: s1\n"));
    }

    /// With duplicate top-level flags, only the first one is rewritten.
    #[test]
    fn only_first_top_level_occurrence_is_patched() {
        let source = "enabled: false\nenabled: false\n";
        let patched = patch_yaml_enabled(source, true);
        assert_eq!(patched.matches("enabled: true").count(), 1);
    }

    // ---- schedule_run_stats -------------------------------------------------

    /// Opens an in-memory SQLite database and applies the project migrations.
    async fn fresh_pool() -> SqlitePool {
        let pool = sqlx::sqlite::SqlitePoolOptions::new()
            .connect("sqlite::memory:")
            .await
            .unwrap();
        sqlx::migrate!("./migrations").run(&pool).await.unwrap();
        pool
    }

    /// Inserts one `execution_results` row recorded `recorded_ago_min` minutes
    /// ago. The row starts one minute before it was recorded and, when
    /// `finished` is set, finishes at the recorded time.
    async fn insert_exec(
        pool: &SqlitePool,
        result_id: &str,
        job_id: &str,
        exit_code: Option<i64>,
        finished: bool,
        recorded_ago_min: i64,
    ) {
        let recorded_at = Utc::now() - Duration::minutes(recorded_ago_min);
        let started_at = recorded_at - Duration::minutes(1);
        let finished_at = if finished { Some(recorded_at) } else { None };
        sqlx::query(
            "INSERT INTO execution_results\n\
             (result_id, request_id, pc_id, exit_code, stdout, stderr,\n\
             started_at, finished_at, recorded_at, job_id)\n\
             VALUES (?, 'req', 'pc-1', ?, '', '', ?, ?, ?, ?)",
        )
        .bind(result_id)
        .bind(exit_code)
        .bind(started_at)
        .bind(finished_at)
        .bind(recorded_at)
        .bind(job_id)
        .execute(pool)
        .await
        .unwrap();
    }

    #[tokio::test]
    async fn run_stats_tallies_recent_and_picks_latest() {
        let pool = fresh_pool().await;
        // (result_id, job_id, exit_code, finished, minutes since recorded)
        let fixtures = [
            ("old", "j1", Some(0), true, 60 * 30), // outside the 24 h window
            ("fail", "j1", Some(1), true, 120),
            ("ok", "j1", Some(0), true, 60),
            ("running", "j1", None, false, 10), // newest row, still in flight
            ("other", "j2", Some(1), true, 60), // different job
        ];
        for (result_id, job_id, exit_code, finished, ago_min) in fixtures {
            insert_exec(&pool, result_id, job_id, exit_code, finished, ago_min).await;
        }

        let since = Utc::now() - Duration::hours(24);
        let (last, recent) = schedule_run_stats(&pool, "j1", since).await.unwrap();

        let last = last.expect("j1 has runs");
        assert_eq!(last.exit_code, None);
        assert!(last.finished_at.is_none());
        assert_eq!(recent.ok, 1);
        assert_eq!(recent.fail, 1);
        assert_eq!(recent.window_hours, 24);
    }

    #[tokio::test]
    async fn run_stats_empty_for_unknown_job() {
        let pool = fresh_pool().await;
        insert_exec(&pool, "x", "j1", Some(0), true, 10).await;

        let since = Utc::now() - Duration::hours(24);
        let (last, recent) = schedule_run_stats(&pool, "no-such-job", since)
            .await
            .unwrap();

        assert!(last.is_none());
        assert_eq!((recent.ok, recent.fail), (0, 0));
    }
}