use crate::task::{parse_interval, Task, TaskExecution, TaskStatus, TaskTrigger};
use car_multi::{AgentRunner, AgentSpec, Mailbox, SharedInfra};
use chrono::Utc;
use std::sync::Arc;
use tokio::sync::watch;
use tracing::{info, warn};
/// Runs [`Task`]s by handing their prompt to an [`AgentRunner`].
///
/// Build one with [`Executor::new`] or [`Executor::with_shared_infra`].
pub struct Executor {
/// Agent backend invoked once per task execution.
runner: Arc<dyn AgentRunner>,
/// Infrastructure handle; `make_runtime` is called on it for every execution.
infra: SharedInfra,
}
impl Executor {
pub fn new(runner: Arc<dyn AgentRunner>) -> Self {
Self {
runner,
infra: SharedInfra::new(),
}
}
pub fn with_shared_infra(runner: Arc<dyn AgentRunner>, infra: SharedInfra) -> Self {
Self { runner, infra }
}
pub async fn run_once(&self, task: &mut Task) -> TaskExecution {
let execution_id = uuid::Uuid::new_v4().to_string()[..10].to_string();
let started_at = Utc::now();
task.status = TaskStatus::Running;
let spec = AgentSpec {
name: task.name.clone(),
system_prompt: task.system_prompt.clone(),
tools: Vec::new(),
max_turns: task.max_turns,
metadata: task.agent_metadata.clone(),
cache_control: false,
};
let rt = self.infra.make_runtime();
let mailbox = Mailbox::default();
let start = std::time::Instant::now();
let execution = match self.runner.run(&spec, &task.prompt, &rt, &mailbox).await {
Ok(output) => {
let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
info!(
task_id = %task.id,
task_name = %task.name,
duration_ms = duration_ms,
"task completed"
);
TaskExecution {
execution_id,
started_at,
finished_at: Some(Utc::now()),
status: TaskStatus::Completed,
answer: output.answer,
error: output.error,
duration_ms: Some(duration_ms),
}
}
Err(e) => {
let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
warn!(
task_id = %task.id,
task_name = %task.name,
error = %e,
"task failed"
);
TaskExecution {
execution_id,
started_at,
finished_at: Some(Utc::now()),
status: TaskStatus::Failed,
answer: String::new(),
error: Some(e.to_string()),
duration_ms: Some(duration_ms),
}
}
};
task.last_run_at = Some(execution.started_at);
task.run_count += 1;
task.status = execution.status;
task.executions.push(execution.clone());
execution
}
pub async fn run_loop(
&self,
task: &mut Task,
max_iterations: Option<u32>,
cancel: watch::Receiver<bool>,
) -> Vec<TaskExecution> {
match task.trigger {
TaskTrigger::Once | TaskTrigger::Manual => {
vec![self.run_once(task).await]
}
TaskTrigger::Interval | TaskTrigger::Cron => {
self.run_interval(task, max_iterations, cancel).await
}
TaskTrigger::FileWatch => {
self.run_file_watch(task, max_iterations, cancel).await
}
}
}
async fn run_interval(
&self,
task: &mut Task,
max_iterations: Option<u32>,
mut cancel: watch::Receiver<bool>,
) -> Vec<TaskExecution> {
let interval_secs = parse_interval(&task.schedule);
let interval = tokio::time::Duration::from_secs_f64(interval_secs);
let mut executions = Vec::new();
let mut iterations: u32 = 0;
task.status = TaskStatus::Scheduled;
loop {
if !task.enabled {
break;
}
if let Some(max) = max_iterations {
if iterations >= max {
break;
}
}
executions.push(self.run_once(task).await);
iterations += 1;
if let Some(max) = max_iterations {
if iterations >= max {
break;
}
}
tokio::select! {
_ = tokio::time::sleep(interval) => {}
_ = cancel.changed() => {
if *cancel.borrow() {
info!(task_id = %task.id, "task cancelled");
break;
}
}
}
}
task.status = if task.enabled {
TaskStatus::Scheduled
} else {
TaskStatus::Completed
};
executions
}
async fn run_file_watch(
&self,
task: &mut Task,
max_iterations: Option<u32>,
mut cancel: watch::Receiver<bool>,
) -> Vec<TaskExecution> {
let poll_interval = tokio::time::Duration::from_secs(2);
let mut executions = Vec::new();
let mut iterations: u32 = 0;
let mut last_hash = file_hash(&task.watch_path);
task.status = TaskStatus::Scheduled;
loop {
if !task.enabled {
break;
}
if let Some(max) = max_iterations {
if iterations >= max {
break;
}
}
tokio::select! {
_ = tokio::time::sleep(poll_interval) => {}
_ = cancel.changed() => {
if *cancel.borrow() {
info!(task_id = %task.id, "file watch cancelled");
break;
}
}
}
let current_hash = file_hash(&task.watch_path);
if current_hash != last_hash {
last_hash = current_hash;
executions.push(self.run_once(task).await);
iterations += 1;
}
}
task.status = TaskStatus::Completed;
executions
}
}
/// Returns a 64-bit digest of the bytes stored at `path`, or `None` when the
/// file cannot be read.
///
/// The digest comes from the standard library's `DefaultHasher` and is only
/// meant for detecting changes between successive reads.
fn file_hash(path: &str) -> Option<u64> {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    std::fs::read(path).ok().map(|bytes| {
        let mut state = DefaultHasher::new();
        bytes.hash(&mut state);
        state.finish()
    })
}
/// Handle to a background task started by [`spawn_task`] or
/// [`spawn_task_shared`].
pub struct TaskHandle {
/// Copy of the spawned task's `id`.
pub task_id: String,
/// Sender side of the cancellation flag; the interval and file-watch loops
/// stop when they observe `true` on it.
pub cancel_tx: watch::Sender<bool>,
/// Join handle of the spawned tokio task; it resolves to the executions
/// collected by the run loop.
pub join: tokio::task::JoinHandle<Vec<TaskExecution>>,
}
impl TaskHandle {
    /// Requests cancellation by publishing `true` on the cancel channel.
    ///
    /// A send failure (every receiver already dropped) is deliberately ignored.
    pub fn cancel(&self) {
        self.cancel_tx.send(true).ok();
    }
}
/// Spawns `task` onto the current Tokio runtime and returns a handle to it.
///
/// The background future builds an [`Executor`] via [`Executor::new`] and
/// drives the task with `run_loop`, honouring `max_iterations`. The returned
/// [`TaskHandle`] carries the task id, the cancel sender and the join handle.
///
/// # Panics
///
/// Panics if called outside a Tokio runtime (the behaviour of `tokio::spawn`).
pub fn spawn_task(
    mut task: Task,
    runner: Arc<dyn AgentRunner>,
    max_iterations: Option<u32>,
) -> TaskHandle {
    let task_id = task.id.clone();
    // The flag starts as `false`; `TaskHandle::cancel` flips it to `true`.
    let (cancel_tx, cancel_rx) = watch::channel(false);

    let worker = async move {
        let executor = Executor::new(runner);
        executor.run_loop(&mut task, max_iterations, cancel_rx).await
    };

    TaskHandle {
        task_id,
        cancel_tx,
        join: tokio::spawn(worker),
    }
}
/// Spawns `task` onto the current Tokio runtime using a caller-supplied
/// [`SharedInfra`], and returns a handle to it.
///
/// Identical to [`spawn_task`] except that the background [`Executor`] is
/// built with [`Executor::with_shared_infra`] from the given `infra`.
///
/// # Panics
///
/// Panics if called outside a Tokio runtime (the behaviour of `tokio::spawn`).
pub fn spawn_task_shared(
    mut task: Task,
    runner: Arc<dyn AgentRunner>,
    infra: SharedInfra,
    max_iterations: Option<u32>,
) -> TaskHandle {
    let task_id = task.id.clone();
    // The flag starts as `false`; `TaskHandle::cancel` flips it to `true`.
    let (cancel_tx, cancel_rx) = watch::channel(false);

    let worker = async move {
        let executor = Executor::with_shared_infra(runner, infra);
        executor.run_loop(&mut task, max_iterations, cancel_rx).await
    };

    TaskHandle {
        task_id,
        cancel_tx,
        join: tokio::spawn(worker),
    }
}
// Unit tests for the executor, driven by a mock runner that never fails.
#[cfg(test)]
mod tests {
use super::*;
use car_multi::{AgentOutput, AgentRunner, Mailbox, MultiError};
use car_engine::Runtime;
// Runner stub: always succeeds and echoes a prefix of the prompt back.
struct MockRunner;
#[async_trait::async_trait]
impl AgentRunner for MockRunner {
async fn run(
&self,
spec: &AgentSpec,
task: &str,
_runtime: &Runtime,
_mailbox: &Mailbox,
) -> Result<AgentOutput, MultiError> {
Ok(AgentOutput {
name: spec.name.clone(),
// At most the first 50 bytes of the prompt. This is a byte slice, so it
// would panic on a multi-byte character straddling byte 50; the prompts
// used in these tests are short ASCII.
answer: format!("completed: {}", &task[..task.len().min(50)]),
turns: 1,
tool_calls: 0,
duration_ms: 10.0,
error: None,
outcome: None,
tokens: None,
})
}
}
// A single run completes and updates the task's bookkeeping fields.
#[tokio::test]
async fn test_run_once() {
let runner: Arc<dyn AgentRunner> = Arc::new(MockRunner);
let executor = Executor::new(runner);
let mut task = Task::new("test", "do something");
let execution = executor.run_once(&mut task).await;
assert_eq!(execution.status, TaskStatus::Completed);
assert!(execution.answer.contains("completed"));
assert_eq!(task.run_count, 1);
assert!(task.last_run_at.is_some());
}
// The interval loop stops after exactly `max_iterations` executions.
#[tokio::test]
async fn test_run_interval_with_max() {
let runner: Arc<dyn AgentRunner> = Arc::new(MockRunner);
let executor = Executor::new(runner);
// Schedule "0": presumably parsed as a zero-second interval — TODO confirm
// against `parse_interval`.
let mut task = Task::new("interval_test", "repeat this")
.with_trigger(TaskTrigger::Interval, "0");
// Keep the sender alive for the whole test; it is never used to cancel.
let (_cancel_tx, cancel_rx) = watch::channel(false);
let executions = executor.run_loop(&mut task, Some(3), cancel_rx).await;
assert_eq!(executions.len(), 3);
assert_eq!(task.run_count, 3);
}
// An uncapped background task stops once its handle is cancelled.
#[tokio::test]
async fn test_spawn_and_cancel() {
let runner: Arc<dyn AgentRunner> = Arc::new(MockRunner);
let task = Task::new("bg_task", "background work")
.with_trigger(TaskTrigger::Interval, "0");
let handle = spawn_task(task, runner, None);
// Give the spawned loop time to run at least once before cancelling.
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
handle.cancel();
let executions = handle.join.await.unwrap();
assert!(!executions.is_empty());
}
// Changing the watched file's content triggers exactly one execution.
#[tokio::test]
async fn test_file_watch() {
let dir = tempfile::TempDir::new().unwrap();
let watch_file = dir.path().join("watched.txt");
std::fs::write(&watch_file, "v1").unwrap();
let runner: Arc<dyn AgentRunner> = Arc::new(MockRunner);
let executor = Executor::new(runner);
let mut task = Task::new("watcher", "process file change")
.with_file_watch(watch_file.to_str().unwrap());
let (cancel_tx, cancel_rx) = watch::channel(false);
let watch_path = watch_file.clone();
let cancel_tx_clone = cancel_tx.clone();
// Background helper: rewrite the file shortly after the watch loop has taken
// its baseline hash, then send a cancel 3 s later as a safety net so the
// test cannot hang if the change is missed.
tokio::spawn(async move {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
std::fs::write(&watch_path, "v2").unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(3000)).await;
let _ = cancel_tx_clone.send(true);
});
// The watch loop polls every 2 s, so the rewrite is seen on the first poll
// and the cap of one iteration ends the loop.
let executions = executor.run_loop(&mut task, Some(1), cancel_rx).await;
assert_eq!(executions.len(), 1);
assert!(executions[0].answer.contains("completed"));
}
}