use std::collections::HashMap;
use std::sync::Arc;
use anyhow::{Context, Result};
use async_nats::jetstream::kv::Operation;
use futures::{StreamExt, TryStreamExt};
use kanade_shared::kv::BUCKET_SCHEDULES;
use kanade_shared::manifest::Schedule;
use tokio::sync::Mutex;
use tokio_cron_scheduler::{Job, JobScheduler};
use tracing::{info, warn};
use uuid::Uuid;
use crate::api::AppState;
use crate::api::deploy::deploy_manifest;
type Registered = Arc<Mutex<HashMap<String, Uuid>>>;
pub async fn run(state: AppState) -> Result<()> {
let kv = state
.jetstream
.create_key_value(async_nats::jetstream::kv::Config {
bucket: BUCKET_SCHEDULES.into(),
history: 5,
..Default::default()
})
.await
.context("ensure schedules KV")?;
let sched = JobScheduler::new().await.context("init JobScheduler")?;
sched.start().await.context("start JobScheduler")?;
let registered: Registered = Arc::new(Mutex::new(HashMap::new()));
let keys: Vec<String> = match kv.keys().await {
Ok(stream) => stream.try_collect().await.unwrap_or_else(|e| {
warn!(error = %e, "collect schedules KV keys (initial load best-effort)");
Vec::new()
}),
Err(e) => {
warn!(error = %e, "list schedules KV keys (likely empty bucket; watch loop still arms)");
Vec::new()
}
};
for k in keys {
let entry = match kv.get(&k).await {
Ok(Some(b)) => b,
Ok(None) => continue,
Err(e) => {
warn!(error = %e, key = %k, "kv get");
continue;
}
};
match serde_json::from_slice::<Schedule>(&entry) {
Ok(s) if s.enabled => {
if let Err(e) = register(&sched, state.clone(), ®istered, s.clone()).await {
warn!(error = %e, schedule_id = %s.id, "initial register failed");
}
}
Ok(s) => info!(schedule_id = %s.id, "skipped (disabled)"),
Err(e) => warn!(error = %e, key = %k, "deserialize Schedule"),
}
}
let initial_count = registered.lock().await.len();
info!(
count = initial_count,
"scheduler registered initial schedules"
);
let mut watcher = kv.watch_all().await.context("kv watch_all")?;
while let Some(entry) = watcher.next().await {
let entry = match entry {
Ok(e) => e,
Err(e) => {
warn!(error = %e, "watch entry error");
continue;
}
};
match entry.operation {
Operation::Put => {
let sched_data: Schedule = match serde_json::from_slice(&entry.value) {
Ok(s) => s,
Err(e) => {
warn!(error = %e, key = %entry.key, "deserialize Schedule on watch");
continue;
}
};
unregister(&sched, ®istered, &sched_data.id).await;
if sched_data.enabled
&& let Err(e) =
register(&sched, state.clone(), ®istered, sched_data.clone()).await
{
warn!(error = %e, schedule_id = %sched_data.id, "watch register failed");
}
}
Operation::Delete | Operation::Purge => {
unregister(&sched, ®istered, &entry.key).await;
}
}
}
std::future::pending::<Result<()>>().await
}
async fn register(
sched: &JobScheduler,
state: AppState,
registered: &Registered,
schedule: Schedule,
) -> Result<()> {
let cron = schedule.cron.clone();
let schedule_id = schedule.id.clone();
let manifest = schedule.manifest.clone();
let job = Job::new_async(cron.as_str(), move |_uuid, _l| {
let state = state.clone();
let manifest = manifest.clone();
let schedule_id = schedule_id.clone();
Box::pin(async move {
info!(
schedule_id = %schedule_id,
job_id = %manifest.id,
"scheduler firing",
);
match deploy_manifest(&state, manifest, "scheduler").await {
Ok(resp) => info!(
schedule_id = %schedule_id,
deploy_id = %resp.deploy_id,
"scheduler deploy ok",
),
Err((status, msg)) => warn!(
schedule_id = %schedule_id,
status = %status,
error = %msg,
"scheduler deploy failed",
),
}
})
})
.with_context(|| format!("Job::new_async (cron={cron})"))?;
let uuid = sched.add(job).await.context("scheduler.add")?;
registered.lock().await.insert(schedule.id.clone(), uuid);
info!(schedule_id = %schedule.id, cron = %schedule.cron, "scheduled");
Ok(())
}
async fn unregister(sched: &JobScheduler, registered: &Registered, schedule_id: &str) {
let removed = registered.lock().await.remove(schedule_id);
if let Some(uuid) = removed {
if let Err(e) = sched.remove(&uuid).await {
warn!(error = %e, schedule_id, "scheduler.remove failed");
} else {
info!(schedule_id, "scheduler unregistered");
}
}
}