use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use anyhow::{Context, Result};
use async_nats::jetstream::kv::Operation;
use chrono::{DateTime, Duration as ChronoDuration, Utc};
use futures::{StreamExt, TryStreamExt};
use kanade_shared::kv::{
BUCKET_JOBS, BUCKET_SCHEDULES, BUCKET_SCRIPT_CURRENT, BUCKET_SCRIPT_STATUS,
};
use kanade_shared::manifest::{ExecMode, Manifest, RunsOn, Schedule};
use kanade_shared::wire::Command;
use tokio::sync::Mutex;
use tokio_cron_scheduler::{Job, JobScheduler};
use tracing::{debug, info, warn};
use uuid::Uuid;
use crate::commands::handle_command;
use crate::nats_retry;
struct State {
jobs: HashMap<String, Manifest>,
registered: HashMap<String, Uuid>,
schedules: HashMap<String, Schedule>,
completions: HashMap<String, DateTime<Utc>>,
completions_path: PathBuf,
}
impl State {
fn matching(&self, schedule: &Schedule, pc_id: &str, my_groups: &[String]) -> bool {
matches!(schedule.runs_on, RunsOn::Agent)
&& schedule.enabled
&& target_includes(schedule, pc_id, my_groups)
}
fn key(schedule_id: &str, job_id: &str) -> String {
format!("{schedule_id}::{job_id}")
}
fn record_completion(&mut self, schedule_id: &str, job_id: &str, when: DateTime<Utc>) {
self.completions
.insert(Self::key(schedule_id, job_id), when);
if let Err(e) = self.flush_completions() {
warn!(
error = %e,
"local_completions.json flush failed; in-memory state still consistent",
);
}
}
fn flush_completions(&self) -> Result<()> {
let tmp = self.completions_path.with_extension("json.tmp");
let bytes =
serde_json::to_vec_pretty(&self.completions).context("serialise local_completions")?;
if let Some(parent) = tmp.parent() {
std::fs::create_dir_all(parent).ok();
}
std::fs::write(&tmp, &bytes).context("write tmp completions file")?;
std::fs::rename(&tmp, &self.completions_path).context("rename tmp → final")?;
Ok(())
}
fn load_completions(path: &std::path::Path) -> HashMap<String, DateTime<Utc>> {
match std::fs::read(path) {
Ok(bytes) => match serde_json::from_slice(&bytes) {
Ok(m) => m,
Err(e) => {
warn!(error = %e, path = %path.display(), "parse local_completions; starting empty");
HashMap::new()
}
},
Err(e) if e.kind() == std::io::ErrorKind::NotFound => HashMap::new(),
Err(e) => {
warn!(error = %e, path = %path.display(), "read local_completions; starting empty");
HashMap::new()
}
}
}
}
/// Returns whether `schedule`'s fan-out target covers this PC.
///
/// The target matches when its `all` flag is set, when `pc_id` appears in its `pcs`
/// list, or when any of `my_groups` appears in its `groups` list. A target with
/// `all == false` and empty lists matches nothing.
fn target_includes(schedule: &Schedule, pc_id: &str, my_groups: &[String]) -> bool {
    let target = &schedule.plan.target;
    let named_directly = || target.pcs.iter().any(|pc| pc == pc_id);
    let shares_a_group = || {
        target
            .groups
            .iter()
            .any(|group| my_groups.iter().any(|mine| mine == group))
    };
    target.all || named_directly() || shares_a_group()
}
/// Starts the agent-side scheduler subsystem (see `run`) as a Tokio task.
///
/// The returned handle only completes if `run` gives up, which happens when the
/// internal `JobScheduler` cannot be created or started.
pub fn spawn(
    client: async_nats::Client,
    pc_id: String,
    completions_path: PathBuf,
    groups_rx: tokio::sync::watch::Receiver<Vec<String>>,
    staleness: crate::staleness::Tracker,
) -> tokio::task::JoinHandle<()> {
    let subsystem = run(client, pc_id, completions_path, groups_rx, staleness);
    tokio::spawn(subsystem)
}
/// Main loop of the agent-side scheduler.
///
/// Setup: creates and starts the internal `JobScheduler`, loads the persisted
/// completion state, and spawns the group-membership task. It then repeats forever:
/// 1. open the schedules and jobs KV buckets (`nats_retry::wait_for_kv` retries),
/// 2. open `watch_all` streams on both buckets,
/// 3. walk both buckets and apply a full resync (`apply_resync`),
/// 4. apply incremental watch events until either stream ends, pause, and start over.
///
/// Returns early only if the `JobScheduler` cannot be created or started.
async fn run(
    client: async_nats::Client,
    pc_id: String,
    completions_path: PathBuf,
    groups_rx: tokio::sync::watch::Receiver<Vec<String>>,
    staleness: crate::staleness::Tracker,
) {
    let js = async_nats::jetstream::new(client.clone());
    let internal = match JobScheduler::new().await {
        Ok(s) => s,
        Err(e) => {
            warn!(error = %e, "local_scheduler: JobScheduler::new failed; aborting subsystem");
            return;
        }
    };
    if let Err(e) = internal.start().await {
        warn!(error = %e, "local_scheduler: JobScheduler::start failed; aborting subsystem");
        return;
    }
    let completions = State::load_completions(&completions_path);
    info!(
        path = %completions_path.display(),
        loaded = completions.len(),
        "local_scheduler: loaded completion state",
    );
    let state = Arc::new(Mutex::new(State {
        jobs: HashMap::new(),
        registered: HashMap::new(),
        schedules: HashMap::new(),
        completions,
        completions_path,
    }));
    // Not awaited: the task runs alongside this loop for the life of the subsystem.
    let _groups_task = spawn_groups_change_task(
        client.clone(),
        pc_id.clone(),
        staleness.clone(),
        groups_rx.clone(),
        internal.clone(),
        state.clone(),
    );
    loop {
        let schedules_kv = nats_retry::wait_for_kv(
            &js,
            &client,
            &staleness,
            BUCKET_SCHEDULES,
            "local_scheduler",
        )
        .await;
        let jobs_kv =
            nats_retry::wait_for_kv(&js, &client, &staleness, BUCKET_JOBS, "local_scheduler").await;

        // Open the watches BEFORE walking the buckets. Changes made while the snapshot is
        // being read are then queued in the watch streams. Otherwise they could fall into
        // the gap between the walk and the watch and stay invisible until the next reopen.
        // Replaying them after the resync is harmless: reconcile/unregister and the
        // job-cache updates below are idempotent.
        let mut schedules_watch = match schedules_kv.watch_all().await {
            Ok(w) => w,
            Err(e) => {
                warn!(error = %e, "schedules KV watch_all failed; reopening");
                nats_retry::reopen_pause().await;
                continue;
            }
        };
        let mut jobs_watch = match jobs_kv.watch_all().await {
            Ok(w) => w,
            Err(e) => {
                warn!(error = %e, "jobs KV watch_all failed; reopening");
                nats_retry::reopen_pause().await;
                continue;
            }
        };

        let new_jobs = match collect_jobs(&jobs_kv).await {
            Ok(j) => j,
            Err(()) => {
                warn!("local_scheduler: jobs KV walk failed; keeping previous state and reopening");
                nats_retry::reopen_pause().await;
                continue;
            }
        };
        let new_schedules = match collect_schedules(&schedules_kv).await {
            Ok(s) => s,
            Err(()) => {
                warn!(
                    "local_scheduler: schedules KV walk failed; keeping previous state and reopening"
                );
                nats_retry::reopen_pause().await;
                continue;
            }
        };
        let my_groups = groups_rx.borrow().clone();
        info!(
            pc_id = %pc_id,
            groups = ?my_groups,
            jobs = new_jobs.len(),
            schedules = new_schedules.len(),
            "local_scheduler: applying resync",
        );
        apply_resync(
            &internal,
            &state,
            &client,
            &pc_id,
            &my_groups,
            &staleness,
            new_jobs,
            new_schedules,
        )
        .await;
        let count = state.lock().await.registered.len();
        info!(count, "local_scheduler: registered schedules after resync");
        let dropped = 'inner: loop {
            tokio::select! {
                entry = schedules_watch.next() => {
                    let Some(entry) = entry else { break 'inner "schedules" };
                    let entry = match entry {
                        Ok(e) => e,
                        Err(e) => { warn!(error = %e, "schedules watch error"); continue; }
                    };
                    let groups_snapshot = groups_rx.borrow().clone();
                    match entry.operation {
                        Operation::Put => {
                            if let Ok(s) = serde_json::from_slice::<Schedule>(&entry.value) {
                                reconcile_schedule(
                                    &internal, &state, &client, &pc_id, &groups_snapshot, &s, &staleness,
                                )
                                .await;
                            } else {
                                warn!(key = %entry.key, "deserialize Schedule on watch");
                            }
                        }
                        Operation::Delete | Operation::Purge => {
                            unregister_locally(&internal, &state, &entry.key).await;
                        }
                    }
                }
                entry = jobs_watch.next() => {
                    let Some(entry) = entry else { break 'inner "jobs" };
                    let entry = match entry {
                        Ok(e) => e,
                        Err(e) => { warn!(error = %e, "jobs watch error"); continue; }
                    };
                    let mut s = state.lock().await;
                    match entry.operation {
                        Operation::Put => match serde_json::from_slice::<Manifest>(&entry.value) {
                            Ok(m) => {
                                s.jobs.insert(entry.key.clone(), m);
                                debug!(job_id = %entry.key, "local_scheduler: cached job manifest");
                            }
                            // Previously ignored silently; surface it like the schedules branch.
                            Err(e) => warn!(error = %e, key = %entry.key, "deserialize Manifest on watch"),
                        },
                        Operation::Delete | Operation::Purge => {
                            s.jobs.remove(&entry.key);
                        }
                    }
                }
            }
        };
        warn!(dropped, "local_scheduler watch ended; reopening");
        nats_retry::reopen_pause().await;
    }
}
async fn collect_jobs(
jobs_kv: &async_nats::jetstream::kv::Store,
) -> Result<HashMap<String, Manifest>, ()> {
let keys = match jobs_kv.keys().await {
Ok(k) => k,
Err(e) => {
warn!(error = %e, "local_scheduler: jobs_kv.keys() failed");
return Err(());
}
};
let keys: Vec<String> = keys.try_collect().await.unwrap_or_default();
let mut out = HashMap::with_capacity(keys.len());
for k in keys {
if let Ok(Some(bytes)) = jobs_kv.get(&k).await
&& let Ok(m) = serde_json::from_slice::<Manifest>(&bytes)
{
out.insert(k, m);
}
}
Ok(out)
}
async fn collect_schedules(
schedules_kv: &async_nats::jetstream::kv::Store,
) -> Result<Vec<Schedule>, ()> {
let keys = match schedules_kv.keys().await {
Ok(k) => k,
Err(e) => {
warn!(error = %e, "local_scheduler: schedules_kv.keys() failed");
return Err(());
}
};
let keys: Vec<String> = keys.try_collect().await.unwrap_or_default();
let mut out = Vec::with_capacity(keys.len());
for k in keys {
if let Ok(Some(bytes)) = schedules_kv.get(&k).await
&& let Ok(s) = serde_json::from_slice::<Schedule>(&bytes)
{
out.push(s);
}
}
Ok(out)
}
/// Applies a full snapshot of the jobs and schedules buckets to local state.
///
/// Steps:
/// - Replaces the job cache wholesale with `new_jobs`.
/// - Unregisters every locally registered schedule whose id no longer appears in
///   `new_schedules`.
/// - Reconciles each schedule in `new_schedules` against this PC and `my_groups`.
// Every argument is distinct context that the per-schedule reconcile needs; bundling
// them into a one-off struct would not make the call clearer.
#[allow(clippy::too_many_arguments)]
async fn apply_resync(
    internal: &JobScheduler,
    state: &Arc<Mutex<State>>,
    client: &async_nats::Client,
    pc_id: &str,
    my_groups: &[String],
    staleness: &crate::staleness::Tracker,
    new_jobs: HashMap<String, Manifest>,
    new_schedules: Vec<Schedule>,
) {
    let wanted: std::collections::HashSet<&str> =
        new_schedules.iter().map(|s| s.id.as_str()).collect();
    // Swap in the fresh job cache and find the registrations whose schedule vanished,
    // under a single lock acquisition.
    let vanished: Vec<String> = {
        let mut guard = state.lock().await;
        guard.jobs = new_jobs;
        guard
            .schedules
            .keys()
            .filter(|id| !wanted.contains(id.as_str()))
            .cloned()
            .collect()
    };
    for id in &vanished {
        unregister_locally(internal, state, id).await;
    }
    for schedule in &new_schedules {
        reconcile_schedule(internal, state, client, pc_id, my_groups, schedule, staleness).await;
    }
}
/// Spawns a task that re-reconciles every schedule whenever this PC's group
/// membership (the `groups_rx_for_watch` channel) changes.
///
/// Only schedules are re-walked; the job cache is left to the main watch loop. The task
/// ends when the sending side of the groups `watch` channel is dropped.
fn spawn_groups_change_task(
    client: async_nats::Client,
    pc_id: String,
    staleness: crate::staleness::Tracker,
    mut groups_rx_for_watch: tokio::sync::watch::Receiver<Vec<String>>,
    internal: JobScheduler,
    state: Arc<Mutex<State>>,
) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        let js = async_nats::jetstream::new(client.clone());
        while groups_rx_for_watch.changed().await.is_ok() {
            // Retry until this change has actually been applied. Skipping on a failed walk
            // would leave registrations computed for the old groups in place until some
            // unrelated later group change.
            loop {
                // Re-read on every attempt so a retry applies the newest membership. Mark
                // it seen so it does not trigger an extra pass afterwards.
                let new_groups = groups_rx_for_watch.borrow_and_update().clone();
                info!(
                    groups = ?new_groups,
                    "local_scheduler: group membership changed; re-reconciling all schedules",
                );
                let kv = nats_retry::wait_for_kv(
                    &js,
                    &client,
                    &staleness,
                    BUCKET_SCHEDULES,
                    "local_scheduler_groups",
                )
                .await;
                let new_schedules = match collect_schedules(&kv).await {
                    Ok(s) => s,
                    Err(()) => {
                        warn!(
                            "local_scheduler: groups change resync — schedules walk failed; retrying"
                        );
                        nats_retry::reopen_pause().await;
                        continue;
                    }
                };
                let new_ids: std::collections::HashSet<String> =
                    new_schedules.iter().map(|s| s.id.clone()).collect();
                let stale_ids: Vec<String> = {
                    let st = state.lock().await;
                    st.schedules
                        .keys()
                        .filter(|id| !new_ids.contains(*id))
                        .cloned()
                        .collect()
                };
                for id in stale_ids {
                    unregister_locally(&internal, &state, &id).await;
                }
                for s in &new_schedules {
                    reconcile_schedule(
                        &internal,
                        &state,
                        &client,
                        &pc_id,
                        &new_groups,
                        s,
                        &staleness,
                    )
                    .await;
                }
                break;
            }
        }
    })
}
async fn reconcile_schedule(
internal: &JobScheduler,
state: &Arc<Mutex<State>>,
client: &async_nats::Client,
pc_id: &str,
my_groups: &[String],
schedule: &Schedule,
staleness: &crate::staleness::Tracker,
) {
let mut st = state.lock().await;
let mine = st.matching(schedule, pc_id, my_groups);
if let Some(uuid) = st.registered.remove(&schedule.id) {
st.schedules.remove(&schedule.id);
if let Err(e) = internal.remove(&uuid).await {
warn!(error = %e, schedule_id = %schedule.id, "local_scheduler: remove failed");
} else {
info!(schedule_id = %schedule.id, "local_scheduler: unregistered");
}
}
if !mine {
return;
}
let cron = schedule.cron.clone();
let schedule_id = schedule.id.clone();
let client_for_job = client.clone();
let pc_id_for_job = pc_id.to_string();
let state_for_job = state.clone();
let schedule_for_job = schedule.clone();
let staleness_for_job = staleness.clone();
let job = match Job::new_async(cron.as_str(), move |_uuid, _l| {
let client = client_for_job.clone();
let pc_id = pc_id_for_job.clone();
let state = state_for_job.clone();
let schedule = schedule_for_job.clone();
let staleness = staleness_for_job.clone();
Box::pin(async move {
local_tick(&client, &pc_id, &state, &schedule, &staleness).await;
})
}) {
Ok(j) => j,
Err(e) => {
warn!(
schedule_id = %schedule.id,
error = %e,
"local_scheduler: Job::new_async failed",
);
return;
}
};
let job_uuid = match internal.add(job).await {
Ok(u) => u,
Err(e) => {
warn!(
schedule_id = %schedule.id,
error = %e,
"local_scheduler: internal.add failed",
);
return;
}
};
st.schedules.insert(schedule.id.clone(), schedule.clone());
st.registered.insert(schedule.id.clone(), job_uuid);
info!(
schedule_id = %schedule_id,
cron = %cron,
mode = ?schedule.mode,
"local_scheduler: registered",
);
}
async fn unregister_locally(internal: &JobScheduler, state: &Arc<Mutex<State>>, schedule_id: &str) {
let uuid_opt = {
let mut st = state.lock().await;
st.schedules.remove(schedule_id);
st.registered.remove(schedule_id)
};
if let Some(uuid) = uuid_opt {
if let Err(e) = internal.remove(&uuid).await {
warn!(error = %e, schedule_id, "local_scheduler: remove failed");
} else {
info!(schedule_id, "local_scheduler: unregistered");
}
}
}
/// Executes one cron tick of `schedule` on this agent.
///
/// Steps:
/// 1. Look up the job manifest in the local cache; skip the tick if it is not there yet.
/// 2. For `OncePerPc` / `OncePerTarget`, consult the persisted completion map. Skip
///    unless this (schedule, job) pair has never completed or its `cooldown` has elapsed.
/// 3. Build a `Command` from the manifest and run it through `handle_command`.
/// 4. On success, record the completion via `State::record_completion`. On failure,
///    only log; the next tick tries again.
///
/// NOTE(review): the dedup check (step 2) and the completion record (step 4) are not
/// atomic. If tokio-cron-scheduler lets two ticks of the same job overlap while
/// `handle_command` is still running, a once-mode job could fire twice. Confirm the
/// scheduler's overlap behaviour.
async fn local_tick(
    client: &async_nats::Client,
    pc_id: &str,
    state: &Arc<Mutex<State>>,
    schedule: &Schedule,
    staleness: &crate::staleness::Tracker,
) {
    let manifest = {
        let st = state.lock().await;
        match st.jobs.get(&schedule.job_id).cloned() {
            Some(m) => m,
            None => {
                warn!(
                    schedule_id = %schedule.id,
                    job_id = %schedule.job_id,
                    "local_scheduler: job not in cache yet — skip this tick",
                );
                return;
            }
        }
    };
    // One key for both the dedup lookup and the completion record. It is derived from
    // the schedule rather than from `manifest.id`. Previously the lookup used
    // `schedule.job_id` but the record used `manifest.id`; if those ever differ, dedup
    // never matches and once-mode jobs run on every tick.
    let completion_key = State::key(&schedule.id, &schedule.job_id);
    let now = Utc::now();
    let should_fire = match schedule.mode {
        ExecMode::EveryTick => true,
        ExecMode::OncePerPc | ExecMode::OncePerTarget => {
            let last_done = state.lock().await.completions.get(&completion_key).copied();
            match last_done {
                None => true,
                Some(last) => {
                    let raw_cooldown = schedule.cooldown.as_deref();
                    let cooldown = raw_cooldown
                        .and_then(|s| humantime::parse_duration(s).ok())
                        .and_then(|d| ChronoDuration::from_std(d).ok());
                    if let (Some(raw), None) = (raw_cooldown, cooldown) {
                        // An unparsable cooldown silently turns a re-runnable job into a
                        // run-once job; make that visible.
                        warn!(
                            schedule_id = %schedule.id,
                            cooldown = raw,
                            "local_scheduler: cooldown is not a valid duration; treating as no cooldown",
                        );
                    }
                    match cooldown {
                        None => false,
                        Some(cd) => (now - last) >= cd,
                    }
                }
            }
        }
    };
    if !should_fire {
        debug!(
            schedule_id = %schedule.id,
            "local_scheduler: dedup says skip",
        );
        return;
    }
    // Unparsable timeouts fall back to 60 s; unparsable jitter means no jitter.
    let timeout_secs = humantime::parse_duration(&manifest.execute.timeout)
        .ok()
        .map(|d| d.as_secs())
        .unwrap_or(60);
    let jitter_secs = schedule
        .plan
        .jitter
        .as_deref()
        .and_then(|s| humantime::parse_duration(s).ok())
        .map(|d| d.as_secs());
    let exec_id = Uuid::new_v4().to_string();
    let cmd = Command {
        id: manifest.id.clone(),
        version: manifest.version.clone(),
        request_id: Uuid::new_v4().to_string(),
        exec_id: Some(exec_id),
        shell: manifest.execute.shell.into(),
        script: manifest.execute.script.clone(),
        timeout_secs,
        jitter_secs,
        run_as: manifest.execute.run_as,
        cwd: manifest.execute.cwd.clone(),
        deadline_at: None,
        staleness: manifest.staleness.clone(),
    };
    let js = async_nats::jetstream::new(client.clone());
    // Missing script buckets are passed through as `None` rather than aborting the tick.
    let script_current = js.get_key_value(BUCKET_SCRIPT_CURRENT).await.ok();
    let script_status = js.get_key_value(BUCKET_SCRIPT_STATUS).await.ok();
    info!(
        schedule_id = %schedule.id,
        job_id = %manifest.id,
        request_id = %cmd.request_id,
        "local_scheduler: firing (runs_on: agent)",
    );
    let request_id = cmd.request_id.clone();
    match handle_command(
        client.clone(),
        pc_id.to_string(),
        cmd,
        script_current,
        script_status,
        staleness.clone(),
    )
    .await
    {
        Ok(()) => {
            // Same (schedule.id, schedule.job_id) pair as `completion_key` above.
            state
                .lock()
                .await
                .record_completion(&schedule.id, &schedule.job_id, Utc::now());
            info!(
                schedule_id = %schedule.id,
                %request_id,
                "local_scheduler: completion recorded",
            );
        }
        Err(e) => {
            warn!(
                schedule_id = %schedule.id,
                %request_id,
                error = %e,
                "local_scheduler: handle_command failed (will retry next tick)",
            );
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use kanade_shared::manifest::{FanoutPlan, Target};

    /// Minimal enabled, every-tick schedule; only `target` and `runs_on` vary per test.
    fn schedule(target: Target, runs_on: RunsOn) -> Schedule {
        let plan = FanoutPlan {
            target,
            ..Default::default()
        };
        Schedule {
            id: "s".into(),
            cron: "* * * * * *".into(),
            job_id: "j".into(),
            plan,
            mode: ExecMode::EveryTick,
            cooldown: None,
            auto_disable_when_done: false,
            starting_deadline: None,
            runs_on,
            enabled: true,
        }
    }

    /// Owned group list for the `my_groups` argument of `target_includes`.
    fn member_of(names: &[&str]) -> Vec<String> {
        names.iter().map(|&name| name.to_owned()).collect()
    }

    #[test]
    fn target_all_matches_anyone() {
        let everyone = Target {
            all: true,
            ..Default::default()
        };
        let s = schedule(everyone, RunsOn::Agent);
        assert!(target_includes(&s, "minipc", &[]));
    }

    #[test]
    fn target_pcs_explicit_match() {
        let only_minipc = Target {
            pcs: vec!["minipc".into()],
            ..Default::default()
        };
        let s = schedule(only_minipc, RunsOn::Agent);
        assert!(target_includes(&s, "minipc", &[]));
        assert!(!target_includes(&s, "other", &[]));
    }

    #[test]
    fn target_groups_intersect() {
        let rollout = Target {
            groups: vec!["canary".into(), "wave1".into()],
            ..Default::default()
        };
        let s = schedule(rollout, RunsOn::Agent);
        assert!(target_includes(&s, "any", &member_of(&["wave1"])));
        assert!(target_includes(&s, "any", &member_of(&["dept-eng", "canary"])));
        assert!(!target_includes(&s, "any", &member_of(&["dept-eng"])));
    }

    #[test]
    fn target_none_matches_none() {
        let s = schedule(Target::default(), RunsOn::Agent);
        assert!(!target_includes(&s, "minipc", &member_of(&["canary"])));
    }
}