use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
use serde_json::Value;
use tracing::{debug, info, warn};
use car_multi::{
AgentRunner, Fleet, MapReduce, Pipeline, SharedInfra, Supervisor, Swarm, SwarmMode,
Vote,
};
use crate::error::WorkflowError;
use crate::result::*;
use crate::types::*;
pub struct WorkflowEngine {
runner: Arc<dyn AgentRunner>,
infra: SharedInfra,
}
impl WorkflowEngine {
pub fn new(runner: Arc<dyn AgentRunner>, infra: SharedInfra) -> Self {
Self { runner, infra }
}
pub fn run<'a>(&'a self, workflow: &'a Workflow) -> futures::future::BoxFuture<'a, Result<WorkflowResult, WorkflowError>> {
Box::pin(self.run_inner(workflow))
}
async fn run_inner(&self, workflow: &Workflow) -> Result<WorkflowResult, WorkflowError> {
let start = Instant::now();
if workflow.stage(&workflow.start).is_none() {
return Err(WorkflowError::NoStartStage);
}
let mut wf_state: HashMap<String, Value> = HashMap::new();
let mut stage_results: Vec<StageResult> = Vec::new();
let mut completed_stage_ids: Vec<String> = Vec::new();
let mut iterations: u32 = 0;
let mut current_id = workflow.start.clone();
loop {
iterations += 1;
if iterations > workflow.max_iterations {
return Err(WorkflowError::CycleLimitReached(workflow.max_iterations));
}
let stage = workflow.stage(¤t_id)
.ok_or_else(|| WorkflowError::StageNotFound(current_id.clone()))?;
debug!(stage = %stage.id, name = %stage.name, iteration = iterations, "executing stage");
let stage_start = Instant::now();
let result = if let Some(timeout_ms) = stage.timeout_ms {
match tokio::time::timeout(
std::time::Duration::from_millis(timeout_ms),
self.execute_step(&stage.step, &wf_state),
).await {
Ok(r) => r,
Err(_) => Err(WorkflowError::Timeout(stage.id.clone(), timeout_ms)),
}
} else {
self.execute_step(&stage.step, &wf_state).await
};
let stage_duration = stage_start.elapsed().as_secs_f64() * 1000.0;
match result {
Ok((output, answer)) => {
wf_state.insert(
format!("stage.{}.succeeded", stage.id),
Value::Bool(true),
);
wf_state.insert(
format!("stage.{}.answer", stage.id),
Value::String(answer),
);
if let StageOutput::Proposal { ref result } = output {
for ar in &result.results {
for (k, v) in &ar.state_changes {
wf_state.insert(k.clone(), v.clone());
}
}
}
stage_results.push(StageResult {
stage_id: stage.id.clone(),
stage_name: stage.name.clone(),
status: StageStatus::Succeeded,
output,
duration_ms: stage_duration,
error: None,
});
completed_stage_ids.push(stage.id.clone());
info!(stage = %stage.id, duration_ms = stage_duration, "stage succeeded");
}
Err(e) => {
let error_msg = e.to_string();
wf_state.insert(
format!("stage.{}.succeeded", stage.id),
Value::Bool(false),
);
wf_state.insert(
format!("stage.{}.error", stage.id),
Value::String(error_msg.clone()),
);
stage_results.push(StageResult {
stage_id: stage.id.clone(),
stage_name: stage.name.clone(),
status: StageStatus::Failed,
output: StageOutput::Empty,
duration_ms: stage_duration,
error: Some(error_msg.clone()),
});
warn!(stage = %stage.id, error = %error_msg, "stage failed, running compensation");
let compensations = self
.compensate(workflow, &completed_stage_ids)
.await;
let all_compensated = compensations.iter().all(|c| c.status == StageStatus::Succeeded);
let any_compensated = !compensations.is_empty();
let status = if any_compensated && all_compensated {
WorkflowStatus::Compensated
} else if any_compensated {
WorkflowStatus::PartiallyCompensated
} else {
WorkflowStatus::Failed
};
return Ok(WorkflowResult {
workflow_id: workflow.id.clone(),
workflow_name: workflow.name.clone(),
status,
stages: stage_results,
compensations,
duration_ms: start.elapsed().as_secs_f64() * 1000.0,
timestamp: chrono::Utc::now(),
final_state: wf_state,
});
}
}
let edges = workflow.outgoing_edges(¤t_id);
let next = edges.iter().find(|e| check_conditions(&e.conditions, &wf_state));
match next {
Some(edge) => {
debug!(from = %current_id, to = %edge.to, label = %edge.label, "taking edge");
current_id = edge.to.clone();
}
None => {
info!(
workflow = %workflow.name,
stages_executed = stage_results.len(),
"workflow completed"
);
return Ok(WorkflowResult {
workflow_id: workflow.id.clone(),
workflow_name: workflow.name.clone(),
status: WorkflowStatus::Completed,
stages: stage_results,
compensations: vec![],
duration_ms: start.elapsed().as_secs_f64() * 1000.0,
timestamp: chrono::Utc::now(),
final_state: wf_state,
});
}
}
}
}
async fn execute_step(
&self,
step: &StageStep,
_wf_state: &HashMap<String, Value>,
) -> Result<(StageOutput, String), WorkflowError> {
match step {
StageStep::Pattern(ps) => self.execute_pattern(ps).await,
StageStep::Proposal(ps) => self.execute_proposal(ps).await,
StageStep::SubWorkflow(sw) => self.execute_sub_workflow(sw).await,
}
}
async fn execute_pattern(
&self,
step: &PatternStep,
) -> Result<(StageOutput, String), WorkflowError> {
let task = &step.task;
let runner = &self.runner;
let infra = &self.infra;
match step.pattern {
PatternKind::SwarmParallel => {
let mut swarm = Swarm::new(step.agents.clone(), SwarmMode::Parallel);
if let Some(synth) = extract_synthesizer(&step.config, &step.agents) {
swarm = swarm.with_synthesizer(synth);
}
let r = swarm.run(task, runner, infra).await?;
Ok((
StageOutput::Pattern { outputs: r.outputs, final_answer: r.final_summary.clone() },
r.final_summary,
))
}
PatternKind::SwarmSequential => {
let swarm = Swarm::new(step.agents.clone(), SwarmMode::Sequential);
let r = swarm.run(task, runner, infra).await?;
Ok((
StageOutput::Pattern { outputs: r.outputs, final_answer: r.final_summary.clone() },
r.final_summary,
))
}
PatternKind::SwarmDebate => {
let swarm = Swarm::new(step.agents.clone(), SwarmMode::Debate);
let r = swarm.run(task, runner, infra).await?;
Ok((
StageOutput::Pattern { outputs: r.outputs, final_answer: r.final_summary.clone() },
r.final_summary,
))
}
PatternKind::Pipeline => {
let pipeline = Pipeline::new(step.agents.clone());
let r = pipeline.run(task, runner, infra).await?;
Ok((
StageOutput::Pattern { outputs: r.stages, final_answer: r.final_answer.clone() },
r.final_answer,
))
}
PatternKind::Supervisor => {
let max_rounds = step.config.get("max_rounds")
.and_then(|v| v.as_u64())
.unwrap_or(3) as u32;
let (supervisor, workers) = split_supervisor_workers(&step.agents, &step.config);
let r = Supervisor::new(workers, supervisor)
.with_max_rounds(max_rounds)
.run(task, runner, infra)
.await?;
let all_outputs: Vec<_> = r.rounds.into_iter().flatten().collect();
Ok((
StageOutput::Pattern { outputs: all_outputs, final_answer: r.final_answer.clone() },
r.final_answer,
))
}
PatternKind::Delegator => {
let (main_agent, specialists) = split_delegator(&step.agents, &step.config);
let delegator = car_multi::Delegator::new(main_agent, specialists);
let r = delegator.run(task, runner, infra).await?;
Ok((
StageOutput::Pattern {
outputs: vec![car_multi::AgentOutput {
name: "delegator".into(),
answer: r.final_answer.clone(),
turns: 0,
tool_calls: r.delegations.len() as u32,
duration_ms: 0.0,
error: None,
outcome: None,
tokens: None,
}],
final_answer: r.final_answer.clone(),
},
r.final_answer,
))
}
PatternKind::MapReduce => {
let max_concurrent = step.config.get("max_concurrent")
.and_then(|v| v.as_u64())
.unwrap_or(5) as usize;
let items: Vec<String> = step.config.get("items")
.and_then(|v| serde_json::from_value(v.clone()).ok())
.unwrap_or_default();
if step.agents.len() < 2 {
return Err(WorkflowError::StageFailed(
"map_reduce".into(),
"requires at least 2 agents (mapper + reducer)".into(),
));
}
let mapper = step.agents[0].clone();
let reducer = step.agents[1].clone();
let mr = MapReduce::new(mapper, reducer).with_max_concurrent(max_concurrent);
let r = mr.run(task, &items.iter().map(|s| s.as_str()).collect::<Vec<_>>()
.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
runner, infra).await?;
Ok((
StageOutput::Pattern { outputs: r.map_outputs, final_answer: r.reduced_answer.clone() },
r.reduced_answer,
))
}
PatternKind::Vote => {
let mut vote = Vote::new(step.agents.clone());
if let Some(synth) = extract_synthesizer(&step.config, &step.agents) {
vote = vote.with_synthesizer(synth);
}
let r = vote.run(task, runner, infra).await?;
Ok((
StageOutput::Pattern { outputs: r.votes, final_answer: r.winner.clone() },
r.winner,
))
}
PatternKind::Fleet => {
let mut fleet = Fleet::new(step.agents.clone());
if let Some(timeout) = step.config.get("timeout_secs").and_then(|v| v.as_u64()) {
fleet = fleet.with_timeout(timeout);
}
let r = fleet.run(runner, infra).await?;
let summary = format!("{} succeeded, {} failed", r.succeeded, r.failed);
Ok((
StageOutput::Pattern { outputs: r.outputs, final_answer: summary.clone() },
summary,
))
}
}
}
async fn execute_proposal(
&self,
step: &ProposalStep,
) -> Result<(StageOutput, String), WorkflowError> {
let runtime = self.infra.make_runtime();
let result = runtime.execute(&step.proposal).await;
if result.all_succeeded() {
let answer = result.results.last()
.and_then(|r| r.output.as_ref())
.map(|v| v.to_string())
.unwrap_or_default();
Ok((StageOutput::Proposal { result }, answer))
} else {
let errors: Vec<String> = result.results.iter()
.filter_map(|r| r.error.as_ref())
.cloned()
.collect();
Err(WorkflowError::StageFailed(
"proposal".into(),
errors.join("; "),
))
}
}
async fn execute_sub_workflow(
&self,
step: &SubWorkflowStep,
) -> Result<(StageOutput, String), WorkflowError> {
let result = self.run(&step.workflow).await?;
let answer = result.stages.last()
.and_then(|s| match &s.output {
StageOutput::Pattern { final_answer, .. } => Some(final_answer.clone()),
StageOutput::Proposal { result } => result.results.last()
.and_then(|r| r.output.as_ref())
.map(|v| v.to_string()),
StageOutput::SubWorkflow { result } => {
Some(format!("sub-workflow {} {}", result.workflow_name,
if result.succeeded() { "completed" } else { "failed" }))
}
StageOutput::Empty => None,
})
.unwrap_or_default();
if result.succeeded() {
Ok((StageOutput::SubWorkflow { result: Box::new(result) }, answer))
} else {
Err(WorkflowError::StageFailed(
"sub_workflow".into(),
"sub-workflow failed".into(),
))
}
}
async fn compensate(
&self,
workflow: &Workflow,
completed_stage_ids: &[String],
) -> Vec<CompensationResult> {
let mut results = Vec::new();
for stage_id in completed_stage_ids.iter().rev() {
let stage = match workflow.stage(stage_id) {
Some(s) => s,
None => continue,
};
let handler = match &stage.compensation {
Some(h) => h,
None => continue,
};
debug!(stage = %stage_id, "running compensation");
let comp_start = Instant::now();
let comp_result = match handler {
CompensationHandler::Proposal(ps) => {
self.execute_proposal(ps).await
}
CompensationHandler::StageRef { stage_id: ref_id } => {
if let Some(ref_stage) = workflow.stage(ref_id) {
self.execute_step(&ref_stage.step, &HashMap::new()).await
} else {
Err(WorkflowError::StageNotFound(ref_id.clone()))
}
}
};
let duration = comp_start.elapsed().as_secs_f64() * 1000.0;
match comp_result {
Ok(_) => {
results.push(CompensationResult {
for_stage_id: stage_id.clone(),
status: StageStatus::Succeeded,
duration_ms: duration,
error: None,
});
}
Err(e) => {
warn!(stage = %stage_id, error = %e, "compensation failed");
results.push(CompensationResult {
for_stage_id: stage_id.clone(),
status: StageStatus::Failed,
duration_ms: duration,
error: Some(e.to_string()),
});
}
}
}
results
}
}
fn check_conditions(conditions: &[car_ir::Precondition], state: &HashMap<String, Value>) -> bool {
conditions.iter().all(|cond| evaluate_precondition(cond, state))
}
fn evaluate_precondition(cond: &car_ir::Precondition, state: &HashMap<String, Value>) -> bool {
let op = cond.operator.as_str();
match op {
"exists" => state.contains_key(&cond.key),
"not_exists" => !state.contains_key(&cond.key),
_ => {
let actual = match state.get(&cond.key) {
Some(v) => v,
None => return false, };
match op {
"eq" => actual == &cond.value,
"neq" => actual != &cond.value,
"gt" => compare_values(actual, &cond.value).map_or(false, |o| o == std::cmp::Ordering::Greater),
"gte" => compare_values(actual, &cond.value).map_or(false, |o| o != std::cmp::Ordering::Less),
"lt" => compare_values(actual, &cond.value).map_or(false, |o| o == std::cmp::Ordering::Less),
"lte" => compare_values(actual, &cond.value).map_or(false, |o| o != std::cmp::Ordering::Greater),
"contains" => {
if let (Some(haystack), Some(needle)) = (actual.as_str(), cond.value.as_str()) {
haystack.contains(needle)
} else {
false
}
}
_ => false,
}
}
}
}
fn compare_values(a: &Value, b: &Value) -> Option<std::cmp::Ordering> {
match (a.as_f64(), b.as_f64()) {
(Some(a), Some(b)) => a.partial_cmp(&b),
_ => match (a.as_str(), b.as_str()) {
(Some(a), Some(b)) => Some(a.cmp(b)),
_ => None,
},
}
}
fn extract_synthesizer(
config: &HashMap<String, Value>,
agents: &[car_multi::AgentSpec],
) -> Option<car_multi::AgentSpec> {
config.get("synthesizer_index")
.and_then(|v| v.as_u64())
.and_then(|i| agents.get(i as usize))
.cloned()
}
fn split_supervisor_workers(
agents: &[car_multi::AgentSpec],
config: &HashMap<String, Value>,
) -> (car_multi::AgentSpec, Vec<car_multi::AgentSpec>) {
let idx = config.get("supervisor_index")
.and_then(|v| v.as_u64())
.unwrap_or(agents.len().saturating_sub(1) as u64) as usize;
let supervisor = agents.get(idx).cloned().unwrap_or_else(|| agents.last().unwrap().clone());
let workers: Vec<_> = agents.iter().enumerate()
.filter(|(i, _)| *i != idx)
.map(|(_, a)| a.clone())
.collect();
(supervisor, workers)
}
fn split_delegator(
agents: &[car_multi::AgentSpec],
_config: &HashMap<String, Value>,
) -> (car_multi::AgentSpec, HashMap<String, car_multi::AgentSpec>) {
let main = agents.first().cloned().unwrap_or_else(|| car_multi::AgentSpec::new("main", ""));
let specialists: HashMap<String, car_multi::AgentSpec> = agents.iter()
.skip(1)
.map(|a| (a.name.clone(), a.clone()))
.collect();
(main, specialists)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn precondition_eq() {
let mut state = HashMap::new();
state.insert("x".into(), Value::Bool(true));
let cond = car_ir::Precondition {
key: "x".into(),
operator: "eq".into(),
value: Value::Bool(true),
description: String::new(),
};
assert!(evaluate_precondition(&cond, &state));
let cond_false = car_ir::Precondition {
key: "x".into(),
operator: "eq".into(),
value: Value::Bool(false),
description: String::new(),
};
assert!(!evaluate_precondition(&cond_false, &state));
}
#[test]
fn precondition_exists() {
let mut state = HashMap::new();
state.insert("x".into(), Value::Null);
let exists = car_ir::Precondition {
key: "x".into(),
operator: "exists".into(),
value: Value::Null,
description: String::new(),
};
assert!(evaluate_precondition(&exists, &state));
let not_exists = car_ir::Precondition {
key: "y".into(),
operator: "exists".into(),
value: Value::Null,
description: String::new(),
};
assert!(!evaluate_precondition(¬_exists, &state));
}
#[test]
fn precondition_numeric_comparison() {
let mut state = HashMap::new();
state.insert("count".into(), serde_json::json!(5));
let gt = car_ir::Precondition {
key: "count".into(),
operator: "gt".into(),
value: serde_json::json!(3),
description: String::new(),
};
assert!(evaluate_precondition(>, &state));
let lt = car_ir::Precondition {
key: "count".into(),
operator: "lt".into(),
value: serde_json::json!(3),
description: String::new(),
};
assert!(!evaluate_precondition(<, &state));
}
#[test]
fn empty_conditions_always_pass() {
let state = HashMap::new();
assert!(check_conditions(&[], &state));
}
}