use async_nats::jetstream::kv::Config as KvConfig;
use axum::Json;
use axum::extract::{Path, Query, State};
use axum::http::StatusCode;
use axum::http::header::HeaderMap;
use futures::TryStreamExt;
use kanade_shared::kv::{
BUCKET_SCHEDULES, BUCKET_SCHEDULES_YAML, BUCKET_SCRIPT_STATUS, SCRIPT_STATUS_REVOKED,
};
use kanade_shared::manifest::Schedule;
use serde::{Deserialize, Serialize};
use tracing::{info, warn};
use crate::api::AppState;
use crate::api::yaml_body::{YamlOrJson, mirror_yaml, yaml_headers};
use crate::audit;
use crate::audit::Caller;
/// Compact, serializable view of a schedule, returned by `create` after a
/// successful upsert.
#[derive(Serialize)]
pub struct ScheduleSummary {
/// Schedule identifier, copied from the stored schedule's `id`.
pub id: String,
/// The schedule's `when` field rendered with `to_string()`.
pub when: String,
/// Whether the schedule is enabled.
pub enabled: bool,
/// Identifier of the job the schedule refers to.
pub job_id: String,
}
pub async fn list(State(s): State<AppState>) -> Result<Json<Vec<Schedule>>, (StatusCode, String)> {
let kv = match s.jetstream.get_key_value(BUCKET_SCHEDULES).await {
Ok(k) => k,
Err(_) => return Ok(Json(Vec::new())),
};
let keys_stream = kv
.keys()
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("kv keys: {e}")))?;
let keys: Vec<String> = keys_stream
.try_collect()
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("kv keys: {e}")))?;
let mut out = Vec::with_capacity(keys.len());
for k in keys {
if let Ok(Some(bytes)) = kv.get(&k).await
&& let Ok(sched) = serde_json::from_slice::<Schedule>(&bytes)
{
out.push(sched);
}
}
Ok(Json(out))
}
/// Creates or replaces (upserts) a schedule.
///
/// The request body may be YAML or JSON. The handler:
/// 1. validates the schedule,
/// 2. checks that its `job_id` exists in the job catalog,
/// 3. stores the schedule as JSON under its `id` in `BUCKET_SCHEDULES`
///    (opened through `create_key_value` with `history: 5`),
/// 4. mirrors the YAML source into `BUCKET_SCHEDULES_YAML` — a mirror failure
///    is only logged, the request still succeeds,
/// 5. records a `schedule_upsert` audit event.
///
/// # Errors
///
/// - `400 Bad Request` when validation fails or the `job_id` is unknown.
/// - `500 Internal Server Error` when the job lookup, bucket access,
///   serialization or the KV put fails.
pub async fn create(
    State(s): State<AppState>,
    caller: Caller,
    body: YamlOrJson<Schedule>,
) -> Result<Json<ScheduleSummary>, (StatusCode, String)> {
    let YamlOrJson {
        value: schedule,
        raw_yaml,
    } = body;

    schedule
        .validate()
        .map_err(|e| (StatusCode::BAD_REQUEST, format!("invalid schedule: {e}")))?;

    // Refuse schedules that point at a job the catalog does not know.
    let job_is_registered = crate::api::jobs::fetch(&s.jetstream, &schedule.job_id)
        .await
        .map_err(|e| {
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                format!("job catalog lookup: {e}"),
            )
        })?
        .is_some();
    if !job_is_registered {
        return Err((
            StatusCode::BAD_REQUEST,
            format!(
                "unknown job_id '{}' — register the job first (kanade job create)",
                schedule.job_id
            ),
        ));
    }

    let bucket_config = KvConfig {
        bucket: BUCKET_SCHEDULES.into(),
        history: 5,
        ..Default::default()
    };
    let catalog = s
        .jetstream
        .create_key_value(bucket_config)
        .await
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("ensure KV: {e}")))?;

    let encoded = serde_json::to_vec(&schedule)
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("serialize: {e}")))?;
    catalog
        .put(&schedule.id, encoded.into())
        .await
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("KV put: {e}")))?;

    // Prefer the text the client sent; otherwise render the parsed value,
    // falling back to a placeholder comment if that rendering fails.
    let yaml_source = match raw_yaml {
        Some(text) => text,
        None => serde_yaml::to_string(&schedule)
            .unwrap_or_else(|_| String::from("# YAML mirror unavailable for this entry")),
    };
    let mirrored = mirror_yaml(&s, BUCKET_SCHEDULES_YAML, &schedule.id, &yaml_source).await;
    if let Err(e) = mirrored {
        warn!(
            error = %e,
            schedule_id = %schedule.id,
            "schedules: YAML mirror put failed; JSON catalog is current",
        );
    }

    info!(
        schedule_id = %schedule.id,
        when = %schedule.when,
        job_id = %schedule.job_id,
        "schedule upserted",
    );

    let audit_details = serde_json::json!({
        "when": schedule.when.to_string(),
        "job_id": schedule.job_id,
        "enabled": schedule.enabled,
    });
    audit::record(
        &s.nats,
        "operator",
        "schedule_upsert",
        Some(&schedule.id),
        Some(&caller),
        audit_details,
    )
    .await;

    let summary = ScheduleSummary {
        id: schedule.id.clone(),
        when: schedule.when.to_string(),
        enabled: schedule.enabled,
        job_id: schedule.job_id.clone(),
    };
    Ok(Json(summary))
}
/// Returns the YAML form of the schedule `id`.
///
/// The text stored in the `BUCKET_SCHEDULES_YAML` mirror is served when it
/// can be read and is valid UTF-8. Any failure on that path falls through to
/// the JSON catalog in `BUCKET_SCHEDULES`, whose entry is decoded and
/// re-encoded as YAML.
///
/// # Errors
///
/// - `404 Not Found` when the catalog bucket cannot be opened or holds no
///   entry for `id`.
/// - `500 Internal Server Error` when the catalog read, JSON decode or YAML
///   encode fails.
pub async fn get_yaml(
    State(s): State<AppState>,
    Path(id): Path<String>,
) -> Result<(StatusCode, HeaderMap, String), (StatusCode, String)> {
    // Mirror lookup: every failure mode collapses to `None` and triggers the
    // catalog fallback below.
    let mirrored = match s.jetstream.get_key_value(BUCKET_SCHEDULES_YAML).await {
        Ok(mirror) => mirror.get(&id).await.ok().flatten(),
        Err(_) => None,
    };
    let mirrored_text = mirrored.and_then(|raw| String::from_utf8(raw.to_vec()).ok());
    if let Some(text) = mirrored_text {
        return Ok((StatusCode::OK, yaml_headers(), text));
    }

    let not_found = || (StatusCode::NOT_FOUND, format!("schedule '{id}' not found"));

    let catalog = s
        .jetstream
        .get_key_value(BUCKET_SCHEDULES)
        .await
        .map_err(|_| not_found())?;
    let stored = catalog
        .get(&id)
        .await
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("KV get: {e}")))?
        .ok_or_else(not_found)?;

    let schedule = serde_json::from_slice::<Schedule>(&stored)
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("decode: {e}")))?;

    match serde_yaml::to_string(&schedule) {
        Ok(yaml) => Ok((StatusCode::OK, yaml_headers(), yaml)),
        Err(e) => Err((
            StatusCode::INTERNAL_SERVER_ERROR,
            format!("encode YAML: {e}"),
        )),
    }
}
/// Rewrites the top-level `enabled:` line of a YAML document.
///
/// The first line that starts at column 0 with `enabled:` is replaced by
/// `enabled: <value>`; anything else on that line (such as an inline comment)
/// is dropped. Indented `enabled:` keys and later top-level occurrences are
/// copied through untouched. When no such line exists, `enabled: <value>` is
/// appended as the last line.
///
/// Every output line is terminated with `\n`, whatever terminator the input
/// used and whether or not the input ended with one.
fn patch_yaml_enabled(yaml: &str, enabled: bool) -> String {
    let replacement = if enabled {
        "enabled: true"
    } else {
        "enabled: false"
    };

    let mut patched = String::with_capacity(yaml.len() + 16);
    let mut pending = true;
    for row in yaml.lines() {
        let emitted = if pending && row.starts_with("enabled:") {
            pending = false;
            replacement
        } else {
            row
        };
        patched.push_str(emitted);
        patched.push('\n');
    }

    if pending {
        patched.push_str(replacement);
        patched.push('\n');
    }
    patched
}
/// Best-effort update of the `enabled:` flag in a schedule's YAML mirror.
///
/// Returns silently when the `BUCKET_SCHEDULES_YAML` bucket cannot be opened
/// or the mirror entry cannot be read or does not exist. A mirror that is not
/// valid UTF-8, or a failed write, is logged at `warn`. Nothing is reported
/// to the caller.
async fn sync_yaml_mirror_enabled(s: &AppState, id: &str, enabled: bool) {
    let mirror = match s.jetstream.get_key_value(BUCKET_SCHEDULES_YAML).await {
        Ok(bucket) => bucket,
        Err(_) => return,
    };
    let raw = match mirror.get(id).await {
        Ok(Some(raw)) => raw,
        _ => return,
    };
    let current = match String::from_utf8(raw.to_vec()) {
        Ok(text) => text,
        Err(_) => {
            warn!(schedule_id = %id, "YAML mirror is not UTF-8; skipping enabled sync");
            return;
        }
    };

    let updated = patch_yaml_enabled(&current, enabled).into_bytes();
    let written = mirror.put(id, updated.into()).await;
    if let Err(e) = written {
        warn!(
            error = %e,
            schedule_id = %id,
            enabled,
            "YAML mirror enabled-flag sync failed; JSON catalog is current",
        );
    }
}
/// Query-string options accepted by the `disable` handler.
#[derive(Deserialize, Debug, Default)]
pub struct DisableQuery {
/// When `true`, `disable` additionally writes `SCRIPT_STATUS_REVOKED` for the
/// schedule's job into `BUCKET_SCRIPT_STATUS`. Falls back to `false` when the
/// parameter is omitted (`#[serde(default)]`).
#[serde(default)]
pub cascade: bool,
}
/// Disables the schedule `id`, optionally revoking its job's script status.
///
/// Steps, in order:
/// 1. Load the stored schedule from `BUCKET_SCHEDULES` together with its
///    revision.
/// 2. If it is currently enabled, write it back with `enabled = false` via
///    `update`, passing the revision read in step 1, then sync the flag into
///    the YAML mirror. If it is already disabled the write is skipped.
/// 3. If `cascade` is set in the query string, put `SCRIPT_STATUS_REVOKED`
///    under the schedule's `job_id` in `BUCKET_SCRIPT_STATUS`.
/// 4. Record a `schedule_disable` audit event and answer `204 No Content`.
///
/// # Errors
///
/// - `503 Service Unavailable` when the schedules bucket (or, with cascade,
///   the script-status bucket) cannot be opened.
/// - `404 Not Found` when there is no entry for `id`.
/// - `500 Internal Server Error` for KV read/update/put failures and for
///   (de)serialization failures.
pub async fn disable(
State(s): State<AppState>,
Path(id): Path<String>,
Query(q): Query<DisableQuery>,
caller: Caller,
) -> Result<StatusCode, (StatusCode, String)> {
let schedules_kv = s
.jetstream
.get_key_value(BUCKET_SCHEDULES)
.await
.map_err(|e| {
warn!(error = %e, "schedules KV missing on disable");
(
StatusCode::SERVICE_UNAVAILABLE,
format!("schedules bucket missing: {e}"),
)
})?;
// `entry` is used here because its `revision` is needed for the `update` below.
let entry = schedules_kv
.entry(&id)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("KV entry: {e}")))?
.ok_or_else(|| (StatusCode::NOT_FOUND, format!("schedule '{id}' not found")))?;
let mut schedule: Schedule = serde_json::from_slice(&entry.value).map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("deserialize stored schedule: {e}"),
)
})?;
// Only write when the flag actually changes; an already-disabled schedule
// still continues to the cascade/audit steps below.
if schedule.enabled {
schedule.enabled = false;
let body = serde_json::to_vec(&schedule).map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("serialize schedule: {e}"),
)
})?;
// Any `update` failure is reported as 500.
// NOTE(review): presumably this includes a revision mismatch from a
// concurrent writer — confirm against the async-nats `update` docs.
schedules_kv
.update(&id, body.into(), entry.revision)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("KV update: {e}")))?;
// Best-effort: the helper returns nothing, so the response is not affected.
sync_yaml_mirror_enabled(&s, &id, false).await;
} else {
info!(schedule_id = %id, "schedule already disabled; revoke-only path");
}
// NOTE(review): both cascade error paths below return early, i.e. after the
// schedule may already have been written as disabled above and before the
// audit event is recorded — confirm this is intended.
let cascade_applied = if q.cascade {
let status_kv = s
.jetstream
.get_key_value(BUCKET_SCRIPT_STATUS)
.await
.map_err(|e| {
warn!(
error = %e,
bucket = BUCKET_SCRIPT_STATUS,
"schedule_disable cascade: status KV unavailable",
);
(
StatusCode::SERVICE_UNAVAILABLE,
format!("script_status bucket missing: {e}"),
)
})?;
// The status entry is keyed by the job id, not by the schedule id.
status_kv
.put(&schedule.job_id, bytes::Bytes::from(SCRIPT_STATUS_REVOKED))
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("script_status put: {e}"),
)
})?;
info!(
schedule_id = %id,
job_id = %schedule.job_id,
"schedule disabled with cascade revoke",
);
true
} else {
info!(schedule_id = %id, "schedule soft-disabled");
false
};
// The audit event is recorded on every successful path, including the case
// where the schedule was already disabled and no cascade was requested.
audit::record(
&s.nats,
"operator",
"schedule_disable",
Some(&id),
Some(&caller),
serde_json::json!({
"cascade": cascade_applied,
"job_id": schedule.job_id,
}),
)
.await;
Ok(StatusCode::NO_CONTENT)
}
/// Re-enables the schedule `id`.
///
/// Loads the stored schedule with its revision from `BUCKET_SCHEDULES`. When
/// it is already enabled the handler answers `204 No Content` straight away,
/// without writing anything or recording an audit event. Otherwise it stores
/// the schedule with `enabled = true` via `update` (passing the revision that
/// was read), syncs the flag into the YAML mirror, records a
/// `schedule_enable` audit event and answers `204 No Content`.
///
/// # Errors
///
/// - `503 Service Unavailable` when the schedules bucket cannot be opened.
/// - `404 Not Found` when there is no entry for `id`.
/// - `500 Internal Server Error` for KV read/update failures and for
///   (de)serialization failures.
pub async fn enable(
    State(s): State<AppState>,
    Path(id): Path<String>,
    caller: Caller,
) -> Result<StatusCode, (StatusCode, String)> {
    let catalog = match s.jetstream.get_key_value(BUCKET_SCHEDULES).await {
        Ok(bucket) => bucket,
        Err(e) => {
            warn!(error = %e, "schedules KV missing on enable");
            return Err((
                StatusCode::SERVICE_UNAVAILABLE,
                format!("schedules bucket missing: {e}"),
            ));
        }
    };

    let stored = match catalog.entry(&id).await {
        Ok(Some(found)) => found,
        Ok(None) => {
            return Err((StatusCode::NOT_FOUND, format!("schedule '{id}' not found")));
        }
        Err(e) => {
            return Err((StatusCode::INTERNAL_SERVER_ERROR, format!("KV entry: {e}")));
        }
    };

    let mut schedule = match serde_json::from_slice::<Schedule>(&stored.value) {
        Ok(decoded) => decoded,
        Err(e) => {
            return Err((
                StatusCode::INTERNAL_SERVER_ERROR,
                format!("deserialize stored schedule: {e}"),
            ));
        }
    };

    if schedule.enabled {
        info!(schedule_id = %id, "schedule already enabled; no-op");
        return Ok(StatusCode::NO_CONTENT);
    }

    schedule.enabled = true;
    let payload = match serde_json::to_vec(&schedule) {
        Ok(encoded) => encoded,
        Err(e) => {
            return Err((
                StatusCode::INTERNAL_SERVER_ERROR,
                format!("serialize schedule: {e}"),
            ));
        }
    };
    catalog
        .update(&id, payload.into(), stored.revision)
        .await
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("KV update: {e}")))?;

    // Best-effort: the helper returns nothing, so the response is not affected.
    sync_yaml_mirror_enabled(&s, &id, true).await;
    info!(schedule_id = %id, "schedule enabled");

    let audit_details = serde_json::json!({
        "job_id": schedule.job_id,
    });
    audit::record(
        &s.nats,
        "operator",
        "schedule_enable",
        Some(&id),
        Some(&caller),
        audit_details,
    )
    .await;

    Ok(StatusCode::NO_CONTENT)
}
pub async fn delete(
State(s): State<AppState>,
Path(id): Path<String>,
caller: Caller,
) -> Result<StatusCode, (StatusCode, String)> {
let kv = match s.jetstream.get_key_value(BUCKET_SCHEDULES).await {
Ok(k) => k,
Err(e) => {
warn!(error = %e, "schedules KV missing on delete");
return Err((StatusCode::NOT_FOUND, "schedules bucket missing".into()));
}
};
kv.delete(&id)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("kv delete: {e}")))?;
info!(schedule_id = %id, "schedule deleted");
audit::record(
&s.nats,
"operator",
"schedule_delete",
Some(&id),
Some(&caller),
serde_json::json!({}),
)
.await;
Ok(StatusCode::NO_CONTENT)
}
// Unit tests for `patch_yaml_enabled`, the line-based rewrite of the
// top-level `enabled:` flag in a schedule's YAML text.
#[cfg(test)]
mod tests {
use super::patch_yaml_enabled;
// The flag is flipped in place; the comment line and the other keys are kept.
#[test]
fn flips_existing_flag_and_preserves_comments() {
let yaml = "# nightly inventory sweep\nid: inv-hw\ncron: \"0 3 * * *\"\nenabled: false\njob_id: inventory-hw\n";
let out = patch_yaml_enabled(yaml, true);
assert!(out.contains("enabled: true\n"));
assert!(!out.contains("enabled: false"));
assert!(out.starts_with("# nightly inventory sweep\n"));
assert!(out.contains("cron: \"0 3 * * *\"\n"));
}
// The whole `enabled:` line is replaced, so an inline comment on it is lost;
// the following line is untouched.
#[test]
fn drops_inline_comment_on_the_flipped_line_only() {
let yaml = "id: s1\nenabled: true # stopped during incident\ncron: \"* * * * *\"\n";
let out = patch_yaml_enabled(yaml, false);
assert!(out.contains("enabled: false\n"));
assert!(!out.contains("stopped during incident"));
assert!(out.contains("cron: \"* * * * *\"\n"));
}
// Only a line that starts at column 0 with `enabled:` matches; the indented
// key under `target:` keeps its value.
#[test]
fn ignores_indented_enabled_in_nested_maps() {
let yaml = "id: s1\ntarget:\n enabled: false\nenabled: false\n";
let out = patch_yaml_enabled(yaml, true);
assert!(out.contains("\nenabled: true\n"));
assert!(out.contains(" enabled: false\n"));
}
// With no top-level `enabled:` line the flag is appended as the last line.
#[test]
fn appends_when_missing() {
let yaml = "id: s1\ncron: \"* * * * *\"\n";
let out = patch_yaml_enabled(yaml, false);
assert!(out.ends_with("enabled: false\n"));
assert!(out.starts_with("id: s1\n"));
}
// A second top-level `enabled:` line is copied through unchanged.
#[test]
fn only_first_top_level_occurrence_is_patched() {
let yaml = "enabled: false\nenabled: false\n";
let out = patch_yaml_enabled(yaml, true);
assert_eq!(out.matches("enabled: true").count(), 1);
}
}