car-scheduler 0.7.0

Task scheduling and background execution for Common Agent Runtime
Documentation
//! Task executor — runs tasks using the AgentRunner callback.
//!
//! Handles trigger types: once, manual, interval, file watch, and cron
//! (cron is currently run as a plain interval; no cron expression parsing).
//! Each execution creates a TaskExecution record.

use crate::task::{parse_interval, Task, TaskExecution, TaskStatus, TaskTrigger};
use car_multi::{AgentRunner, AgentSpec, Mailbox, SharedInfra};
use chrono::Utc;
use std::sync::Arc;
use tokio::sync::watch;
use tracing::{info, warn};

/// Runs tasks through the AgentRunner.
///
/// Each execution asks `infra` for a runtime and hands it, together with an
/// agent spec derived from the task, to `runner`.
pub struct Executor {
    /// Agent callback invoked once per task execution.
    runner: Arc<dyn AgentRunner>,
    /// Infrastructure from which a runtime is obtained (`make_runtime`) on every execution.
    infra: SharedInfra,
}

impl Executor {
    /// Creates an executor that drives tasks through `runner`, backed by a
    /// freshly constructed [`SharedInfra`].
    pub fn new(runner: Arc<dyn AgentRunner>) -> Self {
        Self::with_shared_infra(runner, SharedInfra::new())
    }

    /// Creates an executor that drives tasks through `runner` using the
    /// caller-supplied `infra`.
    pub fn with_shared_infra(runner: Arc<dyn AgentRunner>, infra: SharedInfra) -> Self {
        Self { runner, infra }
    }

    /// Execute a task once and return the execution record.
    ///
    /// The task is marked `Running` for the duration of the call. Afterwards
    /// its `last_run_at`, `run_count`, `status` and `executions` are updated
    /// from the new record, a copy of which is returned.
    ///
    /// A runner error yields a `Failed` record with an empty answer and the
    /// error's display text; it is never propagated to the caller.
    pub async fn run_once(&self, task: &mut Task) -> TaskExecution {
        // Short id: the first 10 characters of a freshly generated v4 UUID string.
        let execution_id = uuid::Uuid::new_v4().to_string()[..10].to_string();
        let started_at = Utc::now();

        task.status = TaskStatus::Running;

        let spec = AgentSpec {
            name: task.name.clone(),
            system_prompt: task.system_prompt.clone(),
            tools: Vec::new(),
            max_turns: task.max_turns,
            metadata: task.agent_metadata.clone(),
            cache_control: false,
        };

        let rt = self.infra.make_runtime();
        let mailbox = Mailbox::default();
        let start = std::time::Instant::now();

        let outcome = self.runner.run(&spec, &task.prompt, &rt, &mailbox).await;
        let duration_ms = start.elapsed().as_secs_f64() * 1000.0;

        let (status, answer, error) = match outcome {
            Ok(output) => {
                info!(
                    task_id = %task.id,
                    task_name = %task.name,
                    duration_ms = duration_ms,
                    "task completed"
                );
                // NOTE(review): a successful call whose output carries an
                // `error` is still recorded as Completed — confirm intended.
                (TaskStatus::Completed, output.answer, output.error)
            }
            Err(e) => {
                warn!(
                    task_id = %task.id,
                    task_name = %task.name,
                    error = %e,
                    "task failed"
                );
                (TaskStatus::Failed, String::new(), Some(e.to_string()))
            }
        };

        let execution = TaskExecution {
            execution_id,
            started_at,
            finished_at: Some(Utc::now()),
            status,
            answer,
            error,
            duration_ms: Some(duration_ms),
        };

        task.last_run_at = Some(execution.started_at);
        task.run_count += 1;
        task.status = execution.status;
        task.executions.push(execution.clone());

        execution
    }

    /// Run a task according to its trigger and return every execution record
    /// produced.
    ///
    /// - Once/Manual: runs exactly once; `max_iterations` and `cancel` are
    ///   not consulted.
    /// - Interval: runs, then waits for the parsed schedule interval, until
    ///   cancelled, disabled, or `max_iterations` runs have happened.
    /// - FileWatch: polls the watched file for changes and runs on change.
    /// - Cron: treated as interval (full cron parsing is out of scope).
    ///
    /// Cancellation is observed only while waiting between runs; a run that
    /// is already in progress is allowed to finish.
    pub async fn run_loop(
        &self,
        task: &mut Task,
        max_iterations: Option<u32>,
        cancel: watch::Receiver<bool>,
    ) -> Vec<TaskExecution> {
        match task.trigger {
            TaskTrigger::Once | TaskTrigger::Manual => {
                vec![self.run_once(task).await]
            }
            TaskTrigger::Interval | TaskTrigger::Cron => {
                self.run_interval(task, max_iterations, cancel).await
            }
            TaskTrigger::FileWatch => {
                self.run_file_watch(task, max_iterations, cancel).await
            }
        }
    }

    /// Sleeps for `delay`, returning `true` early if cancellation is signalled.
    ///
    /// Returns `false` once the full delay has elapsed. A change that leaves
    /// the flag `false` does not shorten the wait. If the sending half has
    /// been dropped, `changed()` would resolve immediately with an error on
    /// every poll, so the channel is no longer polled and the remaining
    /// delay is simply slept out.
    async fn wait_or_cancel(
        cancel: &mut watch::Receiver<bool>,
        delay: tokio::time::Duration,
    ) -> bool {
        let sleep = tokio::time::sleep(delay);
        tokio::pin!(sleep);
        let mut sender_alive = true;

        loop {
            tokio::select! {
                _ = &mut sleep => return false,
                changed = cancel.changed(), if sender_alive => {
                    if *cancel.borrow() {
                        return true;
                    }
                    if changed.is_err() {
                        // Sender gone: nothing can cancel us any more.
                        sender_alive = false;
                    }
                }
            }
        }
    }

    /// Repeatedly runs `task`, waiting the parsed schedule interval between
    /// runs.
    ///
    /// Stops when the task is disabled, `max_iterations` runs have completed,
    /// or cancellation is signalled during a wait. On exit the task is left
    /// `Scheduled` if still enabled, otherwise `Completed`.
    async fn run_interval(
        &self,
        task: &mut Task,
        max_iterations: Option<u32>,
        mut cancel: watch::Receiver<bool>,
    ) -> Vec<TaskExecution> {
        let interval_secs = parse_interval(&task.schedule);
        // `Duration::from_secs_f64` panics on negative, NaN, infinite or
        // overflowing input; clamp instead of taking the whole task down.
        let interval = match tokio::time::Duration::try_from_secs_f64(interval_secs) {
            Ok(parsed) => parsed,
            Err(_) => {
                warn!(
                    task_id = %task.id,
                    interval_secs = interval_secs,
                    "invalid interval; clamping to a representable duration"
                );
                if interval_secs > 0.0 {
                    tokio::time::Duration::MAX
                } else {
                    tokio::time::Duration::ZERO
                }
            }
        };
        let mut executions = Vec::new();
        let mut iterations: u32 = 0;

        task.status = TaskStatus::Scheduled;

        while task.enabled && max_iterations.map_or(true, |max| iterations < max) {
            executions.push(self.run_once(task).await);
            iterations = iterations.saturating_add(1);

            // Skip the trailing wait once the iteration budget is spent.
            if max_iterations.map_or(false, |max| iterations >= max) {
                break;
            }

            if Self::wait_or_cancel(&mut cancel, interval).await {
                info!(task_id = %task.id, "task cancelled");
                break;
            }
        }

        task.status = if task.enabled {
            TaskStatus::Scheduled
        } else {
            TaskStatus::Completed
        };

        executions
    }

    /// Polls `task.watch_path` every two seconds and runs the task whenever
    /// the file's content hash changes (including the file appearing or
    /// becoming unreadable).
    ///
    /// Stops when the task is disabled, `max_iterations` runs have completed,
    /// or cancellation is signalled during a wait. The task is always left
    /// `Completed` on exit.
    async fn run_file_watch(
        &self,
        task: &mut Task,
        max_iterations: Option<u32>,
        mut cancel: watch::Receiver<bool>,
    ) -> Vec<TaskExecution> {
        let poll_interval = tokio::time::Duration::from_secs(2);
        let mut executions = Vec::new();
        let mut iterations: u32 = 0;
        // Baseline taken before the first wait; only later changes trigger a run.
        let mut last_hash = file_hash(&task.watch_path);

        task.status = TaskStatus::Scheduled;

        while task.enabled && max_iterations.map_or(true, |max| iterations < max) {
            if Self::wait_or_cancel(&mut cancel, poll_interval).await {
                info!(task_id = %task.id, "file watch cancelled");
                break;
            }

            let current_hash = file_hash(&task.watch_path);
            if current_hash != last_hash {
                last_hash = current_hash;
                executions.push(self.run_once(task).await);
                iterations = iterations.saturating_add(1);
            }
        }

        task.status = TaskStatus::Completed;
        executions
    }
}

/// Content fingerprint used for change detection.
///
/// Reads the whole file at `path` and hashes its bytes with the standard
/// library's `DefaultHasher`. Returns `None` when the file cannot be read.
fn file_hash(path: &str) -> Option<u64> {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    match std::fs::read(path) {
        Ok(contents) => {
            let mut state = DefaultHasher::new();
            contents.hash(&mut state);
            Some(state.finish())
        }
        Err(_) => None,
    }
}

/// Handle for a background task. Send `true` to cancel.
pub struct TaskHandle {
    /// Id of the task that was spawned.
    pub task_id: String,
    /// Sending half of the cancellation channel; the initial value is `false`.
    pub cancel_tx: watch::Sender<bool>,
    /// Join handle for the spawned run loop; resolves to the execution records it produced.
    pub join: tokio::task::JoinHandle<Vec<TaskExecution>>,
}

impl TaskHandle {
    /// Requests cancellation by publishing `true` on the cancel channel.
    ///
    /// A send failure is deliberately ignored.
    pub fn cancel(&self) {
        self.cancel_tx.send(true).ok();
    }
}

/// Runs `task` on a background tokio task using a fresh [`Executor`].
///
/// The returned handle carries the task id, the cancellation sender and the
/// join handle that yields the collected execution records. The `Task` value
/// itself is moved into the background task and is not handed back.
pub fn spawn_task(
    mut task: Task,
    runner: Arc<dyn AgentRunner>,
    max_iterations: Option<u32>,
) -> TaskHandle {
    let task_id = task.id.clone();
    let (cancel_tx, cancel_rx) = watch::channel(false);

    let worker = async move {
        let exec = Executor::new(runner);
        exec.run_loop(&mut task, max_iterations, cancel_rx).await
    };

    TaskHandle {
        task_id,
        cancel_tx,
        join: tokio::spawn(worker),
    }
}

/// Runs `task` on a background tokio task using an [`Executor`] built on the
/// supplied shared infrastructure.
///
/// The returned handle carries the task id, the cancellation sender and the
/// join handle that yields the collected execution records. The `Task` value
/// itself is moved into the background task and is not handed back.
pub fn spawn_task_shared(
    mut task: Task,
    runner: Arc<dyn AgentRunner>,
    infra: SharedInfra,
    max_iterations: Option<u32>,
) -> TaskHandle {
    let task_id = task.id.clone();
    let (cancel_tx, cancel_rx) = watch::channel(false);

    let worker = async move {
        let exec = Executor::with_shared_infra(runner, infra);
        exec.run_loop(&mut task, max_iterations, cancel_rx).await
    };

    TaskHandle {
        task_id,
        cancel_tx,
        join: tokio::spawn(worker),
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use car_multi::{AgentOutput, AgentRunner, Mailbox, MultiError};
    use car_engine::Runtime;

    /// Runner stub that always succeeds and echoes a short prefix of the prompt.
    struct MockRunner;

    #[async_trait::async_trait]
    impl AgentRunner for MockRunner {
        async fn run(
            &self,
            spec: &AgentSpec,
            task: &str,
            _runtime: &Runtime,
            _mailbox: &Mailbox,
        ) -> Result<AgentOutput, MultiError> {
            // Truncate by characters rather than bytes: slicing at byte 50
            // panics when that offset falls inside a multi-byte character.
            let preview: String = task.chars().take(50).collect();
            Ok(AgentOutput {
                name: spec.name.clone(),
                answer: format!("completed: {}", preview),
                turns: 1,
                tool_calls: 0,
                duration_ms: 10.0,
                error: None,
                outcome: None,
                tokens: None,
            })
        }
    }

    /// A single run records a completed execution and updates the task's counters.
    #[tokio::test]
    async fn test_run_once() {
        let runner: Arc<dyn AgentRunner> = Arc::new(MockRunner);
        let executor = Executor::new(runner);
        let mut task = Task::new("test", "do something");

        let execution = executor.run_once(&mut task).await;

        assert_eq!(execution.status, TaskStatus::Completed);
        assert!(execution.answer.contains("completed"));
        assert_eq!(task.run_count, 1);
        assert!(task.last_run_at.is_some());
    }

    /// An interval task stops after exactly `max_iterations` runs.
    #[tokio::test]
    async fn test_run_interval_with_max() {
        let runner: Arc<dyn AgentRunner> = Arc::new(MockRunner);
        let executor = Executor::new(runner);
        let mut task = Task::new("interval_test", "repeat this")
            .with_trigger(TaskTrigger::Interval, "0");  // 0 second interval for test speed

        // Keep the sender alive for the duration of the loop.
        let (_cancel_tx, cancel_rx) = watch::channel(false);
        let executions = executor.run_loop(&mut task, Some(3), cancel_rx).await;

        assert_eq!(executions.len(), 3);
        assert_eq!(task.run_count, 3);
    }

    /// A spawned, unbounded interval task ends once its handle is cancelled.
    #[tokio::test]
    async fn test_spawn_and_cancel() {
        let runner: Arc<dyn AgentRunner> = Arc::new(MockRunner);
        let task = Task::new("bg_task", "background work")
            .with_trigger(TaskTrigger::Interval, "0");

        let handle = spawn_task(task, runner, None);

        // Let it run a couple iterations
        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;

        // Cancel it
        handle.cancel();
        let executions = handle.join.await.unwrap();

        assert!(!executions.is_empty());
    }

    /// Modifying the watched file triggers exactly one run when capped at one iteration.
    #[tokio::test]
    async fn test_file_watch() {
        let dir = tempfile::TempDir::new().unwrap();
        let watch_file = dir.path().join("watched.txt");
        std::fs::write(&watch_file, "v1").unwrap();

        let runner: Arc<dyn AgentRunner> = Arc::new(MockRunner);
        let executor = Executor::new(runner);
        let mut task = Task::new("watcher", "process file change")
            .with_file_watch(watch_file.to_str().unwrap());

        let (cancel_tx, cancel_rx) = watch::channel(false);

        // Spawn file watch in background
        let watch_path = watch_file.clone();
        let cancel_tx_clone = cancel_tx.clone();
        tokio::spawn(async move {
            // Change the file after the watcher has taken its baseline hash.
            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
            std::fs::write(&watch_path, "v2").unwrap();
            // Safety net: cancel if the change was never picked up.
            tokio::time::sleep(tokio::time::Duration::from_millis(3000)).await;
            let _ = cancel_tx_clone.send(true);
        });

        let executions = executor.run_loop(&mut task, Some(1), cancel_rx).await;

        assert_eq!(executions.len(), 1);
        assert!(executions[0].answer.contains("completed"));
    }
}