use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::watch;
use tracing::{debug, info, trace, warn};
use crate::control::planner::procedural::executor::bindings::RowBindings;
use crate::control::planner::procedural::executor::core::StatementExecutor;
use crate::control::security::identity::{AuthMethod, AuthenticatedIdentity, Role};
use crate::control::state::SharedState;
use crate::types::TenantId;
use super::cron::CronExpr;
use super::dispatcher::{DispatchOutcome, JobDispatcher, JobDispatcherConfig};
use super::history::JobHistoryStore;
use super::registry::ScheduleRegistry;
use super::types::{JobRun, ScheduleDef, ScheduleScope};
/// Returns the minute indices (seconds divided by 60) that are due for a
/// schedule, in ascending order.
///
/// * `last_fired_minute` — the last minute index already handled for this
///   schedule. When `None`, only the current minute is considered, so nothing
///   is back-filled for a schedule that has never been seen.
/// * `now_secs` — current time in seconds; the current minute is `now_secs / 60`.
/// * `cron` — expression each candidate minute is tested against.
/// * `utc_offset_seconds` — offset forwarded unchanged to
///   `CronExpr::matches_epoch_with_offset`.
///
/// The result is empty when `last_fired_minute` is at or beyond the current
/// minute, or when no candidate minute matches the expression.
pub fn pending_minute_ticks(
    last_fired_minute: Option<u64>,
    now_secs: u64,
    cron: &CronExpr,
    utc_offset_seconds: i32,
) -> Vec<u64> {
    let current_minute = now_secs / 60;
    // Resume right after the last handled minute; saturate so a stored
    // `u64::MAX` cannot wrap around to zero.
    let first_candidate =
        last_fired_minute.map_or(current_minute, |prev| prev.saturating_add(1));

    let mut due = Vec::new();
    for minute in first_candidate..=current_minute {
        let minute_start_secs = minute.saturating_mul(60);
        if cron.matches_epoch_with_offset(minute_start_secs, utc_offset_seconds) {
            due.push(minute);
        }
    }
    due
}
/// Starts the scheduler loop as a background tokio task.
///
/// All arguments are handed to the loop unchanged. The returned handle
/// completes when the loop returns, which it does once `shutdown` carries
/// `true`.
pub fn spawn_scheduler(
    state: Arc<SharedState>,
    registry: Arc<ScheduleRegistry>,
    history: Arc<JobHistoryStore>,
    shutdown: watch::Receiver<bool>,
) -> tokio::task::JoinHandle<()> {
    // `scheduler_loop` is an `async fn`, so calling it only builds the future;
    // it starts running once the runtime polls the spawned task.
    let scheduler = scheduler_loop(state, registry, history, shutdown);
    tokio::spawn(scheduler)
}
/// Drives all enabled schedules until shutdown is requested.
///
/// Roughly once per second the loop:
/// 1. lists the enabled schedules from `registry`,
/// 2. works out which minute ticks are pending for each one and advances the
///    per-schedule "last fired minute" bookkeeping,
/// 3. skips schedules this node should not run (leader / Raft health checks,
///    or a previous run still active when `allow_overlap` is false),
/// 4. hands the job to the dispatcher, which runs it and records the outcome
///    in `history`.
///
/// The loop exits after draining the dispatcher when `shutdown` carries
/// `true`, or when the shutdown channel's sender has been dropped.
async fn scheduler_loop(
    state: Arc<SharedState>,
    registry: Arc<ScheduleRegistry>,
    history: Arc<JobHistoryStore>,
    mut shutdown: watch::Receiver<bool>,
) {
    /// Releases a schedule's entry in the running set when dropped, so the
    /// entry is cleared on every exit path of a job (normal completion,
    /// shutdown cancellation, or the job future being dropped/unwound).
    struct RunningSlot {
        running: Arc<std::sync::Mutex<HashSet<(u64, String)>>>,
        key: (u64, String),
    }

    impl Drop for RunningSlot {
        fn drop(&mut self) {
            let mut active = self.running.lock().unwrap_or_else(|p| p.into_inner());
            active.remove(&self.key);
        }
    }

    info!("scheduler started");

    // Keys of schedules that currently have a job in flight.
    let running: Arc<std::sync::Mutex<HashSet<(u64, String)>>> =
        Arc::new(std::sync::Mutex::new(HashSet::new()));
    let sched_tuning = &state.tuning.scheduler;
    let dispatcher = Arc::new(JobDispatcher::new(JobDispatcherConfig {
        max_concurrent_jobs: sched_tuning.max_concurrent_jobs,
        max_result_bytes: u64::MAX,
    }));
    let job_timeout_secs = sched_tuning.job_timeout_secs;
    // Last minute index already handled per (tenant, schedule name).
    let mut last_fired_minute: HashMap<(u64, String), u64> = HashMap::new();

    loop {
        tokio::select! {
            _ = tokio::time::sleep(Duration::from_secs(1)) => {}
            changed = shutdown.changed() => {
                // `changed()` fails only when the sender is gone. From then on
                // it would resolve immediately on every iteration and turn
                // this loop into a busy spin, so a closed channel is treated
                // the same as an explicit shutdown request.
                if changed.is_err() || *shutdown.borrow() {
                    debug!("scheduler shutting down");
                    dispatcher.shutdown_and_drain().await;
                    return;
                }
            }
        }
        if *shutdown.borrow() {
            dispatcher.shutdown_and_drain().await;
            return;
        }

        let now_secs = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        let schedules = registry.list_all_enabled();
        if schedules.is_empty() {
            continue;
        }
        // Same for every schedule in this tick, so read it once.
        let tz_offset = state.scheduler_config.cron_timezone.offset_seconds();

        for sched in &schedules {
            let cron = match CronExpr::parse(&sched.cron_expr) {
                Ok(c) => c,
                Err(e) => {
                    warn!(
                        schedule = %sched.name,
                        error = %e,
                        "invalid cron expression, skipping"
                    );
                    continue;
                }
            };

            let sched_key = (sched.tenant_id, sched.name.clone());
            let last = last_fired_minute.get(&sched_key).copied();
            let pending = pending_minute_ticks(last, now_secs, &cron, tz_offset);
            let Some(&max_min) = pending.iter().max() else {
                // Nothing due. Remember the current minute for a schedule seen
                // for the first time so later ticks resume from here.
                if last.is_none() {
                    last_fired_minute.insert(sched_key, now_secs / 60);
                }
                continue;
            };
            // Advance the bookkeeping before the eligibility checks below, so
            // a skipped tick is consumed rather than retried on the next pass.
            last_fired_minute.insert(sched_key.clone(), max_min);

            if !should_fire_on_this_node(sched, &state) {
                trace!(
                    schedule = %sched.name,
                    "skipping: this node is not the leader for target vShard"
                );
                continue;
            }
            if !is_raft_group_healthy(sched, &state) {
                debug!(
                    schedule = %sched.name,
                    "skipping: Raft group lagging (lease-aware suspension)"
                );
                continue;
            }

            // Check for an active run and claim the slot under a single lock
            // acquisition so the two steps cannot interleave with a removal.
            {
                let mut active = running.lock().unwrap_or_else(|p| p.into_inner());
                if !sched.allow_overlap && active.contains(&sched_key) {
                    trace!(
                        schedule = %sched.name,
                        "skipping: previous run still active (ALLOW_OVERLAP = false)"
                    );
                    continue;
                }
                active.insert(sched_key.clone());
            }

            debug!(schedule = %sched.name, "firing scheduled job");
            let state_clone = Arc::clone(&state);
            let history_clone = Arc::clone(&history);
            let slot = RunningSlot {
                running: Arc::clone(&running),
                key: sched_key.clone(),
            };
            let sched_clone = sched.clone();
            let outcome = dispatcher.try_spawn(move |mut shutdown_rx| async move {
                // Declared first so it is dropped last: the running entry is
                // released only after the history record has been written.
                let _slot = slot;

                let result = tokio::select! {
                    r = execute_job(&state_clone, &sched_clone, job_timeout_secs) => r,
                    _ = shutdown_rx.changed() => {
                        return Err(crate::Error::Dispatch { detail: "scheduler shutdown".to_string() });
                    }
                };
                let now_ms = SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_millis() as u64;
                let run = match result {
                    Ok(duration_ms) => JobRun {
                        schedule_name: sched_clone.name.clone(),
                        tenant_id: sched_clone.tenant_id,
                        started_at: now_ms.saturating_sub(duration_ms),
                        duration_ms,
                        success: true,
                        error: None,
                    },
                    Err(e) => {
                        warn!(
                            schedule = %sched_clone.name,
                            error = %e,
                            "scheduled job failed"
                        );
                        // `execute_job` reports no duration on failure, so the
                        // run is stamped at completion time with zero duration.
                        JobRun {
                            schedule_name: sched_clone.name.clone(),
                            tenant_id: sched_clone.tenant_id,
                            started_at: now_ms,
                            duration_ms: 0,
                            success: false,
                            error: Some(e.to_string()),
                        }
                    }
                };
                if let Err(e) = history_clone.record(run) {
                    warn!(error = %e, "failed to record job history");
                }
                Ok(())
            });

            if outcome == DispatchOutcome::OverBudget {
                // Explicit release kept from the original flow; removing an
                // already-released key is a no-op.
                let mut active = running.lock().unwrap_or_else(|p| p.into_inner());
                active.remove(&sched_key);
                warn!(
                    schedule = %sched.name,
                    max_concurrent = sched_tuning.max_concurrent_jobs,
                    "scheduled job rejected: concurrency cap reached"
                );
            }
        }
    }
}
fn should_fire_on_this_node(sched: &ScheduleDef, state: &SharedState) -> bool {
if sched.scope == ScheduleScope::Local {
return true;
}
let Some(ref routing_lock) = state.cluster_routing else {
return true;
};
let node_id = state.node_id;
if let Some(ref collection) = sched.target_collection {
let vshard_id = nodedb_cluster::routing::vshard_for_collection(
nodedb_types::id::DatabaseId::DEFAULT,
collection,
);
let routing = routing_lock.read().unwrap_or_else(|p| p.into_inner());
match routing.leader_for_vshard(vshard_id) {
Ok(leader) => leader == node_id,
Err(_) => {
false
}
}
} else {
let routing = routing_lock.read().unwrap_or_else(|p| p.into_inner());
match routing.leader_for_vshard(0) {
Ok(coordinator) => coordinator == node_id,
Err(_) => false,
}
}
}
fn is_raft_group_healthy(sched: &ScheduleDef, state: &SharedState) -> bool {
if sched.scope == ScheduleScope::Local {
return true;
}
let Some(ref status_fn) = state.raft_status_fn else {
return true;
};
let Some(ref routing_lock) = state.cluster_routing else {
return true;
};
let vshard_id = sched
.target_collection
.as_ref()
.map(|c| {
nodedb_cluster::routing::vshard_for_collection(nodedb_types::id::DatabaseId::DEFAULT, c)
})
.unwrap_or(0);
let group_id = {
let routing = routing_lock.read().unwrap_or_else(|p| p.into_inner());
match routing.group_for_vshard(vshard_id) {
Ok(gid) => gid,
Err(_) => return true, }
};
let statuses = status_fn();
let Some(status) = statuses.iter().find(|s| s.group_id == group_id) else {
return true; };
if status.commit_index > status.last_applied {
let lag = status.commit_index - status.last_applied;
debug!(
group_id,
commit_index = status.commit_index,
last_applied = status.last_applied,
lag,
"Raft group lagging — suspending schedule fire"
);
return false;
}
if status.leader_id != state.node_id {
return false;
}
true
}
/// Runs the body of `sched` once and returns the elapsed wall time in
/// milliseconds.
///
/// The body is first checked by `body_guard::validate_scheduled_body` and then
/// parsed; either failure is reported as `Error::BadRequest`. The parsed block
/// is executed under the scheduler identity with an `ExecutionBudget` built
/// from `100_000` and `job_timeout_secs`. If the first execution fails it is
/// attempted exactly one more time with a fresh executor and the same budget
/// object; the error of the second attempt is the one returned.
async fn execute_job(
    state: &SharedState,
    sched: &ScheduleDef,
    job_timeout_secs: u64,
) -> crate::Result<u64> {
    use crate::control::planner::procedural::executor::fuel::ExecutionBudget;

    let started = std::time::Instant::now();
    let identity = scheduler_identity(TenantId::new(sched.tenant_id), &sched.owner);

    super::body_guard::validate_scheduled_body(&sched.body_sql).map_err(|msg| {
        crate::Error::BadRequest {
            detail: format!("schedule '{}': {msg}", sched.name),
        }
    })?;

    let block = match crate::control::planner::procedural::parse_block(&sched.body_sql) {
        Ok(parsed) => parsed,
        Err(e) => {
            return Err(crate::Error::BadRequest {
                detail: format!("schedule '{}' body parse error: {e}", sched.name),
            });
        }
    };

    let bindings = RowBindings::empty();
    // One budget covers both attempts, so the retry only gets what is left.
    let mut budget = ExecutionBudget::new(100_000, job_timeout_secs);

    let executor =
        StatementExecutor::new(state, identity.clone(), TenantId::new(sched.tenant_id), 0);
    let first_attempt = executor
        .execute_block_with_budget(&block, &bindings, &mut budget)
        .await;

    if let Err(first_err) = first_attempt {
        tracing::warn!(
            schedule = %sched.name,
            error = %first_err,
            "scheduled job failed, retrying once (possible vShard migration)"
        );
        let retry_executor =
            StatementExecutor::new(state, identity, TenantId::new(sched.tenant_id), 0);
        retry_executor
            .execute_block_with_budget(&block, &bindings, &mut budget)
            .await?;
    }

    Ok(started.elapsed().as_millis() as u64)
}
/// Builds the identity under which scheduled jobs execute.
///
/// The identity carries the schedule owner's name and tenant, but is always
/// a superuser (`Role::Superuser`, `is_superuser = true`) authenticated via
/// `AuthMethod::Trust`, with access to all databases.
///
/// NOTE(review): the owner's own roles are not consulted here — confirm that
/// running every scheduled body with superuser rights is intended.
fn scheduler_identity(tenant_id: TenantId, owner: &str) -> AuthenticatedIdentity {
    let username = owner.to_owned();
    let roles = vec![Role::Superuser];

    AuthenticatedIdentity {
        tenant_id,
        username,
        roles,
        user_id: 0,
        is_superuser: true,
        auth_method: AuthMethod::Trust,
        default_database: None,
        accessible_databases: crate::control::security::identity::DatabaseSet::All,
    }
}
#[cfg(test)]
mod tests {
    use super::super::types::MissedPolicy;
    use super::*;

    /// Builds an enabled, overlap-allowed schedule for tenant 1 that matches
    /// every minute, with the given name, optional target collection and scope.
    fn make_schedule(name: &str, target: Option<&str>, scope: ScheduleScope) -> ScheduleDef {
        let target_collection = target.map(str::to_owned);
        ScheduleDef {
            name: name.to_owned(),
            scope,
            target_collection,
            tenant_id: 1,
            owner: "admin".into(),
            cron_expr: "* * * * *".into(),
            body_sql: "SELECT 1".into(),
            missed_policy: MissedPolicy::Skip,
            allow_overlap: true,
            enabled: true,
            created_at: 0,
        }
    }

    /// The scheduler identity is a superuser named after the schedule owner.
    #[test]
    fn scheduler_identity_is_superuser() {
        let identity = scheduler_identity(TenantId::new(1), "admin");
        assert!(identity.is_superuser);
        assert_eq!(identity.username, "admin");
    }

    /// `Local` scope bypasses leader routing entirely.
    #[tokio::test]
    async fn local_scope_always_fires() {
        let job = make_schedule("local_job", Some("orders"), ScheduleScope::Local);
        let tmp = tempfile::tempdir().unwrap();
        let (_, _, state, _, _) = crate::event::test_utils::event_test_deps(&tmp);
        assert!(should_fire_on_this_node(&job, &state));
    }

    /// Without cluster routing the node fires every schedule itself.
    #[tokio::test]
    async fn single_node_always_fires() {
        let job = make_schedule("normal_job", Some("orders"), ScheduleScope::Normal);
        let tmp = tempfile::tempdir().unwrap();
        let (_, _, state, _, _) = crate::event::test_utils::event_test_deps(&tmp);
        assert!(state.cluster_routing.is_none());
        assert!(should_fire_on_this_node(&job, &state));
    }

    /// `Local` scope is always considered healthy.
    #[tokio::test]
    async fn local_scope_healthy_without_raft() {
        let job = make_schedule("local_job", None, ScheduleScope::Local);
        let tmp = tempfile::tempdir().unwrap();
        let (_, _, state, _, _) = crate::event::test_utils::event_test_deps(&tmp);
        assert!(is_raft_group_healthy(&job, &state));
    }

    /// A non-local schedule is healthy under the test fixture's state.
    #[tokio::test]
    async fn single_node_healthy_without_raft() {
        let job = make_schedule("job", Some("orders"), ScheduleScope::Normal);
        let tmp = tempfile::tempdir().unwrap();
        let (_, _, state, _, _) = crate::event::test_utils::event_test_deps(&tmp);
        assert!(is_raft_group_healthy(&job, &state));
    }
}