nodedb 0.3.0-beta.1

Local-first, real-time, edge-to-cloud hybrid database for multi-modal workloads
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
// SPDX-License-Identifier: BUSL-1.1

//! Scheduler executor: Tokio task that evaluates cron expressions every second.
//!
//! For each due schedule, dispatches the SQL body through the Control Plane
//! query path using a system identity (SECURITY DEFINER).
//!
//! **Leader-aware (cluster mode):** Before firing, the scheduler checks
//! if this node is the Raft leader for the schedule's target vShard.
//! Only the leader fires — follower nodes skip. When a vShard migrates
//! or leadership changes, the new leader automatically picks up the schedule.
//!
//! **Lease-aware:** If this node is leader but the Raft group is lagging
//! (commit_index > last_applied), the scheduler skips firing to prevent
//! stale execution during a network partition.

use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use tokio::sync::watch;
use tracing::{debug, info, trace, warn};

use crate::control::planner::procedural::executor::bindings::RowBindings;
use crate::control::planner::procedural::executor::core::StatementExecutor;
use crate::control::security::identity::{AuthMethod, AuthenticatedIdentity, Role};
use crate::control::state::SharedState;
use crate::types::TenantId;

use super::cron::CronExpr;
use super::dispatcher::{DispatchOutcome, JobDispatcher, JobDispatcherConfig};
use super::history::JobHistoryStore;
use super::registry::ScheduleRegistry;
use super::types::{JobRun, ScheduleDef, ScheduleScope};

/// Compute the set of cron-matching minute numbers that still need to
/// fire for one schedule given its last-fired minute and the current
/// observation.
///
/// The caller is the per-second tick loop. The loop cannot assume its
/// observed `now_secs` lands on a minute boundary — Tokio scheduling
/// latency, GC pauses, and leader handoffs can all push the observation
/// a second or more past `xx:00`. Gating on `now_secs % 60 == 0` drops
/// the entire minute on any such jitter; in the worst case a daily
/// schedule misses its single matching minute and doesn't fire for 24
/// hours.
///
/// Contract:
/// - `last_fired_minute = None` (never fired, or loop restart): consider
///   only the current minute. This bounds catch-up — a freshly started
///   node does not replay ticks from epoch.
/// - `last_fired_minute = Some(L)`: return every minute in
///   `(L ..= now_secs / 60]` that the cron expression matches.
/// - Within-minute re-observations (e.g. second 61 when minute 1 has
///   already fired) return empty.
pub fn pending_minute_ticks(
    last_fired_minute: Option<u64>,
    now_secs: u64,
    cron: &CronExpr,
    utc_offset_seconds: i32,
) -> Vec<u64> {
    let now_min = now_secs / 60;
    let start = match last_fired_minute {
        Some(last) => last.saturating_add(1),
        None => now_min,
    };
    (start..=now_min)
        .filter(|m| cron.matches_epoch_with_offset(m.saturating_mul(60), utc_offset_seconds))
        .collect()
}

/// Spawn the scheduler loop as a background Tokio task.
pub fn spawn_scheduler(
    state: Arc<SharedState>,
    registry: Arc<ScheduleRegistry>,
    history: Arc<JobHistoryStore>,
    shutdown: watch::Receiver<bool>,
) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        scheduler_loop(state, registry, history, shutdown).await;
    })
}

/// The main scheduler loop. Runs every second.
async fn scheduler_loop(
    state: Arc<SharedState>,
    registry: Arc<ScheduleRegistry>,
    history: Arc<JobHistoryStore>,
    mut shutdown: watch::Receiver<bool>,
) {
    info!("scheduler started");

    // Track currently running jobs (for ALLOW_OVERLAP = false enforcement).
    // Shared with spawned job tasks so they remove themselves on completion.
    let running: Arc<std::sync::Mutex<HashSet<(u64, String)>>> =
        Arc::new(std::sync::Mutex::new(HashSet::new()));

    // Bounded job dispatcher — caps concurrency (from SchedulerTuning)
    // and owns the shutdown bus every job observes. The result-byte
    // ceiling is advisory for callers that have a planner estimate;
    // the scheduler enforces memory by routing each job through the
    // statement executor's `ExecutionBudget` wall-clock timeout below.
    let sched_tuning = &state.tuning.scheduler;
    let dispatcher = Arc::new(JobDispatcher::new(JobDispatcherConfig {
        max_concurrent_jobs: sched_tuning.max_concurrent_jobs,
        max_result_bytes: u64::MAX,
    }));
    let job_timeout_secs = sched_tuning.job_timeout_secs;

    // Per-schedule last-fired minute. Tracked in memory only: on loop
    // restart every schedule's last_fired is None, and
    // `pending_minute_ticks` returns at most the current minute. That
    // bounded catch-up is what prevents a cold start from replaying
    // ticks from epoch.
    let mut last_fired_minute: HashMap<(u64, String), u64> = HashMap::new();

    loop {
        tokio::select! {
            _ = tokio::time::sleep(Duration::from_secs(1)) => {}
            _ = shutdown.changed() => {
                if *shutdown.borrow() {
                    debug!("scheduler shutting down");
                    dispatcher.shutdown_and_drain().await;
                    return;
                }
            }
        }

        if *shutdown.borrow() {
            dispatcher.shutdown_and_drain().await;
            return;
        }

        let now_secs = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();

        // Get all enabled schedules.
        let schedules = registry.list_all_enabled();
        if schedules.is_empty() {
            continue;
        }

        for sched in &schedules {
            // Parse cron expression.
            let cron = match CronExpr::parse(&sched.cron_expr) {
                Ok(c) => c,
                Err(e) => {
                    warn!(
                        schedule = %sched.name,
                        error = %e,
                        "invalid cron expression, skipping"
                    );
                    continue;
                }
            };

            // Per-schedule minute tracking: fire every matching minute
            // between the last-fired and the current observation. This
            // tolerates tick jitter past a minute boundary; the old code
            // gated on `now_secs % 60 == 0` and silently dropped any
            // minute whose tick landed at second != 0.
            let sched_key = (sched.tenant_id, sched.name.clone());
            let last = last_fired_minute.get(&sched_key).copied();
            let tz_offset = state.scheduler_config.cron_timezone.offset_seconds();
            let pending = pending_minute_ticks(last, now_secs, &cron, tz_offset);
            if pending.is_empty() {
                // Still advance the marker on first observation so that
                // a schedule whose cron never matches the current minute
                // doesn't fall into unbounded catch-up on the next tick.
                if last.is_none() {
                    last_fired_minute.insert(sched_key.clone(), now_secs / 60);
                }
                continue;
            }
            // Record the highest minute we're about to fire so subsequent
            // ticks within the same minute don't refire it.
            if let Some(&max_min) = pending.iter().max() {
                last_fired_minute.insert(sched_key.clone(), max_min);
            }

            // Leader-aware: skip if this node is not the right one for this schedule.
            if !should_fire_on_this_node(sched, &state) {
                trace!(
                    schedule = %sched.name,
                    "skipping: this node is not the leader for target vShard"
                );
                continue;
            }

            // Lease-aware: skip if the Raft group is lagging (stale leader).
            if !is_raft_group_healthy(sched, &state) {
                debug!(
                    schedule = %sched.name,
                    "skipping: Raft group lagging (lease-aware suspension)"
                );
                continue;
            }

            // Check overlap policy.
            let key = (sched.tenant_id, sched.name.clone());
            if !sched.allow_overlap {
                let guard = running.lock().unwrap_or_else(|p| p.into_inner());
                if guard.contains(&key) {
                    trace!(
                        schedule = %sched.name,
                        "skipping: previous run still active (ALLOW_OVERLAP = false)"
                    );
                    continue;
                }
            }

            // Mark as running before spawning (prevents race with next scheduler tick).
            {
                let mut guard = running.lock().unwrap_or_else(|p| p.into_inner());
                guard.insert(key.clone());
            }

            debug!(schedule = %sched.name, "firing scheduled job");

            let state_clone = Arc::clone(&state);
            let history_clone = Arc::clone(&history);
            let running_clone = Arc::clone(&running);
            let sched_clone = sched.clone();

            // Dispatch through the bounded dispatcher. Rejection at the
            // concurrency cap surfaces as a recorded failure — the job
            // simply didn't run this tick, rather than starving the
            // Tokio runtime with unbounded parallel SQL.
            let outcome = dispatcher.try_spawn(move |mut shutdown_rx| async move {
                let result = tokio::select! {
                    r = execute_job(&state_clone, &sched_clone, job_timeout_secs) => r,
                    _ = shutdown_rx.changed() => {
                        return Err(crate::Error::Dispatch { detail: "scheduler shutdown".to_string() });
                    }
                };
                let now_ms = SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_millis() as u64;

                let run = match result {
                    Ok(duration_ms) => JobRun {
                        schedule_name: sched_clone.name.clone(),
                        tenant_id: sched_clone.tenant_id,
                        started_at: now_ms.saturating_sub(duration_ms),
                        duration_ms,
                        success: true,
                        error: None,
                    },
                    Err(e) => {
                        warn!(
                            schedule = %sched_clone.name,
                            error = %e,
                            "scheduled job failed"
                        );
                        JobRun {
                            schedule_name: sched_clone.name.clone(),
                            tenant_id: sched_clone.tenant_id,
                            started_at: now_ms,
                            duration_ms: 0,
                            success: false,
                            error: Some(e.to_string()),
                        }
                    }
                };

                if let Err(e) = history_clone.record(run) {
                    warn!(error = %e, "failed to record job history");
                }

                let key = (sched_clone.tenant_id, sched_clone.name.clone());
                let mut guard = running_clone.lock().unwrap_or_else(|p| p.into_inner());
                guard.remove(&key);
                Ok(())
            });

            if outcome == DispatchOutcome::OverBudget {
                // Release the running-set slot we reserved — the job
                // didn't actually start.
                let mut guard = running.lock().unwrap_or_else(|p| p.into_inner());
                guard.remove(&key);
                warn!(
                    schedule = %sched.name,
                    max_concurrent = sched_tuning.max_concurrent_jobs,
                    "scheduled job rejected: concurrency cap reached"
                );
            }
        }
    }
}

/// Check whether this node should fire the given schedule.
///
/// - `ScheduleScope::Local` → always fire (local node only).
/// - No `cluster_routing` → single-node mode → always fire.
/// - `target_collection` is Some → resolve vShard → check leader.
/// - `target_collection` is None → cross-collection job → only the lowest
///   node_id in the cluster fires (acts as `_system` coordinator).
fn should_fire_on_this_node(sched: &ScheduleDef, state: &SharedState) -> bool {
    // LOCAL scope: always fire on this node.
    if sched.scope == ScheduleScope::Local {
        return true;
    }

    // Single-node mode: no cluster routing → fire everything.
    let Some(ref routing_lock) = state.cluster_routing else {
        return true;
    };

    let node_id = state.node_id;

    if let Some(ref collection) = sched.target_collection {
        // Collection-targeted schedule: fire only on the shard leader.
        // Schedules are scoped to `DatabaseId::DEFAULT` today; when
        // ScheduleDef gains a database_id field, plumb it through here.
        let vshard_id = nodedb_cluster::routing::vshard_for_collection(
            nodedb_types::id::DatabaseId::DEFAULT,
            collection,
        );
        let routing = routing_lock.read().unwrap_or_else(|p| p.into_inner());
        match routing.leader_for_vshard(vshard_id) {
            Ok(leader) => leader == node_id,
            Err(_) => {
                // Can't determine leader — skip to be safe.
                false
            }
        }
    } else {
        // Cross-collection or opaque job: fire on coordinator node.
        // Convention: the leader of vShard 0 acts as the _system coordinator.
        let routing = routing_lock.read().unwrap_or_else(|p| p.into_inner());
        match routing.leader_for_vshard(0) {
            Ok(coordinator) => coordinator == node_id,
            Err(_) => false,
        }
    }
}

/// Check if the Raft group for this schedule's target vShard is healthy.
///
/// A group is healthy when `commit_index == last_applied` (fully caught up).
/// If the group is lagging, this node may be a stale leader during a partition.
/// Skipping prevents dual execution when the new leader hasn't taken over yet.
///
/// Returns `true` (healthy) in single-node mode or when no status function exists.
fn is_raft_group_healthy(sched: &ScheduleDef, state: &SharedState) -> bool {
    // LOCAL scope: no Raft group to check.
    if sched.scope == ScheduleScope::Local {
        return true;
    }

    // Single-node mode: always healthy.
    let Some(ref status_fn) = state.raft_status_fn else {
        return true;
    };
    let Some(ref routing_lock) = state.cluster_routing else {
        return true;
    };

    // Determine the target vShard's Raft group.
    let vshard_id = sched
        .target_collection
        .as_ref()
        .map(|c| {
            // Schedules are scoped to `DatabaseId::DEFAULT` today; when
            // ScheduleDef gains a database_id field, plumb it through here.
            nodedb_cluster::routing::vshard_for_collection(nodedb_types::id::DatabaseId::DEFAULT, c)
        })
        .unwrap_or(0); // Cross-collection → coordinator vShard 0.

    let group_id = {
        let routing = routing_lock.read().unwrap_or_else(|p| p.into_inner());
        match routing.group_for_vshard(vshard_id) {
            Ok(gid) => gid,
            Err(_) => return true, // Can't determine group — fire anyway.
        }
    };

    // Check the group's status.
    let statuses = status_fn();
    let Some(status) = statuses.iter().find(|s| s.group_id == group_id) else {
        return true; // Group not found locally — might be on another node, fire anyway.
    };

    // Stale leader check: if commit_index is ahead of last_applied,
    // this node has uncommitted entries — it may be partitioned.
    if status.commit_index > status.last_applied {
        let lag = status.commit_index - status.last_applied;
        debug!(
            group_id,
            commit_index = status.commit_index,
            last_applied = status.last_applied,
            lag,
            "Raft group lagging — suspending schedule fire"
        );
        return false;
    }

    // Also check that we're actually the leader for this group.
    if status.leader_id != state.node_id {
        return false;
    }

    true
}

/// Execute a single scheduled job under a wall-clock `ExecutionBudget`.
///
/// `job_timeout_secs` bounds total statement-executor time for the job,
/// so a runaway body cannot hold `Arc<SharedState>` past the shutdown
/// deadline. Returns the duration in milliseconds on success.
async fn execute_job(
    state: &SharedState,
    sched: &ScheduleDef,
    job_timeout_secs: u64,
) -> crate::Result<u64> {
    use crate::control::planner::procedural::executor::fuel::ExecutionBudget;

    let start = std::time::Instant::now();
    let identity = scheduler_identity(TenantId::new(sched.tenant_id), &sched.owner);

    // Pre-execution guard: reject unbounded `SELECT *` bodies before we
    // dispatch a single task, so the runtime byte ceiling isn't the only
    // thing standing between a careless schedule and hours of wasted
    // scanning.
    if let Err(msg) = super::body_guard::validate_scheduled_body(&sched.body_sql) {
        return Err(crate::Error::BadRequest {
            detail: format!("schedule '{}': {msg}", sched.name),
        });
    }

    let block = crate::control::planner::procedural::parse_block(&sched.body_sql).map_err(|e| {
        crate::Error::BadRequest {
            detail: format!("schedule '{}' body parse error: {e}", sched.name),
        }
    })?;

    let executor =
        StatementExecutor::new(state, identity.clone(), TenantId::new(sched.tenant_id), 0);
    let bindings = RowBindings::empty();
    // One budget for the whole job — retries consume the same wall-clock
    // and fuel pool as the first attempt so a runaway job can't double
    // its timeout by failing once.
    let mut budget = ExecutionBudget::new(100_000, job_timeout_secs);

    match executor
        .execute_block_with_budget(&block, &bindings, &mut budget)
        .await
    {
        Ok(()) => {}
        Err(first_err) => {
            tracing::warn!(
                schedule = %sched.name,
                error = %first_err,
                "scheduled job failed, retrying once (possible vShard migration)"
            );
            let retry_executor =
                StatementExecutor::new(state, identity, TenantId::new(sched.tenant_id), 0);
            retry_executor
                .execute_block_with_budget(&block, &bindings, &mut budget)
                .await?;
        }
    }

    Ok(start.elapsed().as_millis() as u64)
}

/// Build the owner's identity for scheduled job execution (SECURITY DEFINER).
///
/// SECURITY DEFINER semantics: scheduled jobs always run as superuser
/// under the creator's username. The username drives audit attribution;
/// privilege is fixed at superuser regardless of the creator's current
/// role membership.
fn scheduler_identity(tenant_id: TenantId, owner: &str) -> AuthenticatedIdentity {
    AuthenticatedIdentity {
        user_id: 0,
        username: owner.to_string(),
        tenant_id,
        auth_method: AuthMethod::Trust,
        roles: vec![Role::Superuser],
        is_superuser: true,
        default_database: None,
        accessible_databases: crate::control::security::identity::DatabaseSet::All,
    }
}

#[cfg(test)]
mod tests {
    use super::super::types::MissedPolicy;
    use super::*;

    fn make_schedule(name: &str, target: Option<&str>, scope: ScheduleScope) -> ScheduleDef {
        ScheduleDef {
            tenant_id: 1,
            name: name.into(),
            cron_expr: "* * * * *".into(),
            body_sql: "SELECT 1".into(),
            scope,
            missed_policy: MissedPolicy::Skip,
            allow_overlap: true,
            enabled: true,
            target_collection: target.map(|s| s.to_string()),
            owner: "admin".into(),
            created_at: 0,
        }
    }

    #[test]
    fn scheduler_identity_is_superuser() {
        let id = scheduler_identity(TenantId::new(1), "admin");
        assert!(id.is_superuser);
        assert_eq!(id.username, "admin");
    }

    #[tokio::test]
    async fn local_scope_always_fires() {
        let sched = make_schedule("local_job", Some("orders"), ScheduleScope::Local);
        // Even if we had cluster routing, LOCAL always fires.
        let dir = tempfile::tempdir().unwrap();
        let (_, _, state, _, _) = crate::event::test_utils::event_test_deps(&dir);
        assert!(should_fire_on_this_node(&sched, &state));
    }

    #[tokio::test]
    async fn single_node_always_fires() {
        let sched = make_schedule("normal_job", Some("orders"), ScheduleScope::Normal);
        let dir = tempfile::tempdir().unwrap();
        let (_, _, state, _, _) = crate::event::test_utils::event_test_deps(&dir);
        // No cluster_routing → single-node → always fires.
        assert!(state.cluster_routing.is_none());
        assert!(should_fire_on_this_node(&sched, &state));
    }

    #[tokio::test]
    async fn local_scope_healthy_without_raft() {
        let sched = make_schedule("local_job", None, ScheduleScope::Local);
        let dir = tempfile::tempdir().unwrap();
        let (_, _, state, _, _) = crate::event::test_utils::event_test_deps(&dir);
        assert!(is_raft_group_healthy(&sched, &state));
    }

    #[tokio::test]
    async fn single_node_healthy_without_raft() {
        let sched = make_schedule("job", Some("orders"), ScheduleScope::Normal);
        let dir = tempfile::tempdir().unwrap();
        let (_, _, state, _, _) = crate::event::test_utils::event_test_deps(&dir);
        // No raft_status_fn → always healthy.
        assert!(is_raft_group_healthy(&sched, &state));
    }
}