car-scheduler 0.33.0

Task scheduling and background execution for Common Agent Runtime
//! Deterministic command execution for scheduler command tasks (#72).
//!
//! A [`crate::task::CommandSpec`] fires by running `program` with `args` — no
//! LLM/agent runner involved — capturing exit status + output into a
//! [`TaskExecution`]. This is the in-daemon deterministic path: the daemon-native
//! scheduler timer calls this directly while `car-server` is running.

use crate::task::{CommandSpec, TaskExecution, TaskStatus};
use chrono::Utc;

/// Cap on captured stdout/stderr per run so a chatty command can't bloat the
/// task's execution history. Truncates on a char boundary.
const OUTPUT_CAP: usize = 16 * 1024;

/// Default hard timeout: a command that runs longer is killed. Bounds the poller
/// so one hung task can't wedge every other daemon-native task.
const DEFAULT_TIMEOUT_SECS: u64 = 300;

/// A minimal PATH so a cleared-environment command can still find standard
/// programs by name. The daemon's own environment (and any secrets it holds) is
/// NOT inherited — only this PATH plus the task's explicit `env`.
const SAFE_PATH: &str = "/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin";

fn cap(s: &str) -> String {
    if s.len() <= OUTPUT_CAP {
        return s.to_string();
    }
    let mut end = OUTPUT_CAP;
    while !s.is_char_boundary(end) {
        end -= 1;
    }
    format!("{}…[truncated]", &s[..end])
}

/// Run a command task's command to completion, returning a [`TaskExecution`].
/// Program + args are hygiene-checked (no empty/control chars — the same guard
/// the OS projector uses against crontab injection) before spawning.
pub async fn run_command(spec: &CommandSpec) -> TaskExecution {
    let started_at = Utc::now();
    let execution_id = format!("cmd-{}", &uuid::Uuid::new_v4().to_string()[..8]);

    // Hygiene: reject an empty program or control chars in program/args.
    if let Err(e) = crate::os_schedule::validate_command_value("program", &spec.program) {
        return TaskExecution {
            execution_id,
            started_at,
            finished_at: Some(Utc::now()),
            status: TaskStatus::Failed,
            answer: String::new(),
            error: Some(e.to_string()),
            duration_ms: Some(0.0),
        };
    }
    for a in &spec.args {
        if let Err(e) = crate::os_schedule::validate_command_value("arg", a) {
            return TaskExecution {
                execution_id,
                started_at,
                finished_at: Some(Utc::now()),
                status: TaskStatus::Failed,
                answer: String::new(),
                error: Some(e.to_string()),
                duration_ms: Some(0.0),
            };
        }
    }

    let mut cmd = tokio::process::Command::new(&spec.program);
    cmd.args(&spec.args);
    if let Some(wd) = &spec.working_dir {
        cmd.current_dir(wd);
    }
    // Clear the daemon's environment so its secrets/tokens never leak into a
    // scheduled command; provide only a standard PATH + the task's explicit env.
    cmd.env_clear();
    cmd.env("PATH", SAFE_PATH);
    for (k, v) in &spec.env {
        cmd.env(k, v);
    }
    // Ensure a killed/timed-out command's process is reaped, not orphaned.
    cmd.kill_on_drop(true);

    let timeout = std::time::Duration::from_secs(
        spec.timeout_secs.unwrap_or(DEFAULT_TIMEOUT_SECS).max(1),
    );
    let output = match tokio::time::timeout(timeout, cmd.output()).await {
        Ok(o) => o,
        Err(_) => {
            // Timed out — the child is killed on drop of the future's Command.
            return TaskExecution {
                execution_id,
                started_at,
                finished_at: Some(Utc::now()),
                status: TaskStatus::Failed,
                answer: String::new(),
                error: Some(format!("timed out after {}s", timeout.as_secs())),
                duration_ms: Some((Utc::now() - started_at).num_milliseconds() as f64),
            };
        }
    };
    let finished_at = Utc::now();
    let duration_ms = (finished_at - started_at).num_milliseconds() as f64;

    match output {
        Ok(o) => {
            let ok = o.status.success();
            let stdout = String::from_utf8_lossy(&o.stdout);
            let stderr = String::from_utf8_lossy(&o.stderr);
            let error = if ok {
                None
            } else {
                let code = o
                    .status
                    .code()
                    .map(|c| c.to_string())
                    .unwrap_or_else(|| "signal".to_string());
                Some(format!("exited {code}: {}", cap(stderr.trim())))
            };
            TaskExecution {
                execution_id,
                started_at,
                finished_at: Some(finished_at),
                status: if ok {
                    TaskStatus::Completed
                } else {
                    TaskStatus::Failed
                },
                answer: cap(&stdout),
                error,
                duration_ms: Some(duration_ms),
            }
        }
        Err(e) => TaskExecution {
            execution_id,
            started_at,
            finished_at: Some(finished_at),
            status: TaskStatus::Failed,
            answer: String::new(),
            error: Some(format!("could not run '{}': {e}", spec.program)),
            duration_ms: Some(duration_ms),
        },
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::BTreeMap;

    fn spec(program: &str, args: &[&str]) -> CommandSpec {
        CommandSpec {
            program: program.to_string(),
            args: args.iter().map(|s| s.to_string()).collect(),
            working_dir: None,
            env: BTreeMap::new(),
            timeout_secs: None,
        }
    }

    #[tokio::test]
    async fn runs_a_command_and_captures_stdout() {
        let e = run_command(&spec("/bin/echo", &["hello"])).await;
        assert_eq!(e.status, TaskStatus::Completed);
        assert_eq!(e.answer.trim(), "hello");
        assert!(e.error.is_none());
    }

    #[tokio::test]
    async fn nonzero_exit_is_failed_with_error() {
        let e = run_command(&spec("/bin/sh", &["-c", "exit 3"])).await;
        assert_eq!(e.status, TaskStatus::Failed);
        assert!(e.error.as_deref().unwrap().contains("exited 3"));
    }

    #[tokio::test]
    async fn missing_program_is_failed_not_panic() {
        let e = run_command(&spec("/nonexistent/car-xyz", &[])).await;
        assert_eq!(e.status, TaskStatus::Failed);
        assert!(e.error.as_deref().unwrap().contains("could not run"));
    }

    #[tokio::test]
    async fn control_chars_rejected() {
        let e = run_command(&spec("/bin/echo", &["a\nb"])).await;
        assert_eq!(e.status, TaskStatus::Failed);
    }

    #[tokio::test]
    async fn hung_command_times_out_not_wedges() {
        let mut s = spec("/bin/sh", &["-c", "sleep 30"]);
        s.timeout_secs = Some(1);
        let e = run_command(&s).await;
        assert_eq!(e.status, TaskStatus::Failed);
        assert!(e.error.as_deref().unwrap().contains("timed out"));
    }

    #[tokio::test]
    async fn env_is_cleared_daemon_secrets_dont_leak() {
        std::env::set_var("CAR_SECRET_SENTINEL", "leaked");
        let e = run_command(&spec("/bin/sh", &["-c", "echo [${CAR_SECRET_SENTINEL:-clean}]"])).await;
        assert_eq!(e.status, TaskStatus::Completed);
        assert_eq!(e.answer.trim(), "[clean]", "daemon env must not leak");
    }
}