use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
use serde_json::Value;
use tracing::{debug, info, warn};
use car_multi::{
AdversarialReview, AgentRunner, Fleet, MapReduce, Pipeline, SharedInfra, Supervisor, Swarm,
SwarmMode, Tournament, Vote,
};
use crate::error::WorkflowError;
use crate::result::*;
use crate::types::*;
/// What executing a single step yields: the step's output, its textual
/// answer, and the workflow-state entries ("deltas") to merge back into the
/// workflow state once the step has succeeded.
type StepOutcome = (StageOutput, String, HashMap<String, Value>);
/// Drives a [`Workflow`] stage by stage, following edges between stages.
///
/// The engine owns the agent runner and the shared infrastructure that are
/// handed to every multi-agent pattern it launches.
pub struct WorkflowEngine {
// Runner passed by reference to each pattern's `run` call.
runner: Arc<dyn AgentRunner>,
// Shared infrastructure passed to patterns; also used to create the runtime
// that executes proposal steps.
infra: SharedInfra,
}
impl WorkflowEngine {
pub fn new(runner: Arc<dyn AgentRunner>, infra: SharedInfra) -> Self {
Self { runner, infra }
}
/// Runs `workflow` from its start stage.
///
/// The result is a boxed future instead of an `async fn` so that
/// sub-workflow steps can call back into `run` recursively.
pub fn run<'a>(
    &'a self,
    workflow: &'a Workflow,
) -> futures::future::BoxFuture<'a, Result<WorkflowResult, WorkflowError>> {
    let pending = self.run_inner(workflow);
    Box::pin(pending)
}
async fn run_inner(&self, workflow: &Workflow) -> Result<WorkflowResult, WorkflowError> {
if workflow.stage(&workflow.start).is_none() {
return Err(WorkflowError::NoStartStage);
}
for stage in &workflow.stages {
if crate::verify::exceeds_nesting(&stage.step, crate::verify::MAX_STEP_NESTING_DEPTH) {
return Err(WorkflowError::StageFailed(
stage.id.clone(),
format!(
"loop/foreach/sub-workflow nesting exceeds the limit of {}",
crate::verify::MAX_STEP_NESTING_DEPTH
),
));
}
}
let mut wf_state = HashMap::new();
if let Some(goal) = &workflow.goal {
wf_state.insert("goal".to_string(), Value::String(goal.clone()));
}
let cursor = Cursor {
wf_state,
stage_results: Vec::new(),
completed_stage_ids: Vec::new(),
iterations: 0,
prior_duration_ms: 0.0,
current_id: workflow.start.clone(),
};
self.drive(workflow, new_run_id(), cursor, &HashMap::new())
.await
}
pub async fn run_cached(
&self,
workflow: &Workflow,
prior: &WorkflowResult,
) -> Result<WorkflowResult, WorkflowError> {
if workflow.stage(&workflow.start).is_none() {
return Err(WorkflowError::NoStartStage);
}
let cache: HashMap<String, StageResult> = prior
.stages
.iter()
.filter(|s| s.status == StageStatus::Succeeded)
.map(|s| (s.stage_id.clone(), s.clone()))
.collect();
let mut wf_state = prior.final_state.clone();
let stage_ids: Vec<String> = workflow.stages.iter().map(|s| s.id.clone()).collect();
wf_state.retain(|k, _| {
match k.strip_prefix("stage.") {
Some(rest) => {
let id = rest.split('.').next().unwrap_or("");
cache.contains_key(id) || !stage_ids.iter().any(|s| s == id)
}
None => true,
}
});
if let Some(goal) = &workflow.goal {
wf_state.insert("goal".to_string(), Value::String(goal.clone()));
} else {
wf_state.remove("goal");
}
let cursor = Cursor {
wf_state,
stage_results: Vec::new(),
completed_stage_ids: Vec::new(),
iterations: 0,
prior_duration_ms: 0.0,
current_id: workflow.start.clone(),
};
self.drive(workflow, new_run_id(), cursor, &cache).await
}
/// Resumes a workflow that paused at an approval gate, feeding it the
/// approver's `input`.
///
/// The response is recorded under the gate's `output_key` (as an object) and
/// under `<output_key>.<field>` for every supplied field; the gate is then
/// marked as a succeeded stage and execution continues along the first edge
/// whose conditions hold. If no edge applies, the workflow completes here.
///
/// # Errors
/// * `InvalidResume` when the paused stage is missing, is not an approval
///   gate, has a blank `output_key`, or uses the reserved key `goal`.
/// * Any error from validating `input` against the gate's fields.
/// * Whatever the stage loop returns after resumption.
pub async fn resume(
    &self,
    paused: PausedWorkflow,
    input: HashMap<String, Value>,
) -> Result<WorkflowResult, WorkflowError> {
    let PausedWorkflow {
        run_id,
        workflow,
        paused_stage_id,
        wf_state: mut state,
        stage_results: mut results,
        completed_stage_ids: mut completed,
        iterations,
        prior_duration_ms,
        ..
    } = paused;

    // Resolve the gate once, capturing both its step and its display name.
    let (gate, gate_name) = match workflow.stage(&paused_stage_id) {
        None => {
            return Err(WorkflowError::InvalidResume(format!(
                "paused stage '{paused_stage_id}' not found in workflow"
            )))
        }
        Some(stage) => match &stage.step {
            StageStep::Approval(ap) => (ap.clone(), stage.name.clone()),
            _ => {
                return Err(WorkflowError::InvalidResume(format!(
                    "stage '{paused_stage_id}' is not an approval gate"
                )))
            }
        },
    };

    let output_key = gate.output_key.clone();
    if output_key.trim().is_empty() {
        return Err(WorkflowError::InvalidResume(format!(
            "approval stage '{paused_stage_id}' has empty output_key"
        )));
    }
    if output_key == "goal" {
        return Err(WorkflowError::InvalidResume(
            "approval output_key 'goal' is reserved (it is the drift anchor)".into(),
        ));
    }

    // Keep the goal entry in step with the workflow definition.
    match &workflow.goal {
        Some(goal) => {
            state.insert("goal".to_string(), Value::String(goal.clone()));
        }
        None => {
            state.remove("goal");
        }
    }

    validate_approval_input(&gate.fields, &input)?;

    // Flattened per-field entries first, then the whole response object.
    for (field, value) in &input {
        state.insert(format!("{output_key}.{field}"), value.clone());
    }
    let response = Value::Object(input.into_iter().collect());
    state.insert(output_key.clone(), response.clone());

    let succeeded_key = format!("stage.{paused_stage_id}.succeeded");
    state.insert(succeeded_key, Value::Bool(true));
    let answer_key = format!("stage.{paused_stage_id}.answer");
    state.insert(answer_key, Value::String(response.to_string()));

    results.push(StageResult {
        stage_id: paused_stage_id.clone(),
        stage_name: gate_name,
        status: StageStatus::Succeeded,
        output: StageOutput::Approval { response },
        duration_ms: 0.0,
        error: None,
    });
    completed.push(paused_stage_id.clone());
    info!(stage = %paused_stage_id, run_id = %run_id, "approval resumed");

    if let Some(next_id) = next_stage(&workflow, &paused_stage_id, &state) {
        let cursor = Cursor {
            current_id: next_id,
            wf_state: state,
            stage_results: results,
            completed_stage_ids: completed,
            iterations,
            prior_duration_ms,
        };
        let no_cache = HashMap::new();
        return self.drive(&workflow, run_id, cursor, &no_cache).await;
    }

    info!(workflow = %workflow.name, "workflow completed (terminal approval gate)");
    Ok(completed_result(
        &workflow,
        results,
        state,
        prior_duration_ms,
    ))
}
/// Core stage loop shared by fresh runs, cached re-runs, and resumed runs.
///
/// Starting at `cur.current_id`, each pass of the loop handles one stage:
/// it is either replayed from `cache`, turned into a pause (approval gate),
/// or executed. After a success the first outgoing edge whose conditions hold
/// is followed; when none applies the workflow completes.
///
/// Returns `Ok` for completed, paused, and failed/compensated workflows alike
/// (the status field distinguishes them). `Err` is reserved for the iteration
/// limit, an unknown stage id, and an approval gate with a blank `output_key`.
async fn drive(
&self,
workflow: &Workflow,
run_id: String,
mut cur: Cursor,
cache: &HashMap<String, StageResult>,
) -> Result<WorkflowResult, WorkflowError> {
// Times only this call; earlier segments are carried in `cur.prior_duration_ms`.
let start = Instant::now();
loop {
// Every pass counts against the limit, including cached replays and pauses.
cur.iterations += 1;
if cur.iterations > workflow.max_iterations {
return Err(WorkflowError::CycleLimitReached(workflow.max_iterations));
}
let stage = workflow
.stage(&cur.current_id)
.ok_or_else(|| WorkflowError::StageNotFound(cur.current_id.clone()))?;
// Cached stage: record the prior result without executing anything.
if let Some(cached) = cache.get(&cur.current_id) {
debug!(stage = %cur.current_id, "replaying cached stage");
cur.wf_state
.insert(format!("stage.{}.succeeded", stage.id), Value::Bool(true));
cur.stage_results.push(cached.clone());
cur.completed_stage_ids.push(stage.id.clone());
match next_stage(workflow, &cur.current_id, &cur.wf_state) {
Some(next_id) => {
cur.current_id = next_id;
continue;
}
None => {
let duration =
cur.prior_duration_ms + start.elapsed().as_secs_f64() * 1000.0;
return Ok(completed_result(
workflow,
cur.stage_results,
cur.wf_state,
duration,
));
}
}
}
// Approval gate: never executed; snapshot the cursor and return a paused result.
if let StageStep::Approval(ap) = &stage.step {
if ap.output_key.trim().is_empty() {
return Err(WorkflowError::StageFailed(
stage.id.clone(),
"approval stage has empty output_key".into(),
));
}
info!(stage = %stage.id, run_id = %run_id, "workflow paused at approval gate");
let elapsed = cur.prior_duration_ms + start.elapsed().as_secs_f64() * 1000.0;
let now = chrono::Utc::now();
// The checkpoint carries everything `resume` destructures to continue the run.
let paused = PausedWorkflow {
run_id: run_id.clone(),
workflow: workflow.clone(),
paused_stage_id: stage.id.clone(),
prompt: ap.prompt.clone(),
fields: ap.fields.clone(),
output_key: ap.output_key.clone(),
wf_state: cur.wf_state.clone(),
stage_results: cur.stage_results.clone(),
completed_stage_ids: cur.completed_stage_ids.clone(),
iterations: cur.iterations,
prior_duration_ms: elapsed,
created_at: now,
};
return Ok(WorkflowResult {
workflow_id: workflow.id.clone(),
workflow_name: workflow.name.clone(),
status: WorkflowStatus::Paused,
stages: cur.stage_results,
compensations: vec![],
duration_ms: elapsed,
timestamp: now,
final_state: cur.wf_state,
paused: Some(paused),
});
}
debug!(stage = %stage.id, name = %stage.name, iteration = cur.iterations, "executing stage");
let stage_start = Instant::now();
// A stage-level timeout, when set, converts an overrun into a `Timeout` error.
let result = if let Some(timeout_ms) = stage.timeout_ms {
match tokio::time::timeout(
std::time::Duration::from_millis(timeout_ms),
self.execute_step(&stage.id, &stage.step, &cur.wf_state),
)
.await
{
Ok(r) => r,
Err(_) => Err(WorkflowError::Timeout(stage.id.clone(), timeout_ms)),
}
} else {
self.execute_step(&stage.id, &stage.step, &cur.wf_state).await
};
let stage_duration = stage_start.elapsed().as_secs_f64() * 1000.0;
match result {
Ok((output, answer, deltas)) => {
for (k, v) in deltas {
cur.wf_state.insert(k, v);
}
// Re-insert the goal after the deltas so a step's deltas cannot replace it.
if let Some(goal) = &workflow.goal {
cur.wf_state
.insert("goal".to_string(), Value::String(goal.clone()));
}
cur.wf_state
.insert(format!("stage.{}.succeeded", stage.id), Value::Bool(true));
cur.wf_state
.insert(format!("stage.{}.answer", stage.id), Value::String(answer));
cur.stage_results.push(StageResult {
stage_id: stage.id.clone(),
stage_name: stage.name.clone(),
status: StageStatus::Succeeded,
output,
duration_ms: stage_duration,
error: None,
});
cur.completed_stage_ids.push(stage.id.clone());
info!(stage = %stage.id, duration_ms = stage_duration, "stage succeeded");
}
Err(e) => {
// Failure ends the run: record it, compensate completed stages, and return.
let error_msg = e.to_string();
cur.wf_state
.insert(format!("stage.{}.succeeded", stage.id), Value::Bool(false));
cur.wf_state.insert(
format!("stage.{}.error", stage.id),
Value::String(error_msg.clone()),
);
cur.stage_results.push(StageResult {
stage_id: stage.id.clone(),
stage_name: stage.name.clone(),
status: StageStatus::Failed,
output: StageOutput::Empty,
duration_ms: stage_duration,
error: Some(error_msg.clone()),
});
warn!(stage = %stage.id, error = %error_msg, "stage failed, running compensation");
// The failed stage was never pushed to `completed_stage_ids`, so it is not compensated.
let compensations = self.compensate(workflow, &cur.completed_stage_ids).await;
let all_compensated = compensations
.iter()
.all(|c| c.status == StageStatus::Succeeded);
let any_compensated = !compensations.is_empty();
// `any_compensated` means at least one handler ran, not that one succeeded.
let status = if any_compensated && all_compensated {
WorkflowStatus::Compensated
} else if any_compensated {
WorkflowStatus::PartiallyCompensated
} else {
WorkflowStatus::Failed
};
return Ok(WorkflowResult {
workflow_id: workflow.id.clone(),
workflow_name: workflow.name.clone(),
status,
stages: cur.stage_results,
compensations,
duration_ms: cur.prior_duration_ms + start.elapsed().as_secs_f64() * 1000.0,
timestamp: chrono::Utc::now(),
final_state: cur.wf_state,
paused: None,
});
}
}
// Follow the first edge whose conditions hold; no such edge means the workflow is done.
match next_stage(workflow, &cur.current_id, &cur.wf_state) {
Some(next_id) => {
debug!(from = %cur.current_id, to = %next_id, "taking edge");
cur.current_id = next_id;
}
None => {
info!(
workflow = %workflow.name,
stages_executed = cur.stage_results.len(),
"workflow completed"
);
let duration = cur.prior_duration_ms + start.elapsed().as_secs_f64() * 1000.0;
return Ok(completed_result(
workflow,
cur.stage_results,
cur.wf_state,
duration,
));
}
}
}
}
/// Executes one step and returns its output, answer, and state deltas.
///
/// Boxed (rather than `async fn`) because loop and for-each steps call back
/// into this function for their bodies. Approval gates are rejected here: they
/// are only meaningful as top-level stages handled by the stage loop.
fn execute_step<'a>(
    &'a self,
    stage_id: &'a str,
    step: &'a StageStep,
    wf_state: &'a HashMap<String, Value>,
) -> futures::future::BoxFuture<'a, Result<StepOutcome, WorkflowError>> {
    Box::pin(async move {
        match step {
            StageStep::Pattern(pattern) => {
                let (output, answer) = self.execute_pattern(pattern, wf_state).await?;
                let mut deltas = HashMap::new();
                // Reviews additionally publish their verdict for edge conditions.
                if let StageOutput::Review {
                    passed,
                    blocker_count,
                    ..
                } = &output
                {
                    let passed_key = format!("stage.{stage_id}.review_passed");
                    let blockers_key = format!("stage.{stage_id}.review_blockers");
                    deltas.insert(passed_key, Value::Bool(*passed));
                    deltas.insert(blockers_key, Value::from(*blocker_count));
                }
                Ok((output, answer, deltas))
            }
            StageStep::Proposal(proposal) => {
                let (output, answer) = self.execute_proposal(proposal).await?;
                let mut deltas = HashMap::new();
                // Collect every action's state changes; later actions win on key clashes.
                if let StageOutput::Proposal { result } = &output {
                    for action in &result.results {
                        for (key, value) in &action.state_changes {
                            deltas.insert(key.clone(), value.clone());
                        }
                    }
                }
                Ok((output, answer, deltas))
            }
            StageStep::SubWorkflow(sub) => {
                let (output, answer) = self.execute_sub_workflow(sub).await?;
                let no_deltas = HashMap::new();
                Ok((output, answer, no_deltas))
            }
            StageStep::LoopUntil(looped) => {
                self.execute_loop_until(stage_id, looped, wf_state).await
            }
            StageStep::ForEach(each) => self.execute_for_each(stage_id, each, wf_state).await,
            StageStep::Approval(_) => {
                let reason =
                    "an approval gate cannot be executed as a step, loop/foreach body, or compensation";
                Err(WorkflowError::StageFailed("approval".into(), reason.into()))
            }
        }
    })
}
/// Repeats the loop body until the `until` conditions hold or
/// `max_iterations` passes have run.
///
/// Each pass sees a private copy of the workflow state updated with the
/// deltas of earlier passes plus `stage.<id>.answer` and
/// `stage.<id>.iteration`. The returned deltas are the union of everything
/// written during the loop (last write wins). An empty `until` list never
/// terminates the loop early. A failing body aborts the loop with its error.
///
/// # Errors
/// `StageFailed` when `max_iterations` is zero, or the body's own error.
async fn execute_loop_until(
    &self,
    stage_id: &str,
    ls: &LoopUntilStep,
    wf_state: &HashMap<String, Value>,
) -> Result<StepOutcome, WorkflowError> {
    if ls.max_iterations == 0 {
        return Err(WorkflowError::StageFailed(
            stage_id.to_string(),
            "loop_until requires max_iterations >= 1".into(),
        ));
    }

    // These keys do not change between passes, so build them once.
    let body_id = format!("{stage_id}.body");
    let answer_key = format!("stage.{stage_id}.answer");
    let iteration_key = format!("stage.{stage_id}.iteration");

    let mut scratch = wf_state.clone();
    let mut merged: HashMap<String, Value> = HashMap::new();
    let mut per_pass: Vec<Box<StageOutput>> = Vec::new();
    let mut final_answer = String::new();
    let mut until_met = false;
    let mut passes: u32 = 0;

    for _ in 0..ls.max_iterations {
        let (output, answer, deltas) = self
            .execute_step(&body_id, &ls.body, &scratch)
            .await?;
        passes += 1;

        for (key, value) in deltas {
            scratch.insert(key.clone(), value.clone());
            merged.insert(key, value);
        }

        let answer_value = Value::String(answer.clone());
        scratch.insert(answer_key.clone(), answer_value.clone());
        merged.insert(answer_key.clone(), answer_value);

        let pass_value = Value::from(passes);
        scratch.insert(iteration_key.clone(), pass_value.clone());
        merged.insert(iteration_key.clone(), pass_value);

        final_answer = answer;
        per_pass.push(Box::new(output));

        until_met = !ls.until.is_empty() && check_conditions(&ls.until, &scratch);
        if until_met {
            break;
        }
    }

    let output = StageOutput::Loop {
        iterations: passes,
        satisfied: until_met,
        iterations_output: per_pass,
    };
    Ok((output, final_answer, merged))
}
/// Runs the for-each body once per item of the array stored under
/// `fe.items_from`, with `{{item}}` / `{{index}}` templated into the body.
///
/// A missing or non-array value is treated as an empty list. Bodies run one
/// at a time when the concurrency limit is 1 (or 0), otherwise through an
/// order-preserving buffered stream. The first failing item (in item order)
/// fails the whole step. Per-item results are published under
/// `foreach.<id>.<i>.{item,answer,state.<key>}` plus `foreach.<id>.count`.
async fn execute_for_each(
    &self,
    stage_id: &str,
    fe: &ForEachStep,
    wf_state: &HashMap<String, Value>,
) -> Result<StepOutcome, WorkflowError> {
    let rendered: Vec<String> = match wf_state.get(&fe.items_from).and_then(Value::as_array) {
        Some(list) => list.iter().map(render_item).collect(),
        None => Vec::new(),
    };

    // Template every body up front so a bad template fails before anything runs.
    let mut bodies: Vec<StageStep> = Vec::with_capacity(rendered.len());
    for (position, text) in rendered.iter().enumerate() {
        let body = template_step(&fe.body, text, position)?;
        bodies.push(body);
    }

    let limit = fe.max_concurrent.max(1);
    let mut pending = Vec::with_capacity(bodies.len());
    for (position, body) in bodies.iter().enumerate() {
        pending.push(self.run_foreach_body(stage_id, position, body, wf_state));
    }

    let settled: Vec<Result<StepOutcome, WorkflowError>> = if limit > 1 {
        use futures::stream::StreamExt;
        futures::stream::iter(pending)
            .buffered(limit)
            .collect()
            .await
    } else {
        let mut in_order = Vec::with_capacity(pending.len());
        for job in pending {
            in_order.push(job.await);
        }
        in_order
    };

    let mut outputs: Vec<Box<StageOutput>> = Vec::with_capacity(settled.len());
    let mut deltas: HashMap<String, Value> = HashMap::new();
    for (position, (outcome, text)) in settled.into_iter().zip(rendered.iter()).enumerate() {
        let (output, answer, body_deltas) = outcome?;
        deltas.insert(
            format!("foreach.{stage_id}.{position}.item"),
            Value::String(text.clone()),
        );
        deltas.insert(
            format!("foreach.{stage_id}.{position}.answer"),
            Value::String(answer),
        );
        for (key, value) in body_deltas {
            deltas.insert(format!("foreach.{stage_id}.{position}.state.{key}"), value);
        }
        outputs.push(Box::new(output));
    }

    let total = rendered.len();
    deltas.insert(format!("foreach.{stage_id}.count"), Value::from(total));
    let answer = format!("{} item(s) processed", total);
    let output = StageOutput::ForEach {
        items: rendered,
        outputs,
    };
    Ok((output, answer, deltas))
}
async fn execute_pattern(
&self,
step: &PatternStep,
wf_state: &HashMap<String, Value>,
) -> Result<(StageOutput, String), WorkflowError> {
let anchored_task = match wf_state.get("goal").and_then(|g| g.as_str()) {
Some(goal) if !goal.trim().is_empty() => {
format!("Overall goal: {goal}\n\nCurrent step: {}", step.task)
}
_ => step.task.clone(),
};
let task = anchored_task.as_str();
let runner = &self.runner;
let infra = &self.infra;
match step.pattern {
PatternKind::SwarmParallel => {
let mut swarm = Swarm::new(step.agents.clone(), SwarmMode::Parallel);
if let Some(synth) = extract_synthesizer(&step.config, &step.agents) {
swarm = swarm.with_synthesizer(synth);
}
let r = swarm.run(task, runner, infra).await?;
Ok((
StageOutput::Pattern {
outputs: r.outputs,
final_answer: r.final_summary.clone(),
},
r.final_summary,
))
}
PatternKind::SwarmSequential => {
let swarm = Swarm::new(step.agents.clone(), SwarmMode::Sequential);
let r = swarm.run(task, runner, infra).await?;
Ok((
StageOutput::Pattern {
outputs: r.outputs,
final_answer: r.final_summary.clone(),
},
r.final_summary,
))
}
PatternKind::SwarmDebate => {
let swarm = Swarm::new(step.agents.clone(), SwarmMode::Debate);
let r = swarm.run(task, runner, infra).await?;
Ok((
StageOutput::Pattern {
outputs: r.outputs,
final_answer: r.final_summary.clone(),
},
r.final_summary,
))
}
PatternKind::Pipeline => {
let pipeline = Pipeline::new(step.agents.clone());
let r = pipeline.run(task, runner, infra).await?;
Ok((
StageOutput::Pattern {
outputs: r.stages,
final_answer: r.final_answer.clone(),
},
r.final_answer,
))
}
PatternKind::Supervisor => {
let max_rounds = step
.config
.get("max_rounds")
.and_then(|v| v.as_u64())
.unwrap_or(3) as u32;
let (supervisor, workers) = split_supervisor_workers(&step.agents, &step.config);
let r = Supervisor::new(workers, supervisor)
.with_max_rounds(max_rounds)
.run(task, runner, infra)
.await?;
let all_outputs: Vec<_> = r.rounds.into_iter().flatten().collect();
Ok((
StageOutput::Pattern {
outputs: all_outputs,
final_answer: r.final_answer.clone(),
},
r.final_answer,
))
}
PatternKind::Delegator => {
let (main_agent, specialists) = split_delegator(&step.agents, &step.config);
let delegator = car_multi::Delegator::new(main_agent, specialists);
let r = delegator.run(task, runner, infra).await?;
Ok((
StageOutput::Pattern {
outputs: vec![car_multi::AgentOutput {
name: "delegator".into(),
answer: r.final_answer.clone(),
turns: 0,
tool_calls: r.delegations.len() as u32,
duration_ms: 0.0,
error: None,
outcome: None,
tokens: None,
}],
final_answer: r.final_answer.clone(),
},
r.final_answer,
))
}
PatternKind::MapReduce => {
let max_concurrent = step
.config
.get("max_concurrent")
.and_then(|v| v.as_u64())
.unwrap_or(5) as usize;
let items: Vec<String> = step
.config
.get("items")
.and_then(|v| serde_json::from_value(v.clone()).ok())
.unwrap_or_default();
if step.agents.len() < 2 {
return Err(WorkflowError::StageFailed(
"map_reduce".into(),
"requires at least 2 agents (mapper + reducer)".into(),
));
}
let mapper = step.agents[0].clone();
let reducer = step.agents[1].clone();
let mr = MapReduce::new(mapper, reducer).with_max_concurrent(max_concurrent);
let r = mr
.run(
task,
&items
.iter()
.map(|s| s.as_str())
.collect::<Vec<_>>()
.iter()
.map(|s| s.to_string())
.collect::<Vec<_>>(),
runner,
infra,
)
.await?;
Ok((
StageOutput::Pattern {
outputs: r.map_outputs,
final_answer: r.reduced_answer.clone(),
},
r.reduced_answer,
))
}
PatternKind::Vote => {
let mut vote = Vote::new(step.agents.clone());
if let Some(synth) = extract_synthesizer(&step.config, &step.agents) {
vote = vote.with_synthesizer(synth);
}
let r = vote.run(task, runner, infra).await?;
Ok((
StageOutput::Pattern {
outputs: r.votes,
final_answer: r.winner.clone(),
},
r.winner,
))
}
PatternKind::Fleet => {
let mut fleet = Fleet::new(step.agents.clone());
if let Some(timeout) = step.config.get("timeout_secs").and_then(|v| v.as_u64()) {
fleet = fleet.with_timeout(timeout);
}
let r = fleet.run(runner, infra).await?;
let summary = format!("{} succeeded, {} failed", r.succeeded, r.failed);
Ok((
StageOutput::Pattern {
outputs: r.outputs,
final_answer: summary.clone(),
},
summary,
))
}
PatternKind::AdversarialReview => {
if step.agents.is_empty() {
return Err(WorkflowError::StageFailed(
"adversarial_review".into(),
"requires a reviewer agent (agents[0])".into(),
));
}
let reviewer = step.agents[0].clone();
let criteria: Vec<String> = step
.config
.get("criteria")
.and_then(|v| serde_json::from_value(v.clone()).ok())
.unwrap_or_default();
let work = step
.config
.get("review_key")
.and_then(|v| v.as_str())
.and_then(|key| wf_state.get(key))
.map(|v| match v {
Value::String(s) => s.clone(),
other => other.to_string(),
})
.unwrap_or_default();
if work.trim().is_empty() {
let answer = "FAIL (no work to review — missing or empty review_key)".to_string();
return Ok((
StageOutput::Review {
passed: false,
blocker_count: 1,
findings: vec![],
reviewer: car_multi::AgentOutput {
name: "reviewer".into(),
answer: answer.clone(),
turns: 0,
tool_calls: 0,
duration_ms: 0.0,
error: None,
outcome: None,
tokens: None,
},
},
answer,
));
}
let r = AdversarialReview::new(reviewer, criteria)
.run(&work, runner, infra)
.await?;
let answer = if r.passed {
format!("PASS ({} finding(s))", r.findings.len())
} else {
format!("FAIL ({} blocker(s))", r.blocker_count)
};
Ok((
StageOutput::Review {
passed: r.passed,
blocker_count: r.blocker_count,
findings: r.findings,
reviewer: r.reviewer_output,
},
answer,
))
}
PatternKind::Tournament => {
if step.agents.len() < 3 {
return Err(WorkflowError::StageFailed(
"tournament".into(),
"requires at least 3 agents (>=2 competitors + 1 judge)".into(),
));
}
let judge_idx = step
.config
.get("judge_index")
.and_then(|v| v.as_u64())
.unwrap_or(step.agents.len() as u64 - 1) as usize;
let judge = step
.agents
.get(judge_idx)
.cloned()
.unwrap_or_else(|| step.agents.last().unwrap().clone());
let competitors: Vec<_> = step
.agents
.iter()
.enumerate()
.filter(|(i, _)| *i != judge_idx)
.map(|(_, a)| a.clone())
.collect();
let r = Tournament::new(competitors, judge)
.run(task, runner, infra)
.await?;
Ok((
StageOutput::Pattern {
outputs: r.candidates,
final_answer: r.winner_answer.clone(),
},
r.winner_answer,
))
}
}
}
/// Executes one for-each body under the child id `<stage_id>.<index>`.
async fn run_foreach_body(
    &self,
    stage_id: &str,
    index: usize,
    body: &StageStep,
    wf_state: &HashMap<String, Value>,
) -> Result<StepOutcome, WorkflowError> {
    let item_stage_id = format!("{}.{}", stage_id, index);
    let outcome = self.execute_step(&item_stage_id, body, wf_state).await;
    outcome
}
/// Executes a proposal on a runtime obtained from the shared infrastructure.
///
/// On success the answer is the stringified output of the last action (empty
/// when there is none). If any action failed, the step fails with all action
/// error messages joined by `"; "`.
async fn execute_proposal(
    &self,
    step: &ProposalStep,
) -> Result<(StageOutput, String), WorkflowError> {
    let runtime = self.infra.make_runtime();
    let result = runtime.execute(&step.proposal).await;

    if !result.all_succeeded() {
        let messages: Vec<String> = result
            .results
            .iter()
            .filter_map(|action| action.error.clone())
            .collect();
        return Err(WorkflowError::StageFailed(
            "proposal".into(),
            messages.join("; "),
        ));
    }

    let answer = match result.results.last().and_then(|action| action.output.as_ref()) {
        Some(value) => value.to_string(),
        None => String::new(),
    };
    Ok((StageOutput::Proposal { result }, answer))
}
/// Runs a nested workflow to completion and wraps its result.
///
/// The answer is derived from the sub-workflow's last stage output (empty
/// when there are no stages or the last output is `Empty`).
///
/// # Errors
/// * `ApprovalInSubWorkflow` when the nested run pauses at an approval gate.
/// * `StageFailed` when the nested run finishes unsuccessfully.
/// * Any error returned by the nested run itself.
async fn execute_sub_workflow(
    &self,
    step: &SubWorkflowStep,
) -> Result<(StageOutput, String), WorkflowError> {
    let result = self.run(&step.workflow).await?;
    if result.is_paused() {
        return Err(WorkflowError::ApprovalInSubWorkflow(
            step.workflow.id.clone(),
        ));
    }
    if !result.succeeded() {
        return Err(WorkflowError::StageFailed(
            "sub_workflow".into(),
            "sub-workflow failed".into(),
        ));
    }

    // Summarize the final stage according to the kind of output it produced.
    let answer = match result.stages.last().map(|last| &last.output) {
        None | Some(StageOutput::Empty) => String::new(),
        Some(StageOutput::Pattern { final_answer, .. }) => final_answer.clone(),
        Some(StageOutput::Proposal { result: executed }) => executed
            .results
            .last()
            .and_then(|action| action.output.as_ref())
            .map(|value| value.to_string())
            .unwrap_or_default(),
        Some(StageOutput::SubWorkflow { result: nested }) => {
            let verdict = if nested.succeeded() {
                "completed"
            } else {
                "failed"
            };
            format!("sub-workflow {} {}", nested.workflow_name, verdict)
        }
        Some(StageOutput::Approval { response }) => response.to_string(),
        Some(StageOutput::Review {
            passed,
            blocker_count,
            ..
        }) => {
            let verdict = if *passed {
                "passed".to_string()
            } else {
                format!("failed ({blocker_count} blocker(s))")
            };
            format!("review {}", verdict)
        }
        Some(StageOutput::Loop {
            iterations,
            satisfied,
            ..
        }) => {
            let negation = if *satisfied { "" } else { "not " };
            format!(
                "loop ran {} iteration(s), until {}satisfied",
                iterations, negation
            )
        }
        Some(StageOutput::ForEach { items, .. }) => {
            format!("foreach over {} item(s)", items.len())
        }
    };

    let output = StageOutput::SubWorkflow {
        result: Box::new(result),
    };
    Ok((output, answer))
}
/// Runs compensation handlers for completed stages, most recent first.
///
/// Stages that are unknown to the workflow or have no handler are skipped.
/// A handler is either an inline proposal or a reference to another stage,
/// whose step is executed against an empty state. Every attempted handler
/// yields one result entry, successful or not; a failure does not stop the
/// remaining compensations.
async fn compensate(
    &self,
    workflow: &Workflow,
    completed_stage_ids: &[String],
) -> Vec<CompensationResult> {
    let mut report = Vec::new();
    for stage_id in completed_stage_ids.iter().rev() {
        let handler = match workflow
            .stage(stage_id)
            .and_then(|stage| stage.compensation.as_ref())
        {
            Some(found) => found,
            None => continue,
        };
        debug!(stage = %stage_id, "running compensation");
        let started = Instant::now();
        let outcome = match handler {
            CompensationHandler::Proposal(proposal) => self.execute_proposal(proposal).await,
            CompensationHandler::StageRef { stage_id: ref_id } => match workflow.stage(ref_id) {
                Some(target) => self
                    .execute_step(ref_id, &target.step, &HashMap::new())
                    .await
                    .map(|(output, answer, _deltas)| (output, answer)),
                None => Err(WorkflowError::StageNotFound(ref_id.clone())),
            },
        };
        let duration_ms = started.elapsed().as_secs_f64() * 1000.0;
        let (status, error) = match outcome {
            Ok(_) => (StageStatus::Succeeded, None),
            Err(e) => {
                warn!(stage = %stage_id, error = %e, "compensation failed");
                (StageStatus::Failed, Some(e.to_string()))
            }
        };
        report.push(CompensationResult {
            for_stage_id: stage_id.clone(),
            status,
            duration_ms,
            error,
        });
    }
    report
}
}
/// Mutable progress of one workflow run, threaded through the stage loop.
///
/// A fresh run starts with empty collections and zeroed counters; a resumed
/// run is rebuilt from the fields saved in the paused checkpoint.
struct Cursor {
// Key/value state that steps read and that edge conditions are evaluated against.
wf_state: HashMap<String, Value>,
// One entry per stage processed so far, in order (succeeded or failed).
stage_results: Vec<StageResult>,
// Ids of stages that succeeded; compensation walks this list in reverse.
completed_stage_ids: Vec<String>,
// Loop passes taken so far, checked against the workflow's `max_iterations`.
iterations: u32,
// Milliseconds spent before the current stage-loop call (non-zero only after a resume).
prior_duration_ms: f64,
// Id of the stage to process next.
current_id: String,
}
/// Generates a fresh run id: a random (v4) UUID in its "simple" rendering.
fn new_run_id() -> String {
    let id = uuid::Uuid::new_v4();
    id.simple().to_string()
}
/// Checks an approver's `input` against the gate's declared `fields`.
///
/// * A required field that is absent or JSON `null` is an error.
/// * A field of type `"options"` with a non-empty option list must carry a
///   string equal to one of the options.
///
/// Input keys that are not declared as fields are not examined.
///
/// # Errors
/// `InvalidApprovalInput` describing the first offending field.
fn validate_approval_input(
    fields: &[ApprovalField],
    input: &HashMap<String, Value>,
) -> Result<(), WorkflowError> {
    for field in fields {
        // Treat an explicit null exactly like a missing entry.
        let supplied = input.get(&field.name).filter(|v| !v.is_null());
        let value = match supplied {
            Some(v) => v,
            None if field.required => {
                return Err(WorkflowError::InvalidApprovalInput(format!(
                    "required field '{}' is missing",
                    field.name
                )));
            }
            None => continue,
        };

        let constrained = field.field_type == "options" && !field.options.is_empty();
        if !constrained {
            continue;
        }

        let allowed = match value.as_str() {
            Some(s) => field.options.iter().any(|o| o == s),
            None => false,
        };
        if !allowed {
            return Err(WorkflowError::InvalidApprovalInput(format!(
                "field '{}' value {} is not one of {:?}",
                field.name, value, field.options
            )));
        }
    }
    Ok(())
}
/// Picks the stage to visit after `from`: the target of the first outgoing
/// edge whose conditions all hold in `state`, or `None` when no edge applies.
fn next_stage(workflow: &Workflow, from: &str, state: &HashMap<String, Value>) -> Option<String> {
    for candidate in workflow.outgoing_edges(from).iter() {
        if check_conditions(&candidate.conditions, state) {
            return Some(candidate.to.clone());
        }
    }
    None
}
/// Builds the result for a workflow that ran to completion: status
/// `Completed`, no compensations, no pause checkpoint, timestamped now.
fn completed_result(
    workflow: &Workflow,
    stage_results: Vec<StageResult>,
    final_state: HashMap<String, Value>,
    duration_ms: f64,
) -> WorkflowResult {
    let workflow_id = workflow.id.clone();
    let workflow_name = workflow.name.clone();
    WorkflowResult {
        workflow_id,
        workflow_name,
        status: WorkflowStatus::Completed,
        stages: stage_results,
        compensations: Vec::new(),
        duration_ms,
        timestamp: chrono::Utc::now(),
        final_state,
        paused: None,
    }
}
fn render_item(item: &Value) -> String {
match item {
Value::String(s) => s.clone(),
other => other.to_string(),
}
}
/// Produces a copy of `body` with the for-each placeholders filled in.
///
/// The step is serialized to JSON, every string in it is substituted, and the
/// result is deserialized back into a step.
///
/// # Errors
/// `StageFailed("foreach", …)` when the body cannot be serialized or the
/// substituted JSON no longer deserializes into a step.
fn template_step(body: &StageStep, item: &str, index: usize) -> Result<StageStep, WorkflowError> {
    let mut tree = match serde_json::to_value(body) {
        Ok(json) => json,
        Err(e) => {
            return Err(WorkflowError::StageFailed(
                "foreach".into(),
                format!("serialize body: {e}"),
            ))
        }
    };
    substitute_in_value(&mut tree, item, index);
    match serde_json::from_value(tree) {
        Ok(step) => Ok(step),
        Err(e) => Err(WorkflowError::StageFailed(
            "foreach".into(),
            format!("rebuild templated body: {e}"),
        )),
    }
}
/// Recursively replaces `{{item}}` and `{{index}}` in every string of `v`.
///
/// Arrays and object values are walked; object keys, numbers, booleans and
/// nulls are left untouched.
///
/// Substitution is done in a single left-to-right pass over the *template*
/// text, so inserted item text is never rescanned: an item that itself
/// contains `{{index}}` (or `{{item}}`) is inserted verbatim rather than being
/// rewritten by a second replacement.
fn substitute_in_value(v: &mut Value, item: &str, index: usize) {
    const ITEM: &str = "{{item}}";
    const INDEX: &str = "{{index}}";
    match v {
        Value::String(s) => {
            // Fast path: leave strings without placeholders untouched.
            if s.contains(ITEM) || s.contains(INDEX) {
                let index_text = index.to_string();
                let mut out = String::with_capacity(s.len() + item.len());
                let mut rest = s.as_str();
                while let Some(pos) = rest.find("{{") {
                    out.push_str(&rest[..pos]);
                    let tail = &rest[pos..];
                    if let Some(after) = tail.strip_prefix(ITEM) {
                        out.push_str(item);
                        rest = after;
                    } else if let Some(after) = tail.strip_prefix(INDEX) {
                        out.push_str(&index_text);
                        rest = after;
                    } else {
                        // Not a placeholder: emit one brace and rescan from the
                        // next character so "{{{item}}" still matches.
                        out.push('{');
                        rest = &tail[1..];
                    }
                }
                out.push_str(rest);
                *s = out;
            }
        }
        Value::Array(a) => a.iter_mut().for_each(|e| substitute_in_value(e, item, index)),
        Value::Object(m) => m
            .values_mut()
            .for_each(|e| substitute_in_value(e, item, index)),
        _ => {}
    }
}
/// Returns `true` when every condition holds in `state`; an empty list
/// therefore always passes.
fn check_conditions(conditions: &[car_ir::Precondition], state: &HashMap<String, Value>) -> bool {
    for condition in conditions {
        if !evaluate_precondition(condition, state) {
            return false;
        }
    }
    true
}
/// Evaluates a single precondition against `state`.
///
/// * `exists` / `not_exists` test only for the key's presence (a stored
///   `null` counts as present).
/// * All other operators are false when the key is missing.
/// * `eq` / `neq` compare JSON values; `gt`, `gte`, `lt`, `lte` use
///   `compare_values` and are false for incomparable values.
/// * `contains` is a substring test and requires both sides to be strings.
/// * Unknown operators evaluate to false.
fn evaluate_precondition(cond: &car_ir::Precondition, state: &HashMap<String, Value>) -> bool {
    use std::cmp::Ordering;

    let expected = &cond.value;
    let found = state.get(&cond.key);
    match (cond.operator.as_str(), found) {
        ("exists", present) => present.is_some(),
        ("not_exists", present) => present.is_none(),
        (_, None) => false,
        ("eq", Some(actual)) => actual == expected,
        ("neq", Some(actual)) => actual != expected,
        ("gt", Some(actual)) => compare_values(actual, expected) == Some(Ordering::Greater),
        ("gte", Some(actual)) => matches!(
            compare_values(actual, expected),
            Some(Ordering::Greater) | Some(Ordering::Equal)
        ),
        ("lt", Some(actual)) => compare_values(actual, expected) == Some(Ordering::Less),
        ("lte", Some(actual)) => matches!(
            compare_values(actual, expected),
            Some(Ordering::Less) | Some(Ordering::Equal)
        ),
        ("contains", Some(actual)) => match (actual.as_str(), expected.as_str()) {
            (Some(haystack), Some(needle)) => haystack.contains(needle),
            _ => false,
        },
        (_, Some(_)) => false,
    }
}
/// Orders two JSON values: numerically when both are numbers, otherwise
/// lexicographically when both are strings. Any other combination — and a
/// numeric comparison with no defined order — yields `None`.
fn compare_values(a: &Value, b: &Value) -> Option<std::cmp::Ordering> {
    if let (Some(left), Some(right)) = (a.as_f64(), b.as_f64()) {
        return left.partial_cmp(&right);
    }
    let left = a.as_str()?;
    let right = b.as_str()?;
    Some(left.cmp(right))
}
/// Returns a clone of the agent selected by the `synthesizer_index` config
/// entry, or `None` when the entry is absent, not an unsigned integer, or
/// out of range.
fn extract_synthesizer(
    config: &HashMap<String, Value>,
    agents: &[car_multi::AgentSpec],
) -> Option<car_multi::AgentSpec> {
    let position = config.get("synthesizer_index")?.as_u64()?;
    let chosen = agents.get(position as usize)?;
    Some(chosen.clone())
}
/// Splits `agents` into one supervisor and the remaining workers.
///
/// The supervisor is the agent at the `supervisor_index` config entry. When
/// that entry is absent, not an unsigned integer, or out of range, the last
/// agent supervises. The index is resolved *before* the workers are
/// collected, so the supervisor never also appears among the workers.
///
/// With no agents at all this returns a placeholder supervisor (empty prompt)
/// and no workers instead of panicking, mirroring how the delegator split
/// handles an empty list.
fn split_supervisor_workers(
    agents: &[car_multi::AgentSpec],
    config: &HashMap<String, Value>,
) -> (car_multi::AgentSpec, Vec<car_multi::AgentSpec>) {
    if agents.is_empty() {
        return (car_multi::AgentSpec::new("supervisor", ""), Vec::new());
    }
    let last_idx = agents.len() - 1;
    // Compare in u64 before narrowing so huge values cannot wrap into range.
    let idx = config
        .get("supervisor_index")
        .and_then(|v| v.as_u64())
        .filter(|&i| i <= last_idx as u64)
        .map(|i| i as usize)
        .unwrap_or(last_idx);
    let supervisor = agents[idx].clone();
    let workers: Vec<_> = agents
        .iter()
        .enumerate()
        .filter(|(i, _)| *i != idx)
        .map(|(_, a)| a.clone())
        .collect();
    (supervisor, workers)
}
/// Splits `agents` into the main (delegating) agent and its specialists.
///
/// The first agent is the main agent; all others become specialists keyed by
/// name (a later agent replaces an earlier one with the same name). With no
/// agents, a placeholder main agent named "main" with an empty prompt is
/// used. The config is currently not consulted.
fn split_delegator(
    agents: &[car_multi::AgentSpec],
    _config: &HashMap<String, Value>,
) -> (car_multi::AgentSpec, HashMap<String, car_multi::AgentSpec>) {
    let main = match agents.first() {
        Some(head) => head.clone(),
        None => car_multi::AgentSpec::new("main", ""),
    };
    let mut specialists: HashMap<String, car_multi::AgentSpec> = HashMap::new();
    for spec in agents.iter().skip(1) {
        specialists.insert(spec.name.clone(), spec.clone());
    }
    (main, specialists)
}
#[cfg(test)]
mod tests {
use super::*;
/// `eq` holds only when the stored value equals the expected one.
#[test]
fn precondition_eq() {
    let mut state = HashMap::new();
    state.insert("x".into(), Value::Bool(true));

    // Builds an `eq` condition on key "x" for the given expected boolean.
    let eq_x = |expected: bool| car_ir::Precondition {
        key: "x".into(),
        operator: "eq".into(),
        value: Value::Bool(expected),
        description: String::new(),
    };

    let matching = eq_x(true);
    assert!(evaluate_precondition(&matching, &state));

    let mismatching = eq_x(false);
    assert!(!evaluate_precondition(&mismatching, &state));
}
/// `exists` is true for a present key (even one holding `null`) and false
/// for a key that was never stored.
#[test]
fn precondition_exists() {
    let mut state = HashMap::new();
    state.insert("x".into(), Value::Null);
    let exists = car_ir::Precondition {
        key: "x".into(),
        operator: "exists".into(),
        value: Value::Null,
        description: String::new(),
    };
    assert!(evaluate_precondition(&exists, &state));
    // Same operator, but on a key that is absent from the state.
    let not_exists = car_ir::Precondition {
        key: "y".into(),
        operator: "exists".into(),
        value: Value::Null,
        description: String::new(),
    };
    assert!(!evaluate_precondition(&not_exists, &state));
}
/// Numeric `gt` / `lt` compare the stored number against the expected one.
#[test]
fn precondition_numeric_comparison() {
    let mut state = HashMap::new();
    state.insert("count".into(), serde_json::json!(5));
    // 5 > 3 holds.
    let gt = car_ir::Precondition {
        key: "count".into(),
        operator: "gt".into(),
        value: serde_json::json!(3),
        description: String::new(),
    };
    assert!(evaluate_precondition(&gt, &state));
    // 5 < 3 does not.
    let lt = car_ir::Precondition {
        key: "count".into(),
        operator: "lt".into(),
        value: serde_json::json!(3),
        description: String::new(),
    };
    assert!(!evaluate_precondition(&lt, &state));
}
/// An empty condition list is vacuously satisfied, even on an empty state.
#[test]
fn empty_conditions_always_pass() {
    let no_conditions: [car_ir::Precondition; 0] = [];
    let empty_state: HashMap<String, Value> = HashMap::new();
    assert!(check_conditions(&no_conditions, &empty_state));
}
use car_ir::ActionProposal;
/// Test double for the agent runner: every call fails with `NoOutput`.
/// The tests shown here only drive approval gates and empty proposals.
struct NoopRunner;
#[async_trait::async_trait]
impl car_multi::AgentRunner for NoopRunner {
// Ignores all arguments and always reports that the agent produced no output.
async fn run(
&self,
_spec: &car_multi::AgentSpec,
_task: &str,
_runtime: &car_engine::Runtime,
_mailbox: &car_multi::Mailbox,
) -> Result<car_multi::AgentOutput, car_multi::MultiError> {
Err(car_multi::MultiError::NoOutput)
}
}
/// Builds an engine backed by the always-failing runner and fresh shared infrastructure.
fn test_engine() -> WorkflowEngine {
    let runner = Arc::new(NoopRunner);
    let infra = car_multi::SharedInfra::new();
    WorkflowEngine::new(runner, infra)
}
/// Builds an approval-gate stage named after its id, with the prompt
/// "approve?", no declared fields, and the given output key.
fn approval_stage(id: &str, output_key: &str) -> Stage {
    let gate = ApprovalStep {
        prompt: "approve?".into(),
        fields: Vec::new(),
        output_key: output_key.into(),
    };
    Stage {
        id: id.into(),
        name: id.into(),
        step: StageStep::Approval(gate),
        compensation: None,
        timeout_ms: None,
        metadata: HashMap::new(),
    }
}
/// Builds a proposal stage (id and name both `id`) whose proposal `p-{id}`
/// carries no actions, an empty context, and the current UTC timestamp.
fn empty_proposal_stage(id: &str) -> Stage {
    let proposal = ActionProposal {
        id: format!("p-{id}"),
        source: "test".into(),
        actions: Vec::new(),
        timestamp: chrono::Utc::now(),
        context: HashMap::new(),
    };
    Stage {
        id: id.into(),
        name: id.into(),
        step: StageStep::Proposal(ProposalStep { proposal }),
        compensation: None,
        timeout_ms: None,
        metadata: HashMap::new(),
    }
}
/// Builds an unlabeled `Edge` from stage `from` to stage `to`, guarded by
/// `conditions` (an empty vector leaves the edge unconditional).
fn edge(from: &str, to: &str, conditions: Vec<car_ir::Precondition>) -> Edge {
    let (from, to) = (from.into(), to.into());
    let label = String::new();
    Edge {
        from,
        to,
        conditions,
        label,
    }
}
/// A workflow that starts at an approval gate pauses immediately: no stage
/// result is recorded, and the checkpoint carries the gate's id, output key,
/// prompt, and a non-empty run id.
#[tokio::test]
async fn pauses_at_approval_gate_without_executing_it() {
    let stages = vec![approval_stage("gate", "approval"), empty_proposal_stage("done")];
    let edges = vec![edge("gate", "done", vec![])];
    let workflow = Workflow {
        id: "wf".into(),
        name: "WF".into(),
        start: "gate".into(),
        goal: None,
        stages,
        edges,
        max_iterations: 100,
        metadata: HashMap::new(),
    };

    let outcome = test_engine().run(&workflow).await.unwrap();

    assert_eq!(outcome.status, WorkflowStatus::Paused);
    assert!(outcome.is_paused());
    assert!(outcome.stages.is_empty(), "gate body must not run before resume");

    let checkpoint = outcome.paused.expect("checkpoint present when paused");
    assert_eq!(checkpoint.paused_stage_id, "gate");
    assert_eq!(checkpoint.output_key, "approval");
    assert_eq!(checkpoint.prompt, "approve?");
    assert!(!checkpoint.run_id.is_empty());
}
/// Resuming a paused run with a `decision` answer completes the workflow:
/// `gate` and `done` are both recorded, the gate's output is
/// `StageOutput::Approval`, and the answer is visible in the final state.
#[tokio::test]
async fn resume_records_response_and_completes() {
    let workflow = Workflow {
        id: "wf".into(),
        name: "WF".into(),
        start: "gate".into(),
        goal: None,
        stages: vec![approval_stage("gate", "approval"), empty_proposal_stage("done")],
        edges: vec![edge("gate", "done", vec![])],
        max_iterations: 100,
        metadata: HashMap::new(),
    };
    let engine = test_engine();
    let checkpoint = engine.run(&workflow).await.unwrap().paused.unwrap();

    let answer = HashMap::from([("decision".to_string(), Value::String("approve".into()))]);
    let outcome = engine.resume(checkpoint, answer).await.unwrap();

    assert_eq!(outcome.status, WorkflowStatus::Completed);
    assert!(outcome.succeeded());

    let executed: Vec<&str> = outcome
        .stages
        .iter()
        .map(|stage| stage.stage_id.as_str())
        .collect();
    assert_eq!(executed, vec!["gate", "done"]);
    assert!(matches!(outcome.stages[0].output, StageOutput::Approval { .. }));

    // The answer is addressable as `<output_key>.<field>`.
    let approve = Value::String("approve".into());
    assert_eq!(outcome.final_state.get("approval.decision"), Some(&approve));
    // Bookkeeping keys for the gate and its output key must both be present.
    for key in ["stage.gate.answer", "approval"] {
        assert!(outcome.final_state.contains_key(key));
    }
}
/// A checkpoint that has been saved to and reloaded from a `CheckpointStore`
/// still resumes correctly: answering `decision = "approve"` routes through the
/// conditional edge to `approved` and skips the unconditional `revise` branch.
#[tokio::test]
async fn resume_branches_on_answer_after_checkpoint_roundtrip() {
    /// Deletes the scratch checkpoint directory when dropped, so a failing
    /// `unwrap` or assertion does not leave it behind in the system temp dir.
    struct ScratchDir(std::path::PathBuf);
    impl Drop for ScratchDir {
        fn drop(&mut self) {
            // Best effort, matching the original cleanup: errors are ignored.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    let wf = Workflow {
        id: "wf".into(),
        name: "WF".into(),
        start: "gate".into(),
        goal: None,
        stages: vec![
            approval_stage("gate", "approval"),
            empty_proposal_stage("approved"),
            empty_proposal_stage("revise"),
        ],
        edges: vec![
            // Conditional edge first: taken only when the answer is "approve".
            edge(
                "gate",
                "approved",
                vec![car_ir::Precondition {
                    key: "approval.decision".into(),
                    operator: "eq".into(),
                    value: Value::String("approve".into()),
                    description: String::new(),
                }],
            ),
            // Unconditional fallback branch.
            edge("gate", "revise", vec![]),
        ],
        max_iterations: 100,
        metadata: HashMap::new(),
    };
    let eng = test_engine();
    let paused = eng.run(&wf).await.unwrap().paused.unwrap();

    // Unique directory per run so parallel test executions cannot collide.
    let dir = std::env::temp_dir().join(format!(
        "car-wf-resume-{}",
        uuid::Uuid::new_v4().simple()
    ));
    // Declared before `store` so the store is dropped first and the directory
    // is removed last, on both the success and the panic path.
    let _scratch = ScratchDir(dir.clone());
    let store = crate::CheckpointStore::open(&dir).unwrap();
    store.save(&paused).unwrap();
    let run_id = paused.run_id.clone();
    let reloaded = store.load(&run_id).unwrap().expect("checkpoint reloads");

    let mut input = HashMap::new();
    input.insert("decision".to_string(), Value::String("approve".into()));
    let res = eng.resume(reloaded, input).await.unwrap();

    assert_eq!(res.status, WorkflowStatus::Completed);
    let ran: Vec<&str> = res.stages.iter().map(|s| s.stage_id.as_str()).collect();
    assert!(ran.contains(&"approved"), "approve answer routes to approved");
    assert!(!ran.contains(&"revise"), "revise branch must be skipped");

    store.remove(&run_id).unwrap();
}
/// Running `a -> gate -> done` pauses at the gate with an iteration count of 2,
/// and resuming finishes the run with `a`, `gate`, `done` recorded in order.
#[tokio::test]
async fn iteration_guard_counts_each_stage_once_across_pause() {
    let stages = vec![
        empty_proposal_stage("a"),
        approval_stage("gate", "approval"),
        empty_proposal_stage("done"),
    ];
    let edges = vec![edge("a", "gate", vec![]), edge("gate", "done", vec![])];
    let workflow = Workflow {
        id: "wf".into(),
        name: "WF".into(),
        start: "a".into(),
        goal: None,
        stages,
        edges,
        max_iterations: 100,
        metadata: HashMap::new(),
    };
    let engine = test_engine();

    let checkpoint = engine.run(&workflow).await.unwrap().paused.unwrap();
    // Presumably one iteration for `a` and one for reaching the gate.
    assert_eq!(checkpoint.iterations, 2);

    let answer = HashMap::from([("decision".to_string(), Value::String("ok".into()))]);
    let outcome = engine.resume(checkpoint, answer).await.unwrap();
    assert_eq!(outcome.status, WorkflowStatus::Completed);

    let executed: Vec<&str> = outcome
        .stages
        .iter()
        .map(|stage| stage.stage_id.as_str())
        .collect();
    assert_eq!(executed, vec!["a", "gate", "done"]);
}
/// When the gate declares a required `decision` field, resuming with an empty
/// answer map fails with `WorkflowError::InvalidApprovalInput`.
#[tokio::test]
async fn resume_rejects_missing_required_field() {
    let required_decision = ApprovalField {
        name: "decision".into(),
        label: "Decision".into(),
        field_type: "text".into(),
        options: vec![],
        required: true,
    };
    let mut gate = approval_stage("gate", "approval");
    if let StageStep::Approval(step) = &mut gate.step {
        step.fields = vec![required_decision];
    }

    let workflow = Workflow {
        id: "wf".into(),
        name: "WF".into(),
        start: "gate".into(),
        goal: None,
        stages: vec![gate, empty_proposal_stage("done")],
        edges: vec![edge("gate", "done", vec![])],
        max_iterations: 100,
        metadata: HashMap::new(),
    };
    let engine = test_engine();
    let checkpoint = engine.run(&workflow).await.unwrap().paused.unwrap();

    // No answers supplied at all, so the required field is missing.
    let no_answers = HashMap::new();
    let failure = engine.resume(checkpoint, no_answers).await.unwrap_err();
    assert!(matches!(failure, WorkflowError::InvalidApprovalInput(_)));
}
/// A hand-built checkpoint whose `paused_stage_id` names a proposal stage (not
/// an approval gate) is rejected by `resume` with `WorkflowError::InvalidResume`.
#[tokio::test]
async fn resume_rejects_checkpoint_pointing_at_non_approval_stage() {
    let workflow = Workflow {
        id: "wf".into(),
        name: "WF".into(),
        start: "work".into(),
        goal: None,
        stages: vec![empty_proposal_stage("work")],
        edges: Vec::new(),
        max_iterations: 100,
        metadata: HashMap::new(),
    };
    // Never produced by the engine: it claims the run paused at "work".
    let forged_checkpoint = PausedWorkflow {
        run_id: "forged".into(),
        workflow,
        paused_stage_id: "work".into(),
        prompt: String::new(),
        fields: Vec::new(),
        output_key: "x".into(),
        wf_state: HashMap::new(),
        stage_results: Vec::new(),
        completed_stage_ids: Vec::new(),
        iterations: 1,
        prior_duration_ms: 0.0,
        created_at: chrono::Utc::now(),
    };

    let outcome = test_engine().resume(forged_checkpoint, HashMap::new()).await;
    let failure = outcome.unwrap_err();
    assert!(matches!(failure, WorkflowError::InvalidResume(_)));
}
/// Builds a `StateWrite` action with id `sw-{key}` whose parameters carry the
/// target `key` and the `value` to store.
///
/// The action has no tool, preconditions, expected effects, dependencies,
/// timeout, or metadata; it is non-idempotent, never retried, and aborts on
/// failure.
fn state_write_action(key: &str, value: Value) -> car_ir::Action {
    let key_entry = ("key".to_string(), Value::from(key));
    let value_entry = ("value".to_string(), value);
    car_ir::Action {
        id: format!("sw-{key}"),
        action_type: car_ir::ActionType::StateWrite,
        tool: None,
        parameters: [key_entry, value_entry].into(),
        preconditions: Vec::new(),
        expected_effects: HashMap::new(),
        state_dependencies: Vec::new(),
        idempotent: false,
        max_retries: 0,
        failure_behavior: car_ir::FailureBehavior::Abort,
        timeout_ms: None,
        metadata: HashMap::new(),
    }
}
/// Builds a proposal step (`p-{key}`) containing a single state-write action
/// that stores `value` under `key`.
fn proposal_step_writing(key: &str, value: Value) -> StageStep {
    let write = state_write_action(key, value);
    let proposal = ActionProposal {
        id: format!("p-{key}"),
        source: "test".into(),
        actions: vec![write],
        timestamp: chrono::Utc::now(),
        context: HashMap::new(),
    };
    StageStep::Proposal(ProposalStep { proposal })
}
/// Wraps `step` in a workflow (`wf` / `WF`) made of exactly one stage named
/// `id`, which is also the start stage. There are no edges and no goal.
fn single_stage_wf(id: &str, step: StageStep) -> Workflow {
    let only_stage = Stage {
        id: id.into(),
        name: id.into(),
        step,
        compensation: None,
        timeout_ms: None,
        metadata: HashMap::new(),
    };
    Workflow {
        id: "wf".into(),
        name: "WF".into(),
        start: id.into(),
        goal: None,
        stages: vec![only_stage],
        edges: Vec::new(),
        max_iterations: 100,
        metadata: HashMap::new(),
    }
}
/// A `LoopUntil` step with no `until` conditions runs its body until the
/// iteration cap (3 here), reports `satisfied = false`, and leaves the final
/// iteration number in `stage.loop.iteration`.
#[tokio::test]
async fn loop_until_empty_predicate_runs_to_cap() {
    let body = empty_proposal_stage("ignored").step;
    let looping = StageStep::LoopUntil(LoopUntilStep {
        body: Box::new(body),
        until: Vec::new(),
        max_iterations: 3,
    });
    let workflow = single_stage_wf("loop", looping);

    let outcome = test_engine().run(&workflow).await.unwrap();
    assert_eq!(outcome.status, WorkflowStatus::Completed);

    let (iterations, satisfied, per_iteration) = match &outcome.stages[0].output {
        StageOutput::Loop {
            iterations,
            satisfied,
            iterations_output,
        } => (*iterations, *satisfied, iterations_output),
        other => panic!("expected Loop output, got {other:?}"),
    };
    assert_eq!(iterations, 3);
    assert!(!satisfied);
    assert_eq!(per_iteration.len(), 3);

    let last_iteration = Value::from(3u32);
    assert_eq!(
        outcome.final_state.get("stage.loop.iteration"),
        Some(&last_iteration)
    );
}
/// A `LoopUntil` whose body writes `done = true` and whose predicate is
/// `done == true` stops after one iteration with `satisfied = true`, well short
/// of its cap of 10.
#[tokio::test]
async fn loop_until_stops_when_predicate_satisfied() {
    let done_is_true = car_ir::Precondition {
        key: "done".into(),
        operator: "eq".into(),
        value: Value::Bool(true),
        description: String::new(),
    };
    let looping = StageStep::LoopUntil(LoopUntilStep {
        body: Box::new(proposal_step_writing("done", Value::Bool(true))),
        until: vec![done_is_true],
        max_iterations: 10,
    });
    let workflow = single_stage_wf("loop", looping);

    let outcome = test_engine().run(&workflow).await.unwrap();

    let (iterations, satisfied) = match &outcome.stages[0].output {
        StageOutput::Loop {
            iterations,
            satisfied,
            ..
        } => (*iterations, *satisfied),
        other => panic!("expected Loop output, got {other:?}"),
    };
    assert_eq!(iterations, 1, "predicate true after first body run");
    assert!(satisfied);
    assert_eq!(outcome.final_state.get("done"), Some(&Value::Bool(true)));
}
/// A `ForEach` step fans out over a list that an earlier stage wrote to state.
///
/// `seed` stores three file names under `files`; `fan` runs a templated body
/// once per item. The test expects three outputs plus per-item bookkeeping
/// under `foreach.fan.*`, with `{{index}}` and `{{item}}` substituted.
#[tokio::test]
async fn for_each_runs_body_per_runtime_item() {
    let file_list = serde_json::json!(["a.rs", "b.rs", "c.rs"]);
    let seed = Stage {
        id: "seed".into(),
        name: "seed".into(),
        step: proposal_step_writing("files", file_list),
        compensation: None,
        timeout_ms: None,
        metadata: HashMap::new(),
    };

    // Placeholders in both the key and the value are filled in per item.
    let per_item_body = proposal_step_writing("seen_{{index}}", Value::String("{{item}}".into()));
    let fan = Stage {
        id: "fan".into(),
        name: "fan".into(),
        step: StageStep::ForEach(ForEachStep {
            items_from: "files".into(),
            body: Box::new(per_item_body),
            max_concurrent: 2,
        }),
        compensation: None,
        timeout_ms: None,
        metadata: HashMap::new(),
    };

    let workflow = Workflow {
        id: "wf".into(),
        name: "WF".into(),
        start: "seed".into(),
        goal: None,
        stages: vec![seed, fan],
        edges: vec![edge("seed", "fan", vec![])],
        max_iterations: 100,
        metadata: HashMap::new(),
    };

    let outcome = test_engine().run(&workflow).await.unwrap();
    assert_eq!(outcome.status, WorkflowStatus::Completed);

    match &outcome.stages[1].output {
        StageOutput::ForEach { items, outputs } => {
            assert_eq!(items, &vec!["a.rs", "b.rs", "c.rs"]);
            assert_eq!(outputs.len(), 3);
        }
        other => panic!("expected ForEach output, got {other:?}"),
    }

    let expected_state = [
        ("foreach.fan.count", Value::from(3usize)),
        ("foreach.fan.0.item", Value::String("a.rs".into())),
        ("foreach.fan.0.state.seen_0", Value::String("a.rs".into())),
        ("foreach.fan.2.state.seen_2", Value::String("c.rs".into())),
    ];
    for (key, want) in &expected_state {
        assert_eq!(outcome.final_state.get(*key), Some(want));
    }
}
/// When `items_from` names a key that is absent from state, the `ForEach` step
/// still completes and reports empty `items` and `outputs`.
#[tokio::test]
async fn for_each_missing_key_is_noop() {
    let fan_out = StageStep::ForEach(ForEachStep {
        items_from: "nonexistent".into(),
        body: Box::new(empty_proposal_stage("b").step),
        max_concurrent: 0,
    });
    let workflow = single_stage_wf("fan", fan_out);

    let outcome = test_engine().run(&workflow).await.unwrap();
    assert_eq!(outcome.status, WorkflowStatus::Completed);

    let (items, outputs) = match &outcome.stages[0].output {
        StageOutput::ForEach { items, outputs } => (items, outputs),
        other => panic!("expected ForEach output, got {other:?}"),
    };
    assert!(items.is_empty());
    assert!(outputs.is_empty());
}
/// Test double for `car_multi::AgentRunner` that records the task text of the
/// most recent `run` call and always answers `"ok"` successfully.
struct CapturingRunner {
    /// Task string from the latest `run` call; shared so the test can read it.
    last_task: std::sync::Arc<std::sync::Mutex<String>>,
}
#[async_trait::async_trait]
impl car_multi::AgentRunner for CapturingRunner {
    /// Overwrites `last_task` with `task`, then returns a canned single-turn
    /// output named after `spec.name`. The runtime and mailbox are unused.
    async fn run(
        &self,
        spec: &car_multi::AgentSpec,
        task: &str,
        _runtime: &car_engine::Runtime,
        _mailbox: &car_multi::Mailbox,
    ) -> Result<car_multi::AgentOutput, car_multi::MultiError> {
        {
            // Scoped so the lock guard is released before the output is built.
            let mut recorded = self.last_task.lock().unwrap();
            recorded.clear();
            recorded.push_str(task);
        }
        let output = car_multi::AgentOutput {
            name: spec.name.clone(),
            answer: "ok".into(),
            turns: 1,
            tool_calls: 0,
            duration_ms: 1.0,
            error: None,
            outcome: None,
            tokens: None,
        };
        Ok(output)
    }
}
/// A workflow-level goal is stored in the final state under `goal` and is
/// included in the task text handed to the agent, alongside the step's own
/// task, as `Overall goal: ...` and `Current step: ...`.
#[tokio::test]
async fn goal_is_pinned_and_anchored_into_agent_task() {
    let last_task = Arc::new(std::sync::Mutex::new(String::new()));
    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(CapturingRunner {
        last_task: Arc::clone(&last_task),
    });
    let engine = WorkflowEngine::new(runner, car_multi::SharedInfra::new());

    let drafting = PatternStep {
        pattern: PatternKind::SwarmParallel,
        task: "draft the notes".into(),
        agents: vec![car_multi::AgentSpec::new("a", "")],
        config: HashMap::new(),
    };
    let only_stage = Stage {
        id: "s".into(),
        name: "s".into(),
        step: StageStep::Pattern(drafting),
        compensation: None,
        timeout_ms: None,
        metadata: HashMap::new(),
    };
    let workflow = Workflow {
        id: "wf".into(),
        name: "WF".into(),
        start: "s".into(),
        goal: Some("ship the release safely".into()),
        stages: vec![only_stage],
        edges: Vec::new(),
        max_iterations: 100,
        metadata: HashMap::new(),
    };

    let outcome = engine.run(&workflow).await.unwrap();
    assert_eq!(outcome.status, WorkflowStatus::Completed);

    let pinned_goal = Value::String("ship the release safely".into());
    assert_eq!(outcome.final_state.get("goal"), Some(&pinned_goal));

    let seen = last_task.lock().unwrap().clone();
    assert!(seen.contains("Overall goal: ship the release safely"), "got: {seen}");
    assert!(seen.contains("Current step: draft the notes"), "got: {seen}");
}
/// Test double for `car_multi::AgentRunner` used by the adversarial-review
/// tests: agents whose name contains `"review"` answer with a passing JSON
/// verdict, every other agent simply echoes its own name.
struct ReviewRunner;
#[async_trait::async_trait]
impl car_multi::AgentRunner for ReviewRunner {
    /// Chooses the answer from `spec.name` alone; task, runtime, and mailbox
    /// are ignored. Always succeeds with a single-turn output.
    async fn run(
        &self,
        spec: &car_multi::AgentSpec,
        _task: &str,
        _runtime: &car_engine::Runtime,
        _mailbox: &car_multi::Mailbox,
    ) -> Result<car_multi::AgentOutput, car_multi::MultiError> {
        /// Verdict returned on behalf of reviewer agents.
        const PASSING_VERDICT: &str = r#"{"passed": true, "findings": [{"criterion":"complete","passed":true,"evidence":"all there","severity":"info"}]}"#;

        let is_reviewer = spec.name.contains("review");
        let answer = if is_reviewer {
            PASSING_VERDICT.to_string()
        } else {
            spec.name.clone()
        };
        let output = car_multi::AgentOutput {
            name: spec.name.clone(),
            answer,
            turns: 1,
            tool_calls: 0,
            duration_ms: 1.0,
            error: None,
            outcome: None,
            tokens: None,
        };
        Ok(output)
    }
}
/// An `AdversarialReview` stage pointed at earlier work via `review_key`
/// records a passing verdict.
///
/// `produce` writes `draft`; `review` is configured with one criterion and
/// `review_key = "draft"`. With `ReviewRunner` the test expects
/// `stage.review.review_passed == true` and a `StageOutput::Review` that passed.
#[tokio::test]
async fn adversarial_review_stage_gates_prior_work() {
    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(ReviewRunner);
    let engine = WorkflowEngine::new(runner, car_multi::SharedInfra::new());

    let produce = Stage {
        id: "produce".into(),
        name: "produce".into(),
        step: proposal_step_writing("draft", Value::String("the finished work".into())),
        compensation: None,
        timeout_ms: None,
        metadata: HashMap::new(),
    };

    let criteria_entry = (
        "criteria".to_string(),
        serde_json::json!(["work is complete"]),
    );
    let review_key_entry = ("review_key".to_string(), Value::String("draft".into()));
    let review = Stage {
        id: "review".into(),
        name: "review".into(),
        step: StageStep::Pattern(PatternStep {
            pattern: PatternKind::AdversarialReview,
            task: "verify".into(),
            agents: vec![car_multi::AgentSpec::new("reviewer", "be strict")],
            config: [criteria_entry, review_key_entry].into(),
        }),
        compensation: None,
        timeout_ms: None,
        metadata: HashMap::new(),
    };

    let workflow = Workflow {
        id: "wf".into(),
        name: "WF".into(),
        start: "produce".into(),
        goal: None,
        stages: vec![produce, review],
        edges: vec![edge("produce", "review", vec![])],
        max_iterations: 100,
        metadata: HashMap::new(),
    };

    let outcome = engine.run(&workflow).await.unwrap();
    assert_eq!(outcome.status, WorkflowStatus::Completed);
    assert_eq!(
        outcome.final_state.get("stage.review.review_passed"),
        Some(&Value::Bool(true))
    );
    let review_output = &outcome.stages[1].output;
    assert!(matches!(
        review_output,
        StageOutput::Review { passed: true, .. }
    ));
}
/// When `review_key` names a state key that nothing has written, the review
/// stage still completes but records `stage.review.review_passed == false`,
/// even though `ReviewRunner` would answer with a passing verdict.
#[tokio::test]
async fn adversarial_review_fails_closed_on_missing_work() {
    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(ReviewRunner);
    let engine = WorkflowEngine::new(runner, car_multi::SharedInfra::new());

    let review_step = StageStep::Pattern(PatternStep {
        pattern: PatternKind::AdversarialReview,
        task: "verify".into(),
        agents: vec![car_multi::AgentSpec::new("reviewer", "")],
        // "nothing" is never written to state by any stage in this workflow.
        config: [("review_key".to_string(), Value::String("nothing".into()))].into(),
    });
    let workflow = single_stage_wf("review", review_step);

    let outcome = engine.run(&workflow).await.unwrap();
    assert_eq!(outcome.status, WorkflowStatus::Completed);

    let verdict = outcome.final_state.get("stage.review.review_passed");
    assert_eq!(
        verdict,
        Some(&Value::Bool(false)),
        "empty work must fail closed, not vacuously pass"
    );
}
/// A stage that writes to the `goal` state key cannot replace the workflow's
/// declared goal: the final state and the task text seen by the following
/// agent both still carry `"the real goal"`.
#[tokio::test]
async fn stage_delta_cannot_clobber_pinned_goal() {
    let last_task = Arc::new(std::sync::Mutex::new(String::new()));
    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(CapturingRunner {
        last_task: Arc::clone(&last_task),
    });
    let engine = WorkflowEngine::new(runner, car_multi::SharedInfra::new());

    // First stage tries to overwrite the goal through an ordinary state write.
    let hijack = Stage {
        id: "hijack".into(),
        name: "hijack".into(),
        step: proposal_step_writing("goal", Value::String("DO SOMETHING ELSE".into())),
        compensation: None,
        timeout_ms: None,
        metadata: HashMap::new(),
    };
    // Second stage runs an agent so the test can inspect the task it received.
    let agent_step = PatternStep {
        pattern: PatternKind::SwarmParallel,
        task: "do the thing".into(),
        agents: vec![car_multi::AgentSpec::new("a", "")],
        config: HashMap::new(),
    };
    let work = Stage {
        id: "work".into(),
        name: "work".into(),
        step: StageStep::Pattern(agent_step),
        compensation: None,
        timeout_ms: None,
        metadata: HashMap::new(),
    };
    let workflow = Workflow {
        id: "wf".into(),
        name: "WF".into(),
        start: "hijack".into(),
        goal: Some("the real goal".into()),
        stages: vec![hijack, work],
        edges: vec![edge("hijack", "work", vec![])],
        max_iterations: 100,
        metadata: HashMap::new(),
    };

    let outcome = engine.run(&workflow).await.unwrap();
    assert_eq!(outcome.status, WorkflowStatus::Completed);

    let real_goal = Value::String("the real goal".into());
    assert_eq!(outcome.final_state.get("goal"), Some(&real_goal));

    let seen = last_task.lock().unwrap().clone();
    assert!(seen.contains("Overall goal: the real goal"), "got: {seen}");
}
/// `run_cached` reuses succeeded stages from a prior result and executes the
/// remainder: given a prior run where only `a` succeeded, the new run lists
/// `a` then `b`, the agent behind `b` is actually invoked, and `a` keeps the
/// `StageOutput::Empty` supplied by the prior result.
#[tokio::test]
async fn run_cached_replays_prefix_and_runs_rest_live() {
    let last_task = Arc::new(std::sync::Mutex::new(String::new()));
    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(CapturingRunner {
        last_task: Arc::clone(&last_task),
    });
    let engine = WorkflowEngine::new(runner, car_multi::SharedInfra::new());

    let stage_a = Stage {
        id: "a".into(),
        name: "a".into(),
        step: proposal_step_writing("k", Value::String("v".into())),
        compensation: None,
        timeout_ms: None,
        metadata: HashMap::new(),
    };
    let stage_b = Stage {
        id: "b".into(),
        name: "b".into(),
        step: StageStep::Pattern(PatternStep {
            pattern: PatternKind::SwarmParallel,
            task: "do b".into(),
            agents: vec![car_multi::AgentSpec::new("agent", "")],
            config: HashMap::new(),
        }),
        compensation: None,
        timeout_ms: None,
        metadata: HashMap::new(),
    };
    let workflow = Workflow {
        id: "wf".into(),
        name: "WF".into(),
        start: "a".into(),
        goal: None,
        stages: vec![stage_a, stage_b],
        edges: vec![edge("a", "b", vec![])],
        max_iterations: 100,
        metadata: HashMap::new(),
    };

    // Prior run: failed overall, but stage `a` succeeded and left its state.
    let cached_a = StageResult {
        stage_id: "a".into(),
        stage_name: "a".into(),
        status: StageStatus::Succeeded,
        output: StageOutput::Empty,
        duration_ms: 1.0,
        error: None,
    };
    let prior_state = HashMap::from([
        ("stage.a.succeeded".to_string(), Value::Bool(true)),
        ("k".to_string(), Value::String("v".into())),
    ]);
    let prior = WorkflowResult {
        workflow_id: "wf".into(),
        workflow_name: "WF".into(),
        status: WorkflowStatus::Failed,
        stages: vec![cached_a],
        compensations: Vec::new(),
        duration_ms: 1.0,
        timestamp: chrono::Utc::now(),
        final_state: prior_state,
        paused: None,
    };

    let outcome = engine.run_cached(&workflow, &prior).await.unwrap();
    assert_eq!(outcome.status, WorkflowStatus::Completed);

    let executed: Vec<&str> = outcome
        .stages
        .iter()
        .map(|stage| stage.stage_id.as_str())
        .collect();
    assert_eq!(executed, vec!["a", "b"]);
    assert!(
        !last_task.lock().unwrap().is_empty(),
        "stage B should have executed live"
    );
    assert!(matches!(outcome.stages[0].output, StageOutput::Empty));
}
#[tokio::test]
async fn run_cached_strips_stale_uncached_stage_bookkeeping() {
let eng = test_engine();
let wf = Workflow {
id: "wf".into(),
name: "WF".into(),
start: "a".into(),
goal: None,
stages: vec![empty_proposal_stage("a"), empty_proposal_stage("ghost")],
edges: vec![],
max_iterations: 100,
metadata: HashMap::new(),
};
let mut final_state = HashMap::new();
final_state.insert("stage.a.succeeded".to_string(), Value::Bool(true));
final_state.insert("stage.ghost.error".to_string(), Value::String("stale".into()));
final_state.insert("real_data".to_string(), Value::from(42));
let prior = WorkflowResult {
workflow_id: "wf".into(),
workflow_name: "WF".into(),
status: WorkflowStatus::Failed,
stages: vec![StageResult {
stage_id: "a".into(),
stage_name: "a".into(),
status: StageStatus::Succeeded,
output: StageOutput::Empty,
duration_ms: 1.0,
error: None,
}],
compensations: vec![],
duration_ms: 1.0,
timestamp: chrono::Utc::now(),
final_state,
paused: None,
};
let res = eng.run_cached(&wf, &prior).await.unwrap();
assert_eq!(res.status, WorkflowStatus::Completed);
assert!(!res.final_state.contains_key("stage.ghost.error"));
assert_eq!(res.final_state.get("real_data"), Some(&Value::from(42)));
assert_eq!(
res.final_state.get("stage.a.succeeded"),
Some(&Value::Bool(true))
);
}
/// When the prior result already contains a succeeded entry for the workflow's
/// only stage, `run_cached` completes without invoking the agent runner at all
/// (the capturing runner's `last_task` stays empty).
#[tokio::test]
async fn run_cached_with_full_cache_executes_nothing() {
    let last_task = Arc::new(std::sync::Mutex::new(String::new()));
    let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(CapturingRunner {
        last_task: Arc::clone(&last_task),
    });
    let engine = WorkflowEngine::new(runner, car_multi::SharedInfra::new());

    let agent_step = StageStep::Pattern(PatternStep {
        pattern: PatternKind::SwarmParallel,
        task: "x".into(),
        agents: vec![car_multi::AgentSpec::new("agent", "")],
        config: HashMap::new(),
    });
    let workflow = single_stage_wf("only", agent_step);

    let cached_only = StageResult {
        stage_id: "only".into(),
        stage_name: "only".into(),
        status: StageStatus::Succeeded,
        output: StageOutput::Empty,
        duration_ms: 1.0,
        error: None,
    };
    let prior = WorkflowResult {
        workflow_id: "wf".into(),
        workflow_name: "WF".into(),
        status: WorkflowStatus::Completed,
        stages: vec![cached_only],
        compensations: Vec::new(),
        duration_ms: 1.0,
        timestamp: chrono::Utc::now(),
        final_state: HashMap::new(),
        paused: None,
    };

    let outcome = engine.run_cached(&workflow, &prior).await.unwrap();
    assert_eq!(outcome.status, WorkflowStatus::Completed);

    let agent_was_idle = last_task.lock().unwrap().is_empty();
    assert!(
        agent_was_idle,
        "fully-cached run must not execute any agent"
    );
}
}