pub mod policy;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration as StdDuration;
use anyhow::{Context, Result};
use async_nats::jetstream::kv::Operation;
use chrono::{DateTime, Duration as ChronoDuration, Local, Utc};
use futures::{StreamExt, TryStreamExt};
use kanade_shared::kv::{
BUCKET_AGENT_GROUPS, BUCKET_SCHEDULER_DISPATCH, BUCKET_SCHEDULES, dispatch_mark_pc_key,
dispatch_mark_target_key,
};
use kanade_shared::manifest::{
ExecMode, FanoutPlan, Manifest, RunsOn, Schedule, ScheduleTz, Target, When,
};
use sqlx::Row;
use tokio::sync::Mutex;
use tokio_cron_scheduler::{Job, JobScheduler};
use tracing::{info, warn};
use uuid::Uuid;
use crate::api::AppState;
use crate::api::exec::exec_manifest;
use policy::{Completion, FireAction, decide_fire, suppress_dispatched};
/// Heartbeat freshness cutoff: target resolution only counts agents whose
/// `last_heartbeat` is no older than this span before "now".
const ALIVE_THRESHOLD: ChronoDuration = ChronoDuration::minutes(2);
/// Fixed margin added on top of `jitter + timeout` when computing the
/// dispatch-suppression window.
const DISPATCH_DRAIN_MARGIN: ChronoDuration = ChronoDuration::seconds(90);
/// Lower clamp bound for the dispatch-suppression window.
const DISPATCH_WINDOW_MIN: ChronoDuration = ChronoDuration::seconds(90);
/// Upper clamp bound for the dispatch-suppression window; also the fallback
/// when summing the window's components overflows.
const DISPATCH_WINDOW_MAX: ChronoDuration = ChronoDuration::minutes(30);
/// `max_age` configured on the scheduler-dispatch KV bucket that stores
/// dispatch marks.
const DISPATCH_MARK_TTL: StdDuration = StdDuration::from_secs(60 * 60);
/// Maximum number of dispatch-mark KV reads/writes kept in flight at once.
const DISPATCH_KV_CONCURRENCY: usize = 16;
/// Shared map from schedule id to the job `Uuid` returned by
/// `JobScheduler::add`, used to remove the job again on unregister.
type Registered = Arc<Mutex<HashMap<String, Uuid>>>;
pub async fn run(state: AppState) -> Result<()> {
let kv = state
.jetstream
.create_key_value(async_nats::jetstream::kv::Config {
bucket: BUCKET_SCHEDULES.into(),
history: 5,
..Default::default()
})
.await
.context("ensure schedules KV")?;
if let Err(e) = state
.jetstream
.create_key_value(async_nats::jetstream::kv::Config {
bucket: BUCKET_SCHEDULER_DISPATCH.into(),
history: 1,
max_age: DISPATCH_MARK_TTL,
..Default::default()
})
.await
{
warn!(error = %e, "ensure scheduler_dispatch KV failed (benign if the bucket already exists with a prior config; a genuine failure falls back to completion-only dedup)");
}
let sched = JobScheduler::new().await.context("init JobScheduler")?;
sched.start().await.context("start JobScheduler")?;
let registered: Registered = Arc::new(Mutex::new(HashMap::new()));
let keys: Vec<String> = match kv.keys().await {
Ok(stream) => stream.try_collect().await.unwrap_or_else(|e| {
warn!(error = %e, "collect schedules KV keys (initial load best-effort)");
Vec::new()
}),
Err(e) => {
warn!(error = %e, "list schedules KV keys (likely empty bucket; watch loop still arms)");
Vec::new()
}
};
for k in keys {
let entry = match kv.get(&k).await {
Ok(Some(b)) => b,
Ok(None) => continue,
Err(e) => {
warn!(error = %e, key = %k, "kv get");
continue;
}
};
match serde_json::from_slice::<Schedule>(&entry) {
Ok(s) if s.enabled => {
if let Err(e) = register(&sched, state.clone(), ®istered, s.clone()).await {
warn!(error = %e, schedule_id = %s.id, "initial register failed");
}
}
Ok(s) => info!(schedule_id = %s.id, "skipped (disabled)"),
Err(e) => warn!(error = %e, key = %k, "deserialize Schedule"),
}
}
let initial_count = registered.lock().await.len();
info!(
count = initial_count,
"scheduler registered initial schedules"
);
let mut watcher = kv.watch_all().await.context("kv watch_all")?;
while let Some(entry) = watcher.next().await {
let entry = match entry {
Ok(e) => e,
Err(e) => {
warn!(error = %e, "watch entry error");
continue;
}
};
match entry.operation {
Operation::Put => {
let sched_data: Schedule = match serde_json::from_slice(&entry.value) {
Ok(s) => s,
Err(e) => {
warn!(error = %e, key = %entry.key, "deserialize Schedule on watch");
continue;
}
};
unregister(&sched, ®istered, &sched_data.id).await;
if sched_data.enabled
&& let Err(e) =
register(&sched, state.clone(), ®istered, sched_data.clone()).await
{
warn!(error = %e, schedule_id = %sched_data.id, "watch register failed");
}
}
Operation::Delete | Operation::Purge => {
unregister(&sched, ®istered, &entry.key).await;
}
}
}
std::future::pending::<Result<()>>().await
}
async fn register(
sched: &JobScheduler,
state: AppState,
registered: &Registered,
schedule: Schedule,
) -> Result<()> {
if matches!(schedule.runs_on, RunsOn::Agent) {
info!(
schedule_id = %schedule.id,
"skipped (runs_on: agent — agents tick this schedule themselves)",
);
return Ok(());
}
let lowered = schedule.lowered();
let cron = lowered.cron;
let schedule_snapshot = schedule.clone();
let cb = move |_uuid, _l| {
let state = state.clone();
let schedule = schedule_snapshot.clone();
Box::pin(async move {
tick(&state, schedule).await;
}) as std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>
};
let job = match lowered.tz {
ScheduleTz::Utc => Job::new_async_tz(cron.as_str(), Utc, cb),
ScheduleTz::Local => Job::new_async_tz(cron.as_str(), Local, cb),
}
.with_context(|| format!("Job::new_async_tz (cron={cron}, tz={:?})", lowered.tz))?;
let uuid = sched.add(job).await.context("scheduler.add")?;
registered.lock().await.insert(schedule.id.clone(), uuid);
info!(
schedule_id = %schedule.id,
when = %schedule.when,
poll_cron = %cron,
tz = ?lowered.tz,
"scheduled",
);
if let When::Calendar(c) = &schedule.when {
if let Some(fires_at) = c.oneshot_instant(schedule.tz) {
if fires_at < Utc::now() {
warn!(
schedule_id = %schedule.id,
%fires_at,
"calendar one-shot date is in the past — it will never fire",
);
}
}
}
if let Some(err) = schedule.bad_window() {
warn!(
schedule_id = %schedule.id,
%err,
"constraints.window is unparseable — schedule blocked (fail-closed) until fixed",
);
}
if schedule.calendar_outside_window() {
warn!(
schedule_id = %schedule.id,
when = %schedule.when,
"calendar fire time is outside constraints.window — it will never fire",
);
}
Ok(())
}
/// One cron firing for `schedule`: decides whether to dispatch and, if so,
/// at which target.
///
/// Order of checks (each failure logs and returns without dispatching):
/// 1. `schedule.active` and `schedule.constraints` must both allow "now".
/// 2. The job manifest must be present in the catalog.
/// 3. `starting_deadline` must parse; it is stamped onto the dispatched plan
///    as `deadline_at`.
/// 4. `ExecMode::EveryTick` dispatches immediately, with no dedup.
/// 5. Otherwise the expected PCs, recent completions and cooldown feed
///    `decide_fire`, whose result is then filtered through the dispatch
///    marks by `suppress_dispatched`.
///
/// Dispatch marks are written only after a dispatch reported success.
async fn tick(state: &AppState, schedule: Schedule) {
    let schedule_id = schedule.id.clone();
    let job_id = schedule.job_id.clone();
    let lowered = schedule.lowered();

    let in_active_range = schedule.active.contains(Utc::now(), schedule.tz);
    if !in_active_range {
        tracing::debug!(%schedule_id, "scheduler tick: outside active window (dormant)");
        return;
    }
    let in_maintenance_window = schedule.constraints.allows(Utc::now(), schedule.tz);
    if !in_maintenance_window {
        tracing::debug!(%schedule_id, "scheduler tick: outside maintenance window — skip");
        return;
    }

    let lookup = crate::api::jobs::fetch(&state.jetstream, &job_id).await;
    let manifest = match lookup {
        Err(e) => {
            warn!(%schedule_id, %job_id, error = %e, "scheduler fire failed: catalog lookup error");
            return;
        }
        Ok(None) => {
            warn!(
                %schedule_id, %job_id,
                "scheduler fire skipped: job not registered in catalog",
            );
            return;
        }
        Ok(Some(found)) => found,
    };

    let now = Utc::now();
    // `.ok()` wraps the already-optional deadline once more; the outer
    // `Some` only means "parsed without error".
    let Some(deadline_at) = parse_starting_deadline(schedule.starting_deadline.as_deref(), now)
        .inspect_err(|e| {
            warn!(
                %schedule_id, error = %e,
                "scheduler fire failed: invalid starting_deadline",
            );
        })
        .ok()
    else {
        return;
    };

    // Fresh copy of the schedule's plan carrying this tick's deadline.
    let stamped_plan = || {
        let mut plan = schedule.plan.clone();
        plan.deadline_at = deadline_at;
        plan
    };

    if matches!(lowered.mode, ExecMode::EveryTick) {
        dispatch(state, &schedule_id, manifest, stamped_plan(), "EveryTick").await;
        return;
    }

    let Some(expected) = resolve_expected_pcs(state, &schedule.plan.target)
        .await
        .inspect_err(|e| {
            warn!(%schedule_id, error = ?e, "scheduler fire failed: target resolve");
        })
        .ok()
    else {
        return;
    };
    let Some(completions) = recent_completions(state, &job_id)
        .await
        .inspect_err(|e| {
            warn!(%schedule_id, error = ?e, "scheduler fire failed: completion lookup");
        })
        .ok()
    else {
        return;
    };
    // Same double-Option shape as the deadline above.
    let Some(cooldown) = parse_cooldown(lowered.cooldown.as_deref())
        .inspect_err(|e| {
            warn!(%schedule_id, error = %e, "scheduler fire failed: invalid when.every");
        })
        .ok()
    else {
        return;
    };

    let proposed = decide_fire(lowered.mode, cooldown, &expected, &completions, now);
    let window = suppress_window(&schedule, &manifest);

    // Filter the proposal through dispatch marks recorded by earlier ticks.
    let action = match proposed {
        FireAction::Skip => FireAction::Skip,
        FireAction::FirePcs(pcs) => {
            let marks = read_pc_dispatch_marks(state, &schedule_id, &pcs).await;
            suppress_dispatched(FireAction::FirePcs(pcs), &marks, None, window, now)
        }
        FireAction::FireWholeTarget => {
            let target_mark = read_target_dispatch_mark(state, &schedule_id).await;
            suppress_dispatched(
                FireAction::FireWholeTarget,
                &HashMap::new(),
                target_mark,
                window,
                now,
            )
        }
    };

    match action {
        FireAction::FirePcs(pc_ids) => {
            // Narrow the plan to exactly the remaining PCs and drop rollout.
            let mut plan = stamped_plan();
            plan.target = Target {
                pcs: pc_ids.clone(),
                ..Target::default()
            };
            plan.rollout = None;
            info!(
                %schedule_id, pcs = pc_ids.len(),
                "OncePerPc: firing at remaining pcs",
            );
            let sent = dispatch(state, &schedule_id, manifest, plan, "OncePerPc subset").await;
            if sent {
                record_pc_dispatch_marks(state, &schedule_id, &pc_ids, now).await;
            }
        }
        FireAction::FireWholeTarget => {
            let sent = dispatch(
                state,
                &schedule_id,
                manifest,
                stamped_plan(),
                "OncePerTarget armed",
            )
            .await;
            if sent {
                record_target_dispatch_mark(state, &schedule_id, now).await;
            }
        }
        FireAction::Skip => {
            tracing::debug!(
                %schedule_id, when = %schedule.when,
                expected = expected.len(),
                completions = completions.len(),
                "scheduler tick: dedup/in-flight says skip",
            );
        }
    }
}
/// Hands `manifest` + `plan` to `exec_manifest` on behalf of the scheduler
/// and logs the outcome.
///
/// `why` is a short label included in the log line. Returns `true` when
/// `exec_manifest` succeeded and `false` otherwise; errors are logged, not
/// propagated.
async fn dispatch(
    state: &AppState,
    schedule_id: &str,
    manifest: Manifest,
    plan: FanoutPlan,
    why: &str,
) -> bool {
    let outcome = exec_manifest(state, manifest, plan, "scheduler", None).await;
    match &outcome {
        Ok(resp) => info!(
            %schedule_id, exec_id = %resp.exec_id, why,
            "scheduler exec ok",
        ),
        Err((status, msg)) => warn!(
            %schedule_id, status = %status, error = %msg, why,
            "scheduler exec failed",
        ),
    }
    outcome.is_ok()
}
/// Computes how long a recorded dispatch mark suppresses a re-dispatch:
/// `jitter + timeout + DISPATCH_DRAIN_MARGIN`, clamped to
/// `[DISPATCH_WINDOW_MIN, DISPATCH_WINDOW_MAX]`.
///
/// A missing or unparseable jitter silently counts as zero; an unparseable
/// manifest timeout counts as zero too, but is logged. If the sum overflows,
/// `DISPATCH_WINDOW_MAX` is returned.
fn suppress_window(schedule: &Schedule, manifest: &Manifest) -> ChronoDuration {
    /// humantime string → chrono duration; `None` on any parse/range failure.
    fn to_chrono(raw: &str) -> Option<ChronoDuration> {
        let parsed = humantime::parse_duration(raw).ok()?;
        ChronoDuration::from_std(parsed).ok()
    }

    let jitter = schedule
        .plan
        .jitter
        .as_deref()
        .and_then(to_chrono)
        .unwrap_or_else(ChronoDuration::zero);

    let timeout = match to_chrono(&manifest.execute.timeout) {
        Some(parsed) => parsed,
        None => {
            warn!(
                job_id = %manifest.id,
                raw = %manifest.execute.timeout,
                "suppress_window: unparseable timeout; treating as zero",
            );
            ChronoDuration::zero()
        }
    };

    let Some(total) = jitter
        .checked_add(&timeout)
        .and_then(|sum| sum.checked_add(&DISPATCH_DRAIN_MARGIN))
    else {
        return DISPATCH_WINDOW_MAX;
    };
    total.clamp(DISPATCH_WINDOW_MIN, DISPATCH_WINDOW_MAX)
}
/// Decodes a stored dispatch mark: UTF-8 text holding an RFC 3339 timestamp
/// (surrounding whitespace ignored), converted to UTC.
///
/// Returns `None` when the bytes are not valid UTF-8 or not a valid RFC 3339
/// timestamp.
fn parse_dispatch_mark(bytes: &[u8]) -> Option<DateTime<Utc>> {
    let text = std::str::from_utf8(bytes).ok()?.trim();
    let stamp = DateTime::parse_from_rfc3339(text).ok()?;
    Some(stamp.with_timezone(&Utc))
}
/// Loads the per-PC dispatch marks of `schedule_id` for the given `pcs`.
///
/// Lookups run with at most `DISPATCH_KV_CONCURRENCY` in flight. A PC is
/// present in the result only if its key exists and parses; read errors,
/// missing keys and malformed values are all treated as "no mark". When the
/// dispatch bucket itself is unavailable the result is empty.
async fn read_pc_dispatch_marks(
    state: &AppState,
    schedule_id: &str,
    pcs: &[String],
) -> HashMap<String, DateTime<Utc>> {
    let kv = match state
        .jetstream
        .get_key_value(BUCKET_SCHEDULER_DISPATCH)
        .await
    {
        Ok(kv) => kv,
        Err(_) => return HashMap::new(),
    };

    // One lazy lookup future per PC; each resolves to `Some((pc, mark))` or
    // `None`.
    let lookups = pcs.iter().cloned().map(|pc| {
        let kv = kv.clone();
        let key = dispatch_mark_pc_key(schedule_id, &pc);
        async move {
            let mark = kv
                .get(&key)
                .await
                .ok()
                .flatten()
                .and_then(|raw| parse_dispatch_mark(&raw));
            mark.map(|at| (pc, at))
        }
    });

    futures::stream::iter(lookups)
        .buffer_unordered(DISPATCH_KV_CONCURRENCY)
        .filter_map(|hit| async move { hit })
        .collect()
        .await
}
/// Loads the whole-target dispatch mark of `schedule_id`.
///
/// Returns `None` when the dispatch bucket is unavailable, the read fails,
/// the key is absent, or the stored value does not parse.
async fn read_target_dispatch_mark(state: &AppState, schedule_id: &str) -> Option<DateTime<Utc>> {
    let Ok(kv) = state
        .jetstream
        .get_key_value(BUCKET_SCHEDULER_DISPATCH)
        .await
    else {
        return None;
    };

    let key = dispatch_mark_target_key(schedule_id);
    match kv.get(&key).await {
        Ok(Some(raw)) => parse_dispatch_mark(&raw),
        _ => None,
    }
}
/// Writes a dispatch mark (the RFC 3339 form of `at`) for every PC in `pcs`
/// under `schedule_id`.
///
/// Writes run with at most `DISPATCH_KV_CONCURRENCY` in flight. Failures are
/// logged per PC and otherwise ignored; if the dispatch bucket is
/// unavailable, nothing is written and a single warning is logged.
async fn record_pc_dispatch_marks(
    state: &AppState,
    schedule_id: &str,
    pcs: &[String],
    at: DateTime<Utc>,
) {
    let kv = match state
        .jetstream
        .get_key_value(BUCKET_SCHEDULER_DISPATCH)
        .await
    {
        Ok(kv) => kv,
        Err(_) => {
            warn!(%schedule_id, "record dispatch marks: scheduler_dispatch KV unavailable");
            return;
        }
    };

    let stamp = at.to_rfc3339();
    let targets = futures::stream::iter(pcs.iter().cloned());
    targets
        .for_each_concurrent(DISPATCH_KV_CONCURRENCY, |pc| {
            let kv = kv.clone();
            let key = dispatch_mark_pc_key(schedule_id, &pc);
            let payload = stamp.clone().into_bytes();
            async move {
                if let Err(e) = kv.put(&key, payload.into()).await {
                    warn!(%schedule_id, pc, error = %e, "record dispatch mark failed");
                }
            }
        })
        .await;
}
/// Writes the whole-target dispatch mark (the RFC 3339 form of `at`) for
/// `schedule_id`.
///
/// Best-effort: an unavailable bucket or a failed write is logged and
/// otherwise ignored.
async fn record_target_dispatch_mark(state: &AppState, schedule_id: &str, at: DateTime<Utc>) {
    let kv = match state
        .jetstream
        .get_key_value(BUCKET_SCHEDULER_DISPATCH)
        .await
    {
        Ok(kv) => kv,
        Err(_) => {
            warn!(%schedule_id, "record target dispatch mark: scheduler_dispatch KV unavailable");
            return;
        }
    };

    let key = dispatch_mark_target_key(schedule_id);
    let payload = at.to_rfc3339().into_bytes();
    if let Err(e) = kv.put(&key, payload.into()).await {
        warn!(%schedule_id, error = %e, "record target dispatch mark failed");
    }
}
/// Parses an optional humantime duration string into a chrono duration.
///
/// `None` in gives `Ok(None)` out.
///
/// # Errors
/// Fails when the string is not a valid humantime duration, or when the
/// duration does not fit into a chrono duration.
fn parse_cooldown(s: Option<&str>) -> Result<Option<ChronoDuration>> {
    let Some(raw) = s else {
        return Ok(None);
    };
    let parsed: StdDuration =
        humantime::parse_duration(raw).with_context(|| format!("parse cooldown '{raw}'"))?;
    let cooldown = ChronoDuration::from_std(parsed).context("cooldown overflow")?;
    Ok(Some(cooldown))
}
/// Turns an optional humantime duration string into an absolute deadline,
/// `now + duration`.
///
/// `None` in gives `Ok(None)` out.
///
/// # Errors
/// Fails when the string is not a valid humantime duration, when the
/// duration does not fit into a chrono duration, or when `now + duration`
/// falls outside the range a `DateTime<Utc>` can represent.
fn parse_starting_deadline(
    s: Option<&str>,
    now: chrono::DateTime<Utc>,
) -> Result<Option<chrono::DateTime<Utc>>> {
    let Some(raw) = s else {
        return Ok(None);
    };
    let std: StdDuration = humantime::parse_duration(raw)
        .with_context(|| format!("parse starting_deadline '{raw}'"))?;
    let d = ChronoDuration::from_std(std).context("starting_deadline overflow")?;
    // `now + d` panics when the sum leaves chrono's representable range, and
    // `raw` comes from schedule configuration, so use the checked form and
    // report an error instead.
    let deadline = now
        .checked_add_signed(d)
        .with_context(|| format!("starting_deadline '{raw}' overflows the representable date range"))?;
    Ok(Some(deadline))
}
/// Fetches, per PC, the latest `finished_at` among successful
/// (`exit_code = 0`) results of `job_id`.
///
/// Rows whose `finished_at` cannot be decoded, or whose `pc_id` is missing
/// or empty, are dropped without an error.
///
/// # Errors
/// Fails when the query itself fails.
async fn recent_completions(state: &AppState, job_id: &str) -> Result<Vec<Completion>> {
    let rows = sqlx::query(
        "SELECT pc_id, MAX(finished_at) AS finished_at
FROM execution_results
WHERE job_id = ? AND exit_code = 0
GROUP BY pc_id",
    )
    .bind(job_id)
    .fetch_all(&state.pool)
    .await
    .context("execution_results dedup query")?;

    let completions = rows
        .into_iter()
        .filter_map(|row| {
            // An undecodable pc_id collapses to "" and is rejected below.
            let pc_id: String = row.try_get("pc_id").unwrap_or_default();
            let finished_at: chrono::DateTime<Utc> = row.try_get("finished_at").ok()?;
            (!pc_id.is_empty()).then_some(Completion { pc_id, finished_at })
        })
        .collect();
    Ok(completions)
}
async fn resolve_expected_pcs(state: &AppState, target: &Target) -> Result<Vec<String>> {
let mut out: HashSet<String> = HashSet::new();
if target.all {
let cutoff = Utc::now() - ALIVE_THRESHOLD;
let rows = sqlx::query("SELECT pc_id FROM agents WHERE last_heartbeat >= ? ORDER BY pc_id")
.bind(cutoff)
.fetch_all(&state.pool)
.await
.context("agents alive query")?;
for r in rows {
if let Ok(pc) = r.try_get::<String, _>("pc_id") {
out.insert(pc);
}
}
}
if !target.groups.is_empty() {
let want: HashSet<&str> = target.groups.iter().map(String::as_str).collect();
let cutoff = Utc::now() - ALIVE_THRESHOLD;
let alive: HashSet<String> =
sqlx::query("SELECT pc_id FROM agents WHERE last_heartbeat >= ?")
.bind(cutoff)
.fetch_all(&state.pool)
.await
.context("alive list for group resolve")?
.into_iter()
.filter_map(|r| r.try_get::<String, _>("pc_id").ok())
.collect();
if let Ok(kv) = state.jetstream.get_key_value(BUCKET_AGENT_GROUPS).await {
if let Ok(keys) = kv.keys().await {
let keys: Vec<String> = keys.try_collect().await.unwrap_or_default();
for k in keys {
if !alive.contains(&k) {
continue;
}
let Ok(Some(bytes)) = kv.get(&k).await else {
continue;
};
let Ok(groups) = serde_json::from_slice::<Vec<String>>(&bytes) else {
continue;
};
if groups.iter().any(|g| want.contains(g.as_str())) {
out.insert(k);
}
}
}
}
}
for pc in &target.pcs {
out.insert(pc.clone());
}
let mut v: Vec<String> = out.into_iter().collect();
v.sort();
Ok(v)
}
async fn unregister(sched: &JobScheduler, registered: &Registered, schedule_id: &str) {
let removed = registered.lock().await.remove(schedule_id);
if let Some(uuid) = removed {
if let Err(e) = sched.remove(&uuid).await {
warn!(error = %e, schedule_id, "scheduler.remove failed");
} else {
info!(schedule_id, "scheduler unregistered");
}
}
}