pub mod policy;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration as StdDuration;
use anyhow::{Context, Result};
use async_nats::jetstream::kv::Operation;
use chrono::{Duration as ChronoDuration, Utc};
use futures::{StreamExt, TryStreamExt};
use kanade_shared::kv::{BUCKET_AGENT_GROUPS, BUCKET_SCHEDULES};
use kanade_shared::manifest::{ExecMode, FanoutPlan, Schedule, Target};
use sqlx::Row;
use tokio::sync::Mutex;
use tokio_cron_scheduler::{Job, JobScheduler};
use tracing::{info, warn};
use uuid::Uuid;
use crate::api::AppState;
use crate::api::exec::exec_manifest;
use crate::audit;
use policy::{Completion, Decision, FireAction, decide_fire};
/// Heartbeat freshness window: target resolution only counts agents whose
/// `last_heartbeat` is no older than `now - ALIVE_THRESHOLD`.
const ALIVE_THRESHOLD: ChronoDuration = ChronoDuration::minutes(2);
/// Shared map from schedule id to the `Uuid` the `JobScheduler` returned when
/// the schedule's job was added; used to remove the job again later.
type Registered = Arc<Mutex<HashMap<String, Uuid>>>;
/// Runs the scheduler until the process exits.
///
/// Steps:
/// 1. Ensures the schedules KV bucket exists and starts a `JobScheduler`.
/// 2. Best-effort initial load: every enabled `Schedule` currently stored in
///    the bucket is registered as a cron job. Failures on individual keys are
///    logged and skipped.
/// 3. Watches the bucket: a `Put` replaces the registered job for that
///    schedule (or just removes it when the schedule is disabled); `Delete`
///    and `Purge` remove it.
///
/// # Errors
/// Returns an error if the KV bucket cannot be ensured, the `JobScheduler`
/// cannot be created or started, or the watch cannot be established.
/// After the watch is established the function does not return; if the watch
/// stream ends it logs a warning and parks forever.
pub async fn run(state: AppState) -> Result<()> {
    let kv = state
        .jetstream
        .create_key_value(async_nats::jetstream::kv::Config {
            bucket: BUCKET_SCHEDULES.into(),
            history: 5,
            ..Default::default()
        })
        .await
        .context("ensure schedules KV")?;
    let sched = JobScheduler::new().await.context("init JobScheduler")?;
    sched.start().await.context("start JobScheduler")?;
    let registered: Registered = Arc::new(Mutex::new(HashMap::new()));

    // Initial load is best-effort: a failure to list keys must not prevent
    // the watch loop below from arming.
    let keys: Vec<String> = match kv.keys().await {
        Ok(stream) => stream.try_collect().await.unwrap_or_else(|e| {
            warn!(error = %e, "collect schedules KV keys (initial load best-effort)");
            Vec::new()
        }),
        Err(e) => {
            warn!(error = %e, "list schedules KV keys (likely empty bucket; watch loop still arms)");
            Vec::new()
        }
    };
    for k in keys {
        let entry = match kv.get(&k).await {
            Ok(Some(b)) => b,
            Ok(None) => continue,
            Err(e) => {
                warn!(error = %e, key = %k, "kv get");
                continue;
            }
        };
        match serde_json::from_slice::<Schedule>(&entry) {
            Ok(s) if s.enabled => {
                if let Err(e) = register(&sched, state.clone(), &registered, s.clone()).await {
                    warn!(error = %e, schedule_id = %s.id, "initial register failed");
                }
            }
            Ok(s) => info!(schedule_id = %s.id, "skipped (disabled)"),
            Err(e) => warn!(error = %e, key = %k, "deserialize Schedule"),
        }
    }
    let initial_count = registered.lock().await.len();
    info!(
        count = initial_count,
        "scheduler registered initial schedules"
    );

    let mut watcher = kv.watch_all().await.context("kv watch_all")?;
    while let Some(entry) = watcher.next().await {
        let entry = match entry {
            Ok(e) => e,
            Err(e) => {
                warn!(error = %e, "watch entry error");
                continue;
            }
        };
        match entry.operation {
            Operation::Put => {
                let sched_data: Schedule = match serde_json::from_slice(&entry.value) {
                    Ok(s) => s,
                    Err(e) => {
                        warn!(error = %e, key = %entry.key, "deserialize Schedule on watch");
                        continue;
                    }
                };
                // Always drop the previous job first so an update replaces it
                // and a disabled schedule ends up with no job at all.
                unregister(&sched, &registered, &sched_data.id).await;
                if sched_data.enabled
                    && let Err(e) =
                        register(&sched, state.clone(), &registered, sched_data.clone()).await
                {
                    warn!(error = %e, schedule_id = %sched_data.id, "watch register failed");
                }
            }
            Operation::Delete | Operation::Purge => {
                // NOTE(review): this uses the KV key as the schedule id, while
                // the Put path uses `Schedule::id` — assumes they are equal.
                unregister(&sched, &registered, &entry.key).await;
            }
        }
    }

    // The watch stream ended; already-registered jobs keep firing, but KV
    // changes are no longer observed. Make that visible before parking.
    warn!("schedules KV watch stream ended; schedule changes will no longer be picked up");
    std::future::pending::<Result<()>>().await
}
/// Adds a cron job for `schedule` to `sched` and records the returned job
/// `Uuid` in `registered` under the schedule's id.
///
/// Each firing of the job calls [`tick`] with its own clone of the schedule
/// as it was at registration time.
///
/// # Errors
/// Fails if the job cannot be built from the cron expression or cannot be
/// added to the scheduler.
async fn register(
    sched: &JobScheduler,
    state: AppState,
    registered: &Registered,
    schedule: Schedule,
) -> Result<()> {
    // The job closure owns its own copy; `schedule` stays available for the
    // bookkeeping and logging below.
    let snapshot = schedule.clone();
    let on_fire = move |_job_uuid, _scheduler| {
        let state = state.clone();
        let schedule = snapshot.clone();
        Box::pin(async move {
            tick(&state, schedule).await;
        })
    };

    let job = Job::new_async(schedule.cron.as_str(), on_fire)
        .with_context(|| format!("Job::new_async (cron={})", schedule.cron))?;
    let job_uuid = sched.add(job).await.context("scheduler.add")?;

    let mut jobs = registered.lock().await;
    jobs.insert(schedule.id.clone(), job_uuid);
    drop(jobs);

    info!(
        schedule_id = %schedule.id,
        cron = %schedule.cron,
        mode = ?schedule.mode,
        "scheduled",
    );
    Ok(())
}
/// Handles one cron firing of `schedule`.
///
/// Looks up the job's manifest, computes the optional dispatch deadline, and
/// then either dispatches unconditionally (`ExecMode::EveryTick`) or asks
/// `decide_fire` what to do based on the expected pcs, the recorded
/// completions and the optional cooldown. Every failure is logged and ends
/// the tick; nothing is returned to the caller.
async fn tick(state: &AppState, schedule: Schedule) {
let schedule_id = schedule.id.clone();
let job_id = schedule.job_id.clone();
// Resolve the manifest at fire time; a missing job or a lookup error skips this tick.
let manifest = match crate::api::jobs::fetch(&state.jetstream, &job_id).await {
Ok(Some(m)) => m,
Ok(None) => {
warn!(
%schedule_id, %job_id,
"scheduler fire skipped: job not registered in catalog",
);
return;
}
Err(e) => {
warn!(%schedule_id, %job_id, error = %e, "scheduler fire failed: catalog lookup error");
return;
}
};
let now = Utc::now();
// `deadline_at` is `now + starting_deadline` when one is configured, otherwise `None`.
let deadline_at = match parse_starting_deadline(schedule.starting_deadline.as_deref(), now) {
Ok(v) => v,
Err(e) => {
warn!(
%schedule_id, error = %e,
"scheduler fire failed: invalid starting_deadline",
);
return;
}
};
// Builds a fresh copy of the schedule's plan with this tick's deadline applied.
let plan_for_dispatch = || {
let mut p = schedule.plan.clone();
p.deadline_at = deadline_at;
p
};
// EveryTick bypasses target resolution and the `decide_fire` decision entirely.
if matches!(schedule.mode, ExecMode::EveryTick) {
dispatch(
state,
&schedule_id,
manifest,
plan_for_dispatch(),
"EveryTick",
)
.await;
return;
}
// Inputs for the fire decision: expected pcs, per-pc completions, cooldown.
let expected = match resolve_expected_pcs(state, &schedule.plan.target).await {
Ok(v) => v,
Err(e) => {
warn!(%schedule_id, error = ?e, "scheduler fire failed: target resolve");
return;
}
};
let completions = match recent_completions(state, &job_id).await {
Ok(v) => v,
Err(e) => {
warn!(%schedule_id, error = ?e, "scheduler fire failed: completion lookup");
return;
}
};
let cooldown = match parse_cooldown(schedule.cooldown.as_deref()) {
Ok(v) => v,
Err(e) => {
warn!(%schedule_id, error = %e, "scheduler fire failed: invalid cooldown");
return;
}
};
let decision: Decision = decide_fire(
schedule.mode,
cooldown,
schedule.auto_disable_when_done,
&expected,
&completions,
Utc::now(),
);
match decision.action {
FireAction::Skip => {
tracing::debug!(
%schedule_id, mode = ?schedule.mode,
expected = expected.len(),
completions = completions.len(),
"scheduler tick: dedup says skip",
);
}
FireAction::FireWholeTarget => {
dispatch(
state,
&schedule_id,
manifest,
plan_for_dispatch(),
"OncePerTarget armed",
)
.await;
}
FireAction::FirePcs(pc_ids) => {
// Replace the schedule's target with exactly the pcs the decision named;
// the rollout is cleared for this subset dispatch.
let mut plan = plan_for_dispatch();
plan.target = Target {
pcs: pc_ids.clone(),
..Target::default()
};
plan.rollout = None;
info!(
%schedule_id, pcs = pc_ids.len(),
"OncePerPc: firing at remaining pcs",
);
dispatch(state, &schedule_id, manifest, plan, "OncePerPc subset").await;
}
}
// Checked independently of the action taken above: persist `enabled = false`
// when the decision asks for it. A persist failure is only logged.
if decision.auto_disable {
if let Err(e) = disable_schedule(state, &schedule).await {
warn!(%schedule_id, error = ?e, "auto-disable persist failed");
}
}
}
/// Executes `manifest` with `plan` through `exec_manifest` on behalf of the
/// scheduler and logs the outcome.
///
/// `why` is a short label for the reason this dispatch happened; it is only
/// attached to the log line. Failures are logged, never returned.
async fn dispatch(
    state: &AppState,
    schedule_id: &str,
    manifest: kanade_shared::manifest::Manifest,
    plan: FanoutPlan,
    why: &str,
) {
    let resp = match exec_manifest(state, manifest, plan, "scheduler").await {
        Ok(resp) => resp,
        Err((status, msg)) => {
            warn!(
                %schedule_id, status = %status, error = %msg, why,
                "scheduler exec failed",
            );
            return;
        }
    };
    info!(
        %schedule_id, exec_id = %resp.exec_id, why,
        "scheduler exec ok",
    );
}
/// Parses an optional humantime duration string (as accepted by
/// `humantime::parse_duration`) into a chrono duration.
///
/// Returns `Ok(None)` when no cooldown is configured.
///
/// # Errors
/// Fails if the string is not a valid humantime duration or the value does
/// not fit in a `ChronoDuration`.
fn parse_cooldown(s: Option<&str>) -> Result<Option<ChronoDuration>> {
    let Some(raw) = s else {
        return Ok(None);
    };
    let parsed: StdDuration =
        humantime::parse_duration(raw).with_context(|| format!("parse cooldown '{raw}'"))?;
    let cooldown = ChronoDuration::from_std(parsed).context("cooldown overflow")?;
    Ok(Some(cooldown))
}
/// Turns an optional humantime duration string into an absolute deadline
/// (`now` + duration).
///
/// Returns `Ok(None)` when no starting deadline is configured.
///
/// # Errors
/// Fails if the string is not a valid humantime duration, if the value does
/// not fit in a `ChronoDuration`, or if `now + duration` falls outside the
/// range `DateTime` can represent. The last case used to panic via the `+`
/// operator; it is now reported as an error so a bad schedule cannot take
/// down the tick.
fn parse_starting_deadline(
    s: Option<&str>,
    now: chrono::DateTime<Utc>,
) -> Result<Option<chrono::DateTime<Utc>>> {
    let Some(raw) = s else {
        return Ok(None);
    };
    let std: StdDuration = humantime::parse_duration(raw)
        .with_context(|| format!("parse starting_deadline '{raw}'"))?;
    let d = ChronoDuration::from_std(std).context("starting_deadline overflow")?;
    // `DateTime + Duration` panics on overflow; the checked form returns None.
    let deadline = now
        .checked_add_signed(d)
        .with_context(|| format!("starting_deadline '{raw}' is beyond the representable date range"))?;
    Ok(Some(deadline))
}
async fn recent_completions(state: &AppState, job_id: &str) -> Result<Vec<Completion>> {
let rows = sqlx::query(
"SELECT pc_id, MAX(finished_at) AS finished_at
FROM execution_results
WHERE job_id = ? AND exit_code = 0
GROUP BY pc_id",
)
.bind(job_id)
.fetch_all(&state.pool)
.await
.context("execution_results dedup query")?;
let mut out = Vec::with_capacity(rows.len());
for r in rows {
let pc_id: String = r.try_get("pc_id").unwrap_or_default();
let finished_at: chrono::DateTime<Utc> = match r.try_get("finished_at") {
Ok(t) => t,
Err(_) => continue,
};
if !pc_id.is_empty() {
out.push(Completion { pc_id, finished_at });
}
}
Ok(out)
}
async fn resolve_expected_pcs(state: &AppState, target: &Target) -> Result<Vec<String>> {
let mut out: HashSet<String> = HashSet::new();
if target.all {
let cutoff = Utc::now() - ALIVE_THRESHOLD;
let rows = sqlx::query("SELECT pc_id FROM agents WHERE last_heartbeat >= ? ORDER BY pc_id")
.bind(cutoff)
.fetch_all(&state.pool)
.await
.context("agents alive query")?;
for r in rows {
if let Ok(pc) = r.try_get::<String, _>("pc_id") {
out.insert(pc);
}
}
}
if !target.groups.is_empty() {
let want: HashSet<&str> = target.groups.iter().map(String::as_str).collect();
let cutoff = Utc::now() - ALIVE_THRESHOLD;
let alive: HashSet<String> =
sqlx::query("SELECT pc_id FROM agents WHERE last_heartbeat >= ?")
.bind(cutoff)
.fetch_all(&state.pool)
.await
.context("alive list for group resolve")?
.into_iter()
.filter_map(|r| r.try_get::<String, _>("pc_id").ok())
.collect();
if let Ok(kv) = state.jetstream.get_key_value(BUCKET_AGENT_GROUPS).await {
if let Ok(keys) = kv.keys().await {
let keys: Vec<String> = keys.try_collect().await.unwrap_or_default();
for k in keys {
if !alive.contains(&k) {
continue;
}
let Ok(Some(bytes)) = kv.get(&k).await else {
continue;
};
let Ok(groups) = serde_json::from_slice::<Vec<String>>(&bytes) else {
continue;
};
if groups.iter().any(|g| want.contains(g.as_str())) {
out.insert(k);
}
}
}
}
}
for pc in &target.pcs {
out.insert(pc.clone());
}
let mut v: Vec<String> = out.into_iter().collect();
v.sort();
Ok(v)
}
/// Writes `schedule` back to the schedules KV bucket with `enabled = false`,
/// logs the change, and records a `schedule_completed` audit event carrying
/// the schedule's mode and job id.
///
/// # Errors
/// Fails if the bucket cannot be opened, the schedule cannot be serialized,
/// or the KV put fails. The audit event is only recorded after a successful
/// put.
async fn disable_schedule(state: &AppState, schedule: &Schedule) -> Result<()> {
    let bucket = state
        .jetstream
        .get_key_value(BUCKET_SCHEDULES)
        .await
        .context("get schedules KV for auto-disable")?;

    let mut disabled = schedule.clone();
    disabled.enabled = false;
    let payload = serde_json::to_vec(&disabled).context("serialize schedule")?;

    bucket
        .put(&disabled.id, payload.into())
        .await
        .context("KV put (auto-disable)")?;
    info!(schedule_id = %disabled.id, "schedule auto-disabled (lifecycle complete)");

    let details = serde_json::json!({
        "mode": format!("{:?}", schedule.mode),
        "job_id": schedule.job_id,
    });
    audit::record(
        &state.nats,
        "scheduler",
        "schedule_completed",
        Some(&disabled.id),
        details,
    )
    .await;
    Ok(())
}
async fn unregister(sched: &JobScheduler, registered: &Registered, schedule_id: &str) {
let removed = registered.lock().await.remove(schedule_id);
if let Some(uuid) = removed {
if let Err(e) = sched.remove(&uuid).await {
warn!(error = %e, schedule_id, "scheduler.remove failed");
} else {
info!(schedule_id, "scheduler unregistered");
}
}
}