car-server-core 0.33.0

Transport-neutral library for the CAR daemon JSON-RPC dispatcher (used by car-server and tokhn-daemon)
//! Daemon-native command scheduler (#72).
//!
//! A boot-spawned poller that fires **deterministic command tasks** on their
//! interval cadence while `car-server` runs — the in-daemon alternative to the
//! OS scheduler for consumers who'd rather car-server own the timer when it's up
//! anyway. Only fires:
//! - **command** tasks (a [`car_scheduler::CommandSpec`], not an LLM prompt),
//! - with an **interval** trigger (cron cadences go to the OS scheduler; there's
//!   no in-daemon cron evaluator),
//! - that are **not** OS-installed (so a durable task isn't double-fired).
//!
//! Mirrors the evolution cadence: a single non-overlapping task, dies with the
//! runtime. Runtime-added tasks are picked up on the next poll (the store is the
//! source of truth), so no per-task re-arming is needed.

use std::collections::HashSet;
use std::time::Duration;

use car_scheduler::os_schedule::LABEL_PREFIX;
use car_scheduler::{parse_interval, run_command, TaskStore, TaskTrigger};

/// How often the poller wakes to check for due command tasks. 30s bounds the
/// worst-case firing lateness; the interval cadence itself can be any value.
const POLL_SECS: u64 = 30;

/// Retain at most this many execution records per task so a frequently-firing
/// command can't grow its task file unboundedly.
const MAX_EXECUTIONS: usize = 50;

/// Spawn the daemon-native command scheduler. Idempotent-safe to call once at
/// boot; the task lives for the process lifetime.
pub fn spawn_command_scheduler() {
    tokio::spawn(async move {
        let mut ticker = tokio::time::interval(Duration::from_secs(POLL_SECS));
        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        loop {
            ticker.tick().await;
            if let Err(e) = poll_once().await {
                tracing::warn!(target: "car::scheduler", "[command-scheduler] {e}");
            }
        }
    });
}

/// One poll pass: fire every due, enabled, daemon-native interval command task.
async fn poll_once() -> Result<(), String> {
    let store = TaskStore::new(&TaskStore::default_path());
    // A transient store-read failure must not look like "no tasks"; surface it.
    let tasks = store
        .try_list()
        .map_err(|e| format!("could not read task store: {e}"))?;
    // If we can't read which tasks the OS scheduler owns, skip this tick rather
    // than risk double-firing an OS-installed task (fail-safe, not fail-open).
    let os_installed: HashSet<String> = car_scheduler::list_installed()
        .map_err(|e| format!("could not read OS schedules (skipping tick): {e}"))?
        .into_iter()
        .collect();
    let now = chrono::Utc::now();

    for mut task in tasks {
        if !task.enabled || !task.is_command() || task.trigger != TaskTrigger::Interval {
            continue;
        }
        // Durable (OS-installed) tasks are fired by launchd/cron — don't double-fire.
        let label = format!("{LABEL_PREFIX}{}", task.id);
        if os_installed.contains(&label) {
            continue;
        }
        let interval_secs = parse_interval(&task.schedule);
        if interval_secs <= 0.0 {
            continue;
        }
        let due = match task.last_run_at {
            None => true, // never run → fire now
            Some(last) => (now - last).num_seconds() as f64 >= interval_secs,
        };
        if !due {
            continue;
        }
        let Some(cmd) = task.command.clone() else {
            continue;
        };
        let exec = run_command(&cmd).await;
        tracing::info!(
            target: "car::scheduler",
            task = %task.id, status = ?exec.status,
            "[command-scheduler] fired command task"
        );
        task.last_run_at = Some(now);
        task.run_count = task.run_count.saturating_add(1);
        task.status = exec.status;
        task.executions.push(exec);
        if task.executions.len() > MAX_EXECUTIONS {
            let drop = task.executions.len() - MAX_EXECUTIONS;
            task.executions.drain(0..drop);
        }
        if let Err(e) = store.save(&task) {
            tracing::warn!(target: "car::scheduler", "[command-scheduler] save {}: {e}", task.id);
        }
    }
    Ok(())
}