use crate::task::{parse_interval, Task, TaskExecution, TaskStatus, TaskTrigger};
use car_multi::{AgentRunner, AgentSpec, Mailbox, SharedInfra};
use chrono::{DateTime, Utc};
use std::sync::Arc;
use tokio::sync::watch;
use tracing::{info, warn};
pub struct Executor {
runner: Arc<dyn AgentRunner>,
infra: SharedInfra,
}
impl Executor {
pub fn new(runner: Arc<dyn AgentRunner>) -> Self {
Self {
runner,
infra: SharedInfra::new(),
}
}
pub fn with_shared_infra(runner: Arc<dyn AgentRunner>, infra: SharedInfra) -> Self {
Self { runner, infra }
}
pub async fn run_once(&self, task: &mut Task) -> TaskExecution {
self.run_occurrence(task, None).await
}
async fn run_occurrence(&self, task: &mut Task, file_hash: Option<u64>) -> TaskExecution {
let started_at = Utc::now();
let occurrence = occurrence_id(task, file_hash, started_at);
if let Some(ref id) = occurrence {
if let Some(existing) = task.executions.iter().find(|e| {
&e.execution_id == id
&& matches!(e.status, TaskStatus::Completed | TaskStatus::Running)
}) {
info!(
task_id = %task.id,
occurrence = %id,
"occurrence already serviced; skipping"
);
return existing.clone();
}
}
let execution_id =
occurrence.unwrap_or_else(|| uuid::Uuid::new_v4().to_string()[..10].to_string());
task.status = TaskStatus::Running;
let spec = AgentSpec {
name: task.name.clone(),
system_prompt: task.system_prompt.clone(),
tools: Vec::new(),
max_turns: task.max_turns,
metadata: task.agent_metadata.clone(),
cache_control: false,
};
let rt = self.infra.make_runtime();
let mailbox = Mailbox::default();
let start = std::time::Instant::now();
let execution = match self.runner.run(&spec, &task.prompt, &rt, &mailbox).await {
Ok(output) => {
let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
info!(
task_id = %task.id,
task_name = %task.name,
duration_ms = duration_ms,
"task completed"
);
TaskExecution {
execution_id,
started_at,
finished_at: Some(Utc::now()),
status: TaskStatus::Completed,
answer: output.answer,
error: output.error,
duration_ms: Some(duration_ms),
}
}
Err(e) => {
let duration_ms = start.elapsed().as_secs_f64() * 1000.0;
warn!(
task_id = %task.id,
task_name = %task.name,
error = %e,
"task failed"
);
TaskExecution {
execution_id,
started_at,
finished_at: Some(Utc::now()),
status: TaskStatus::Failed,
answer: String::new(),
error: Some(e.to_string()),
duration_ms: Some(duration_ms),
}
}
};
task.last_run_at = Some(execution.started_at);
task.run_count += 1;
task.status = execution.status;
task.executions.push(execution.clone());
execution
}
pub async fn run_loop(
&self,
task: &mut Task,
max_iterations: Option<u32>,
cancel: watch::Receiver<bool>,
) -> Vec<TaskExecution> {
match task.trigger {
TaskTrigger::Once | TaskTrigger::Manual => {
vec![self.run_once(task).await]
}
TaskTrigger::Interval | TaskTrigger::Cron => {
self.run_interval(task, max_iterations, cancel).await
}
TaskTrigger::FileWatch => self.run_file_watch(task, max_iterations, cancel).await,
}
}
async fn run_interval(
&self,
task: &mut Task,
max_iterations: Option<u32>,
mut cancel: watch::Receiver<bool>,
) -> Vec<TaskExecution> {
let interval_secs = parse_interval(&task.schedule);
let interval = tokio::time::Duration::from_secs_f64(interval_secs);
let mut executions: Vec<TaskExecution> = Vec::new();
let mut iterations: u32 = 0;
task.status = TaskStatus::Scheduled;
loop {
if !task.enabled {
break;
}
if let Some(max) = max_iterations {
if iterations >= max {
break;
}
}
let exec = self.run_once(task).await;
if !executions.iter().any(|e| e.execution_id == exec.execution_id) {
executions.push(exec);
}
iterations += 1;
if let Some(max) = max_iterations {
if iterations >= max {
break;
}
}
tokio::select! {
_ = tokio::time::sleep(interval) => {}
_ = cancel.changed() => {
if *cancel.borrow() {
info!(task_id = %task.id, "task cancelled");
break;
}
}
}
}
task.status = if task.enabled {
TaskStatus::Scheduled
} else {
TaskStatus::Completed
};
executions
}
async fn run_file_watch(
&self,
task: &mut Task,
max_iterations: Option<u32>,
mut cancel: watch::Receiver<bool>,
) -> Vec<TaskExecution> {
let poll_interval = tokio::time::Duration::from_secs(2);
let mut executions: Vec<TaskExecution> = Vec::new();
let mut iterations: u32 = 0;
let mut last_hash = file_hash(&task.watch_path);
task.status = TaskStatus::Scheduled;
loop {
if !task.enabled {
break;
}
if let Some(max) = max_iterations {
if iterations >= max {
break;
}
}
tokio::select! {
_ = tokio::time::sleep(poll_interval) => {}
_ = cancel.changed() => {
if *cancel.borrow() {
info!(task_id = %task.id, "file watch cancelled");
break;
}
}
}
let current_hash = file_hash(&task.watch_path);
if current_hash != last_hash {
last_hash = current_hash;
let exec = self.run_occurrence(task, current_hash).await;
if !executions.iter().any(|e| e.execution_id == exec.execution_id) {
executions.push(exec);
}
iterations += 1;
}
}
task.status = TaskStatus::Completed;
executions
}
}
fn trigger_tag(trigger: TaskTrigger) -> &'static str {
match trigger {
TaskTrigger::Once => "once",
TaskTrigger::Cron => "cron",
TaskTrigger::Interval => "int",
TaskTrigger::FileWatch => "file",
TaskTrigger::Manual => "manual",
}
}
fn occurrence_id(task: &Task, file_hash: Option<u64>, now: DateTime<Utc>) -> Option<String> {
let slot = match task.trigger {
TaskTrigger::Interval | TaskTrigger::Cron => {
let interval = parse_interval(&task.schedule).max(1.0);
let elapsed = (now - task.created_at).num_milliseconds().max(0) as f64 / 1000.0;
((elapsed / interval).floor() as i64).to_string()
}
TaskTrigger::FileWatch => format!("{:016x}", file_hash.unwrap_or(0)),
TaskTrigger::Once => "0".to_string(),
TaskTrigger::Manual => return None,
};
Some(format!("{}:{}:{}", task.id, trigger_tag(task.trigger), slot))
}
fn file_hash(path: &str) -> Option<u64> {
use std::hash::{Hash, Hasher};
let data = std::fs::read(path).ok()?;
let mut hasher = std::collections::hash_map::DefaultHasher::new();
data.hash(&mut hasher);
Some(hasher.finish())
}
pub struct TaskHandle {
pub task_id: String,
pub cancel_tx: watch::Sender<bool>,
pub join: tokio::task::JoinHandle<Vec<TaskExecution>>,
}
impl TaskHandle {
pub fn cancel(&self) {
let _ = self.cancel_tx.send(true);
}
}
pub fn spawn_task(
mut task: Task,
runner: Arc<dyn AgentRunner>,
max_iterations: Option<u32>,
) -> TaskHandle {
let (cancel_tx, cancel_rx) = watch::channel(false);
let task_id = task.id.clone();
let join = tokio::spawn(async move {
let executor = Executor::new(runner);
executor
.run_loop(&mut task, max_iterations, cancel_rx)
.await
});
TaskHandle {
task_id,
cancel_tx,
join,
}
}
pub fn spawn_task_shared(
mut task: Task,
runner: Arc<dyn AgentRunner>,
infra: SharedInfra,
max_iterations: Option<u32>,
) -> TaskHandle {
let (cancel_tx, cancel_rx) = watch::channel(false);
let task_id = task.id.clone();
let join = tokio::spawn(async move {
let executor = Executor::with_shared_infra(runner, infra);
executor
.run_loop(&mut task, max_iterations, cancel_rx)
.await
});
TaskHandle {
task_id,
cancel_tx,
join,
}
}
#[cfg(test)]
mod tests {
use super::*;
use car_engine::Runtime;
use car_multi::{AgentOutput, AgentRunner, Mailbox, MultiError};
struct MockRunner;
#[async_trait::async_trait]
impl AgentRunner for MockRunner {
async fn run(
&self,
spec: &AgentSpec,
task: &str,
_runtime: &Runtime,
_mailbox: &Mailbox,
) -> Result<AgentOutput, MultiError> {
Ok(AgentOutput {
name: spec.name.clone(),
answer: format!("completed: {}", &task[..task.len().min(50)]),
turns: 1,
tool_calls: 0,
duration_ms: 10.0,
error: None,
outcome: None,
tokens: None,
tools_used: Vec::new(),
})
}
}
struct FailRunner;
#[async_trait::async_trait]
impl AgentRunner for FailRunner {
async fn run(
&self,
_spec: &AgentSpec,
_task: &str,
_runtime: &Runtime,
_mailbox: &Mailbox,
) -> Result<AgentOutput, MultiError> {
Err(MultiError::AgentFailed(
"flaky".to_string(),
"boom".to_string(),
))
}
}
#[tokio::test]
async fn test_run_once() {
let runner: Arc<dyn AgentRunner> = Arc::new(MockRunner);
let executor = Executor::new(runner);
let mut task = Task::new("test", "do something");
let execution = executor.run_once(&mut task).await;
assert_eq!(execution.status, TaskStatus::Completed);
assert!(execution.answer.contains("completed"));
assert_eq!(task.run_count, 1);
assert!(task.last_run_at.is_some());
}
#[tokio::test]
async fn test_interval_same_slot_dedups() {
let runner: Arc<dyn AgentRunner> = Arc::new(MockRunner);
let executor = Executor::new(runner);
let mut task =
Task::new("interval_test", "repeat this").with_trigger(TaskTrigger::Interval, "1h");
let a = executor.run_once(&mut task).await;
let b = executor.run_once(&mut task).await;
assert_eq!(a.execution_id, b.execution_id);
assert_eq!(task.run_count, 1, "same slot dedups to one execution");
assert_eq!(task.executions.len(), 1);
}
#[test]
fn test_occurrence_id_interval_slot_stable_across_now() {
let mut task = Task::new("t", "p").with_trigger(TaskTrigger::Interval, "60");
let anchor = task.created_at;
let a = occurrence_id(&task, None, anchor + chrono::Duration::seconds(10)).unwrap();
let b = occurrence_id(&task, None, anchor + chrono::Duration::seconds(50)).unwrap();
let c = occurrence_id(&task, None, anchor + chrono::Duration::seconds(70)).unwrap();
assert_eq!(a, b, "same slot ⇒ same id regardless of when in the slot");
assert_ne!(a, c, "next slot ⇒ new id");
assert!(a.ends_with(":int:0"));
assert!(c.ends_with(":int:1"));
task.trigger = TaskTrigger::Cron;
assert!(occurrence_id(&task, None, anchor)
.unwrap()
.contains(":cron:"));
}
#[test]
fn test_occurrence_id_once_is_stable() {
let task = Task::new("t", "p").with_trigger(TaskTrigger::Once, "");
let a = occurrence_id(&task, None, task.created_at).unwrap();
let b = occurrence_id(&task, None, task.created_at + chrono::Duration::days(3)).unwrap();
assert_eq!(a, b, "Once is a single occurrence regardless of wall time");
}
#[test]
fn test_occurrence_id_manual_is_none() {
let task = Task::new("t", "p"); assert!(occurrence_id(&task, None, task.created_at).is_none());
}
#[test]
fn test_occurrence_id_filewatch_keys_on_hash() {
let task = Task::new("t", "p").with_file_watch("/tmp/x");
let now = task.created_at;
let h1 = occurrence_id(&task, Some(0xABCD), now).unwrap();
let h1_again = occurrence_id(&task, Some(0xABCD), now).unwrap();
let h2 = occurrence_id(&task, Some(0x1234), now).unwrap();
assert_eq!(h1, h1_again, "same change ⇒ same id");
assert_ne!(h1, h2, "different change ⇒ different id");
}
#[tokio::test]
async fn test_once_does_not_refire() {
let runner: Arc<dyn AgentRunner> = Arc::new(MockRunner);
let executor = Executor::new(runner);
let mut task = Task::new("once_task", "do it once").with_trigger(TaskTrigger::Once, "");
let first = executor.run_once(&mut task).await;
let second = executor.run_once(&mut task).await;
assert_eq!(first.execution_id, second.execution_id);
assert_eq!(task.run_count, 1, "the second call is deduped, not re-run");
assert_eq!(task.executions.len(), 1);
}
#[tokio::test]
async fn test_failed_occurrence_can_retry() {
let runner: Arc<dyn AgentRunner> = Arc::new(FailRunner);
let executor = Executor::new(runner);
let mut task = Task::new("flaky", "try it").with_trigger(TaskTrigger::Once, "");
executor.run_once(&mut task).await;
executor.run_once(&mut task).await;
assert_eq!(task.run_count, 2);
assert!(task
.executions
.iter()
.all(|e| e.status == TaskStatus::Failed));
}
#[tokio::test]
async fn test_manual_runs_are_not_deduped() {
let runner: Arc<dyn AgentRunner> = Arc::new(MockRunner);
let executor = Executor::new(runner);
let mut task = Task::new("manual", "again and again");
let a = executor.run_once(&mut task).await;
let b = executor.run_once(&mut task).await;
assert_ne!(a.execution_id, b.execution_id, "manual repeats stay distinct");
assert_eq!(task.run_count, 2);
}
#[tokio::test]
async fn test_spawn_and_cancel() {
let runner: Arc<dyn AgentRunner> = Arc::new(MockRunner);
let task = Task::new("bg_task", "background work").with_trigger(TaskTrigger::Interval, "0");
let handle = spawn_task(task, runner, None);
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
handle.cancel();
let executions = handle.join.await.unwrap();
assert!(!executions.is_empty());
}
#[tokio::test]
async fn test_file_watch() {
let dir = tempfile::TempDir::new().unwrap();
let watch_file = dir.path().join("watched.txt");
std::fs::write(&watch_file, "v1").unwrap();
let runner: Arc<dyn AgentRunner> = Arc::new(MockRunner);
let executor = Executor::new(runner);
let mut task = Task::new("watcher", "process file change")
.with_file_watch(watch_file.to_str().unwrap());
let (cancel_tx, cancel_rx) = watch::channel(false);
let watch_path = watch_file.clone();
let cancel_tx_clone = cancel_tx.clone();
tokio::spawn(async move {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
std::fs::write(&watch_path, "v2").unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(3000)).await;
let _ = cancel_tx_clone.send(true);
});
let executions = executor.run_loop(&mut task, Some(1), cancel_rx).await;
assert_eq!(executions.len(), 1);
assert!(executions[0].answer.contains("completed"));
}
}