1use std::collections::HashMap;
5use std::sync::Arc;
6use std::time::Instant;
7
8use serde_json::Value;
9use tracing::{debug, info, warn};
10
11use car_multi::{
12 AgentRunner, Fleet, MapReduce, Pipeline, SharedInfra, Supervisor, Swarm, SwarmMode,
13 Vote,
14};
15
16use crate::error::WorkflowError;
17use crate::result::*;
18use crate::types::*;
19
/// Drives [`Workflow`] execution: walks stages along matching edges, runs each stage's step,
/// and runs compensation for already-completed stages when a stage fails.
pub struct WorkflowEngine {
    /// Runner handed to every multi-agent pattern to execute individual agents.
    runner: Arc<dyn AgentRunner>,
    /// Shared infrastructure passed to the patterns and used to build proposal runtimes.
    infra: SharedInfra,
}
25
26impl WorkflowEngine {
27 pub fn new(runner: Arc<dyn AgentRunner>, infra: SharedInfra) -> Self {
28 Self { runner, infra }
29 }
30
31 pub fn run<'a>(&'a self, workflow: &'a Workflow) -> futures::future::BoxFuture<'a, Result<WorkflowResult, WorkflowError>> {
33 Box::pin(self.run_inner(workflow))
34 }
35
36 async fn run_inner(&self, workflow: &Workflow) -> Result<WorkflowResult, WorkflowError> {
37 let start = Instant::now();
38
39 if workflow.stage(&workflow.start).is_none() {
41 return Err(WorkflowError::NoStartStage);
42 }
43
44 let mut wf_state: HashMap<String, Value> = HashMap::new();
46 let mut stage_results: Vec<StageResult> = Vec::new();
47 let mut completed_stage_ids: Vec<String> = Vec::new();
48 let mut iterations: u32 = 0;
49
50 let mut current_id = workflow.start.clone();
51
52 loop {
53 iterations += 1;
54 if iterations > workflow.max_iterations {
55 return Err(WorkflowError::CycleLimitReached(workflow.max_iterations));
56 }
57
58 let stage = workflow.stage(¤t_id)
59 .ok_or_else(|| WorkflowError::StageNotFound(current_id.clone()))?;
60
61 debug!(stage = %stage.id, name = %stage.name, iteration = iterations, "executing stage");
62
63 let stage_start = Instant::now();
65 let result = if let Some(timeout_ms) = stage.timeout_ms {
66 match tokio::time::timeout(
67 std::time::Duration::from_millis(timeout_ms),
68 self.execute_step(&stage.step, &wf_state),
69 ).await {
70 Ok(r) => r,
71 Err(_) => Err(WorkflowError::Timeout(stage.id.clone(), timeout_ms)),
72 }
73 } else {
74 self.execute_step(&stage.step, &wf_state).await
75 };
76 let stage_duration = stage_start.elapsed().as_secs_f64() * 1000.0;
77
78 match result {
79 Ok((output, answer)) => {
80 wf_state.insert(
82 format!("stage.{}.succeeded", stage.id),
83 Value::Bool(true),
84 );
85 wf_state.insert(
86 format!("stage.{}.answer", stage.id),
87 Value::String(answer),
88 );
89
90 if let StageOutput::Proposal { ref result } = output {
92 for ar in &result.results {
93 for (k, v) in &ar.state_changes {
94 wf_state.insert(k.clone(), v.clone());
95 }
96 }
97 }
98
99 stage_results.push(StageResult {
100 stage_id: stage.id.clone(),
101 stage_name: stage.name.clone(),
102 status: StageStatus::Succeeded,
103 output,
104 duration_ms: stage_duration,
105 error: None,
106 });
107 completed_stage_ids.push(stage.id.clone());
108
109 info!(stage = %stage.id, duration_ms = stage_duration, "stage succeeded");
110 }
111 Err(e) => {
112 let error_msg = e.to_string();
113 wf_state.insert(
114 format!("stage.{}.succeeded", stage.id),
115 Value::Bool(false),
116 );
117 wf_state.insert(
118 format!("stage.{}.error", stage.id),
119 Value::String(error_msg.clone()),
120 );
121
122 stage_results.push(StageResult {
123 stage_id: stage.id.clone(),
124 stage_name: stage.name.clone(),
125 status: StageStatus::Failed,
126 output: StageOutput::Empty,
127 duration_ms: stage_duration,
128 error: Some(error_msg.clone()),
129 });
130
131 warn!(stage = %stage.id, error = %error_msg, "stage failed, running compensation");
132
133 let compensations = self
135 .compensate(workflow, &completed_stage_ids)
136 .await;
137
138 let all_compensated = compensations.iter().all(|c| c.status == StageStatus::Succeeded);
139 let any_compensated = !compensations.is_empty();
140
141 let status = if any_compensated && all_compensated {
142 WorkflowStatus::Compensated
143 } else if any_compensated {
144 WorkflowStatus::PartiallyCompensated
145 } else {
146 WorkflowStatus::Failed
147 };
148
149 return Ok(WorkflowResult {
150 workflow_id: workflow.id.clone(),
151 workflow_name: workflow.name.clone(),
152 status,
153 stages: stage_results,
154 compensations,
155 duration_ms: start.elapsed().as_secs_f64() * 1000.0,
156 timestamp: chrono::Utc::now(),
157 final_state: wf_state,
158 });
159 }
160 }
161
162 let edges = workflow.outgoing_edges(¤t_id);
164 let next = edges.iter().find(|e| check_conditions(&e.conditions, &wf_state));
165
166 match next {
167 Some(edge) => {
168 debug!(from = %current_id, to = %edge.to, label = %edge.label, "taking edge");
169 current_id = edge.to.clone();
170 }
171 None => {
172 info!(
174 workflow = %workflow.name,
175 stages_executed = stage_results.len(),
176 "workflow completed"
177 );
178 return Ok(WorkflowResult {
179 workflow_id: workflow.id.clone(),
180 workflow_name: workflow.name.clone(),
181 status: WorkflowStatus::Completed,
182 stages: stage_results,
183 compensations: vec![],
184 duration_ms: start.elapsed().as_secs_f64() * 1000.0,
185 timestamp: chrono::Utc::now(),
186 final_state: wf_state,
187 });
188 }
189 }
190 }
191 }
192
193 async fn execute_step(
195 &self,
196 step: &StageStep,
197 _wf_state: &HashMap<String, Value>,
198 ) -> Result<(StageOutput, String), WorkflowError> {
199 match step {
200 StageStep::Pattern(ps) => self.execute_pattern(ps).await,
201 StageStep::Proposal(ps) => self.execute_proposal(ps).await,
202 StageStep::SubWorkflow(sw) => self.execute_sub_workflow(sw).await,
203 }
204 }
205
206 async fn execute_pattern(
208 &self,
209 step: &PatternStep,
210 ) -> Result<(StageOutput, String), WorkflowError> {
211 let task = &step.task;
212 let runner = &self.runner;
213 let infra = &self.infra;
214
215 match step.pattern {
216 PatternKind::SwarmParallel => {
217 let mut swarm = Swarm::new(step.agents.clone(), SwarmMode::Parallel);
218 if let Some(synth) = extract_synthesizer(&step.config, &step.agents) {
219 swarm = swarm.with_synthesizer(synth);
220 }
221 let r = swarm.run(task, runner, infra).await?;
222 Ok((
223 StageOutput::Pattern { outputs: r.outputs, final_answer: r.final_summary.clone() },
224 r.final_summary,
225 ))
226 }
227 PatternKind::SwarmSequential => {
228 let swarm = Swarm::new(step.agents.clone(), SwarmMode::Sequential);
229 let r = swarm.run(task, runner, infra).await?;
230 Ok((
231 StageOutput::Pattern { outputs: r.outputs, final_answer: r.final_summary.clone() },
232 r.final_summary,
233 ))
234 }
235 PatternKind::SwarmDebate => {
236 let swarm = Swarm::new(step.agents.clone(), SwarmMode::Debate);
237 let r = swarm.run(task, runner, infra).await?;
238 Ok((
239 StageOutput::Pattern { outputs: r.outputs, final_answer: r.final_summary.clone() },
240 r.final_summary,
241 ))
242 }
243 PatternKind::Pipeline => {
244 let pipeline = Pipeline::new(step.agents.clone());
245 let r = pipeline.run(task, runner, infra).await?;
246 Ok((
247 StageOutput::Pattern { outputs: r.stages, final_answer: r.final_answer.clone() },
248 r.final_answer,
249 ))
250 }
251 PatternKind::Supervisor => {
252 let max_rounds = step.config.get("max_rounds")
253 .and_then(|v| v.as_u64())
254 .unwrap_or(3) as u32;
255 let (supervisor, workers) = split_supervisor_workers(&step.agents, &step.config);
256 let r = Supervisor::new(workers, supervisor)
257 .with_max_rounds(max_rounds)
258 .run(task, runner, infra)
259 .await?;
260 let all_outputs: Vec<_> = r.rounds.into_iter().flatten().collect();
261 Ok((
262 StageOutput::Pattern { outputs: all_outputs, final_answer: r.final_answer.clone() },
263 r.final_answer,
264 ))
265 }
266 PatternKind::Delegator => {
267 let (main_agent, specialists) = split_delegator(&step.agents, &step.config);
268 let delegator = car_multi::Delegator::new(main_agent, specialists);
269 let r = delegator.run(task, runner, infra).await?;
270 Ok((
271 StageOutput::Pattern {
272 outputs: vec![car_multi::AgentOutput {
273 name: "delegator".into(),
274 answer: r.final_answer.clone(),
275 turns: 0,
276 tool_calls: r.delegations.len() as u32,
277 duration_ms: 0.0,
278 error: None,
279 outcome: None,
280 tokens: None,
281 }],
282 final_answer: r.final_answer.clone(),
283 },
284 r.final_answer,
285 ))
286 }
287 PatternKind::MapReduce => {
288 let max_concurrent = step.config.get("max_concurrent")
289 .and_then(|v| v.as_u64())
290 .unwrap_or(5) as usize;
291 let items: Vec<String> = step.config.get("items")
292 .and_then(|v| serde_json::from_value(v.clone()).ok())
293 .unwrap_or_default();
294
295 if step.agents.len() < 2 {
296 return Err(WorkflowError::StageFailed(
297 "map_reduce".into(),
298 "requires at least 2 agents (mapper + reducer)".into(),
299 ));
300 }
301 let mapper = step.agents[0].clone();
302 let reducer = step.agents[1].clone();
303
304 let mr = MapReduce::new(mapper, reducer).with_max_concurrent(max_concurrent);
305 let r = mr.run(task, &items.iter().map(|s| s.as_str()).collect::<Vec<_>>()
306 .iter().map(|s| s.to_string()).collect::<Vec<_>>(),
307 runner, infra).await?;
308 Ok((
309 StageOutput::Pattern { outputs: r.map_outputs, final_answer: r.reduced_answer.clone() },
310 r.reduced_answer,
311 ))
312 }
313 PatternKind::Vote => {
314 let mut vote = Vote::new(step.agents.clone());
315 if let Some(synth) = extract_synthesizer(&step.config, &step.agents) {
316 vote = vote.with_synthesizer(synth);
317 }
318 let r = vote.run(task, runner, infra).await?;
319 Ok((
320 StageOutput::Pattern { outputs: r.votes, final_answer: r.winner.clone() },
321 r.winner,
322 ))
323 }
324 PatternKind::Fleet => {
325 let mut fleet = Fleet::new(step.agents.clone());
326 if let Some(timeout) = step.config.get("timeout_secs").and_then(|v| v.as_u64()) {
327 fleet = fleet.with_timeout(timeout);
328 }
329 let r = fleet.run(runner, infra).await?;
330 let summary = format!("{} succeeded, {} failed", r.succeeded, r.failed);
331 Ok((
332 StageOutput::Pattern { outputs: r.outputs, final_answer: summary.clone() },
333 summary,
334 ))
335 }
336 }
337 }
338
339 async fn execute_proposal(
341 &self,
342 step: &ProposalStep,
343 ) -> Result<(StageOutput, String), WorkflowError> {
344 let runtime = self.infra.make_runtime();
345 let result = runtime.execute(&step.proposal).await;
346
347 if result.all_succeeded() {
348 let answer = result.results.last()
349 .and_then(|r| r.output.as_ref())
350 .map(|v| v.to_string())
351 .unwrap_or_default();
352 Ok((StageOutput::Proposal { result }, answer))
353 } else {
354 let errors: Vec<String> = result.results.iter()
355 .filter_map(|r| r.error.as_ref())
356 .cloned()
357 .collect();
358 Err(WorkflowError::StageFailed(
359 "proposal".into(),
360 errors.join("; "),
361 ))
362 }
363 }
364
365 async fn execute_sub_workflow(
367 &self,
368 step: &SubWorkflowStep,
369 ) -> Result<(StageOutput, String), WorkflowError> {
370 let result = self.run(&step.workflow).await?;
371 let answer = result.stages.last()
372 .and_then(|s| match &s.output {
373 StageOutput::Pattern { final_answer, .. } => Some(final_answer.clone()),
374 StageOutput::Proposal { result } => result.results.last()
375 .and_then(|r| r.output.as_ref())
376 .map(|v| v.to_string()),
377 StageOutput::SubWorkflow { result } => {
378 Some(format!("sub-workflow {} {}", result.workflow_name,
379 if result.succeeded() { "completed" } else { "failed" }))
380 }
381 StageOutput::Empty => None,
382 })
383 .unwrap_or_default();
384
385 if result.succeeded() {
386 Ok((StageOutput::SubWorkflow { result: Box::new(result) }, answer))
387 } else {
388 Err(WorkflowError::StageFailed(
389 "sub_workflow".into(),
390 "sub-workflow failed".into(),
391 ))
392 }
393 }
394
395 async fn compensate(
397 &self,
398 workflow: &Workflow,
399 completed_stage_ids: &[String],
400 ) -> Vec<CompensationResult> {
401 let mut results = Vec::new();
402
403 for stage_id in completed_stage_ids.iter().rev() {
404 let stage = match workflow.stage(stage_id) {
405 Some(s) => s,
406 None => continue,
407 };
408
409 let handler = match &stage.compensation {
410 Some(h) => h,
411 None => continue,
412 };
413
414 debug!(stage = %stage_id, "running compensation");
415 let comp_start = Instant::now();
416
417 let comp_result = match handler {
418 CompensationHandler::Proposal(ps) => {
419 self.execute_proposal(ps).await
420 }
421 CompensationHandler::StageRef { stage_id: ref_id } => {
422 if let Some(ref_stage) = workflow.stage(ref_id) {
423 self.execute_step(&ref_stage.step, &HashMap::new()).await
424 } else {
425 Err(WorkflowError::StageNotFound(ref_id.clone()))
426 }
427 }
428 };
429
430 let duration = comp_start.elapsed().as_secs_f64() * 1000.0;
431
432 match comp_result {
433 Ok(_) => {
434 results.push(CompensationResult {
435 for_stage_id: stage_id.clone(),
436 status: StageStatus::Succeeded,
437 duration_ms: duration,
438 error: None,
439 });
440 }
441 Err(e) => {
442 warn!(stage = %stage_id, error = %e, "compensation failed");
443 results.push(CompensationResult {
444 for_stage_id: stage_id.clone(),
445 status: StageStatus::Failed,
446 duration_ms: duration,
447 error: Some(e.to_string()),
448 });
449 }
450 }
451 }
452
453 results
454 }
455}
456
457fn check_conditions(conditions: &[car_ir::Precondition], state: &HashMap<String, Value>) -> bool {
461 conditions.iter().all(|cond| evaluate_precondition(cond, state))
462}
463
464fn evaluate_precondition(cond: &car_ir::Precondition, state: &HashMap<String, Value>) -> bool {
466 let op = cond.operator.as_str();
467
468 match op {
469 "exists" => state.contains_key(&cond.key),
470 "not_exists" => !state.contains_key(&cond.key),
471 _ => {
472 let actual = match state.get(&cond.key) {
473 Some(v) => v,
474 None => return false, };
476 match op {
477 "eq" => actual == &cond.value,
478 "neq" => actual != &cond.value,
479 "gt" => compare_values(actual, &cond.value).map_or(false, |o| o == std::cmp::Ordering::Greater),
480 "gte" => compare_values(actual, &cond.value).map_or(false, |o| o != std::cmp::Ordering::Less),
481 "lt" => compare_values(actual, &cond.value).map_or(false, |o| o == std::cmp::Ordering::Less),
482 "lte" => compare_values(actual, &cond.value).map_or(false, |o| o != std::cmp::Ordering::Greater),
483 "contains" => {
484 if let (Some(haystack), Some(needle)) = (actual.as_str(), cond.value.as_str()) {
485 haystack.contains(needle)
486 } else {
487 false
488 }
489 }
490 _ => false,
491 }
492 }
493 }
494}
495
496fn compare_values(a: &Value, b: &Value) -> Option<std::cmp::Ordering> {
497 match (a.as_f64(), b.as_f64()) {
498 (Some(a), Some(b)) => a.partial_cmp(&b),
499 _ => match (a.as_str(), b.as_str()) {
500 (Some(a), Some(b)) => Some(a.cmp(b)),
501 _ => None,
502 },
503 }
504}
505
506fn extract_synthesizer(
510 config: &HashMap<String, Value>,
511 agents: &[car_multi::AgentSpec],
512) -> Option<car_multi::AgentSpec> {
513 config.get("synthesizer_index")
514 .and_then(|v| v.as_u64())
515 .and_then(|i| agents.get(i as usize))
516 .cloned()
517}
518
519fn split_supervisor_workers(
521 agents: &[car_multi::AgentSpec],
522 config: &HashMap<String, Value>,
523) -> (car_multi::AgentSpec, Vec<car_multi::AgentSpec>) {
524 let idx = config.get("supervisor_index")
525 .and_then(|v| v.as_u64())
526 .unwrap_or(agents.len().saturating_sub(1) as u64) as usize;
527
528 let supervisor = agents.get(idx).cloned().unwrap_or_else(|| agents.last().unwrap().clone());
529 let workers: Vec<_> = agents.iter().enumerate()
530 .filter(|(i, _)| *i != idx)
531 .map(|(_, a)| a.clone())
532 .collect();
533 (supervisor, workers)
534}
535
536fn split_delegator(
538 agents: &[car_multi::AgentSpec],
539 _config: &HashMap<String, Value>,
540) -> (car_multi::AgentSpec, HashMap<String, car_multi::AgentSpec>) {
541 let main = agents.first().cloned().unwrap_or_else(|| car_multi::AgentSpec::new("main", ""));
542 let specialists: HashMap<String, car_multi::AgentSpec> = agents.iter()
543 .skip(1)
544 .map(|a| (a.name.clone(), a.clone()))
545 .collect();
546 (main, specialists)
547}
548
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a precondition with an empty description.
    fn cond(key: &str, operator: &str, value: Value) -> car_ir::Precondition {
        car_ir::Precondition {
            key: key.into(),
            operator: operator.into(),
            value,
            description: String::new(),
        }
    }

    #[test]
    fn precondition_eq() {
        let mut state = HashMap::new();
        state.insert("x".into(), Value::Bool(true));

        assert!(evaluate_precondition(&cond("x", "eq", Value::Bool(true)), &state));
        assert!(!evaluate_precondition(&cond("x", "eq", Value::Bool(false)), &state));
    }

    #[test]
    fn precondition_exists() {
        let mut state = HashMap::new();
        state.insert("x".into(), Value::Null);

        let exists = cond("x", "exists", Value::Null);
        assert!(evaluate_precondition(&exists, &state));

        let not_exists = cond("y", "exists", Value::Null);
        assert!(!evaluate_precondition(&not_exists, &state));
    }

    #[test]
    fn precondition_not_exists() {
        let mut state = HashMap::new();
        state.insert("x".into(), Value::Null);

        assert!(!evaluate_precondition(&cond("x", "not_exists", Value::Null), &state));
        assert!(evaluate_precondition(&cond("y", "not_exists", Value::Null), &state));
    }

    #[test]
    fn precondition_numeric_comparison() {
        let mut state = HashMap::new();
        state.insert("count".into(), serde_json::json!(5));

        let gt = cond("count", "gt", serde_json::json!(3));
        assert!(evaluate_precondition(&gt, &state));

        let lt = cond("count", "lt", serde_json::json!(3));
        assert!(!evaluate_precondition(&lt, &state));
    }

    #[test]
    fn precondition_inclusive_bounds() {
        let mut state = HashMap::new();
        state.insert("count".into(), serde_json::json!(5));

        assert!(evaluate_precondition(&cond("count", "gte", serde_json::json!(5)), &state));
        assert!(evaluate_precondition(&cond("count", "lte", serde_json::json!(5)), &state));
        assert!(!evaluate_precondition(&cond("count", "gt", serde_json::json!(5)), &state));
        assert!(!evaluate_precondition(&cond("count", "lt", serde_json::json!(5)), &state));
    }

    #[test]
    fn precondition_missing_key_fails_value_operators() {
        let state: HashMap<String, Value> = HashMap::new();
        for op in &["eq", "neq", "gt", "gte", "lt", "lte", "contains"] {
            assert!(
                !evaluate_precondition(&cond("missing", op, serde_json::json!(1)), &state),
                "operator {} should fail on a missing key",
                op
            );
        }
    }

    #[test]
    fn precondition_contains_and_unknown_operator() {
        let mut state = HashMap::new();
        state.insert("msg".into(), serde_json::json!("hello world"));

        assert!(evaluate_precondition(&cond("msg", "contains", serde_json::json!("world")), &state));
        assert!(!evaluate_precondition(&cond("msg", "contains", serde_json::json!("xyz")), &state));
        assert!(!evaluate_precondition(&cond("msg", "bogus", serde_json::json!("hello world")), &state));
    }

    #[test]
    fn compare_values_strings_and_mixed_types() {
        use std::cmp::Ordering;

        assert_eq!(
            compare_values(&serde_json::json!("a"), &serde_json::json!("b")),
            Some(Ordering::Less)
        );
        assert_eq!(
            compare_values(&serde_json::json!(2), &serde_json::json!(1.5)),
            Some(Ordering::Greater)
        );
        assert_eq!(compare_values(&serde_json::json!(1), &serde_json::json!("1")), None);
    }

    #[test]
    fn empty_conditions_always_pass() {
        let state = HashMap::new();
        assert!(check_conditions(&[], &state));
    }
}