1use std::collections::HashMap;
5use std::sync::Arc;
6use std::time::Instant;
7
8use serde_json::Value;
9use tracing::{debug, info, warn};
10
11use car_multi::{
12 AgentRunner, Fleet, MapReduce, Pipeline, SharedInfra, Supervisor, Swarm, SwarmMode, Vote,
13};
14
15use crate::error::WorkflowError;
16use crate::result::*;
17use crate::types::*;
18
/// Executes a `Workflow` stage graph: runs each stage, follows conditional
/// edges, and performs compensation (saga-style undo) on failure.
pub struct WorkflowEngine {
    // Executes individual agents on behalf of multi-agent patterns.
    runner: Arc<dyn AgentRunner>,
    // Shared services (e.g. proposal runtimes via `make_runtime`).
    infra: SharedInfra,
}
24
25impl WorkflowEngine {
26 pub fn new(runner: Arc<dyn AgentRunner>, infra: SharedInfra) -> Self {
27 Self { runner, infra }
28 }
29
30 pub fn run<'a>(
32 &'a self,
33 workflow: &'a Workflow,
34 ) -> futures::future::BoxFuture<'a, Result<WorkflowResult, WorkflowError>> {
35 Box::pin(self.run_inner(workflow))
36 }
37
38 async fn run_inner(&self, workflow: &Workflow) -> Result<WorkflowResult, WorkflowError> {
39 let start = Instant::now();
40
41 if workflow.stage(&workflow.start).is_none() {
43 return Err(WorkflowError::NoStartStage);
44 }
45
46 let mut wf_state: HashMap<String, Value> = HashMap::new();
48 let mut stage_results: Vec<StageResult> = Vec::new();
49 let mut completed_stage_ids: Vec<String> = Vec::new();
50 let mut iterations: u32 = 0;
51
52 let mut current_id = workflow.start.clone();
53
54 loop {
55 iterations += 1;
56 if iterations > workflow.max_iterations {
57 return Err(WorkflowError::CycleLimitReached(workflow.max_iterations));
58 }
59
60 let stage = workflow
61 .stage(¤t_id)
62 .ok_or_else(|| WorkflowError::StageNotFound(current_id.clone()))?;
63
64 debug!(stage = %stage.id, name = %stage.name, iteration = iterations, "executing stage");
65
66 let stage_start = Instant::now();
68 let result = if let Some(timeout_ms) = stage.timeout_ms {
69 match tokio::time::timeout(
70 std::time::Duration::from_millis(timeout_ms),
71 self.execute_step(&stage.step, &wf_state),
72 )
73 .await
74 {
75 Ok(r) => r,
76 Err(_) => Err(WorkflowError::Timeout(stage.id.clone(), timeout_ms)),
77 }
78 } else {
79 self.execute_step(&stage.step, &wf_state).await
80 };
81 let stage_duration = stage_start.elapsed().as_secs_f64() * 1000.0;
82
83 match result {
84 Ok((output, answer)) => {
85 wf_state.insert(format!("stage.{}.succeeded", stage.id), Value::Bool(true));
87 wf_state.insert(format!("stage.{}.answer", stage.id), Value::String(answer));
88
89 if let StageOutput::Proposal { ref result } = output {
91 for ar in &result.results {
92 for (k, v) in &ar.state_changes {
93 wf_state.insert(k.clone(), v.clone());
94 }
95 }
96 }
97
98 stage_results.push(StageResult {
99 stage_id: stage.id.clone(),
100 stage_name: stage.name.clone(),
101 status: StageStatus::Succeeded,
102 output,
103 duration_ms: stage_duration,
104 error: None,
105 });
106 completed_stage_ids.push(stage.id.clone());
107
108 info!(stage = %stage.id, duration_ms = stage_duration, "stage succeeded");
109 }
110 Err(e) => {
111 let error_msg = e.to_string();
112 wf_state.insert(format!("stage.{}.succeeded", stage.id), Value::Bool(false));
113 wf_state.insert(
114 format!("stage.{}.error", stage.id),
115 Value::String(error_msg.clone()),
116 );
117
118 stage_results.push(StageResult {
119 stage_id: stage.id.clone(),
120 stage_name: stage.name.clone(),
121 status: StageStatus::Failed,
122 output: StageOutput::Empty,
123 duration_ms: stage_duration,
124 error: Some(error_msg.clone()),
125 });
126
127 warn!(stage = %stage.id, error = %error_msg, "stage failed, running compensation");
128
129 let compensations = self.compensate(workflow, &completed_stage_ids).await;
131
132 let all_compensated = compensations
133 .iter()
134 .all(|c| c.status == StageStatus::Succeeded);
135 let any_compensated = !compensations.is_empty();
136
137 let status = if any_compensated && all_compensated {
138 WorkflowStatus::Compensated
139 } else if any_compensated {
140 WorkflowStatus::PartiallyCompensated
141 } else {
142 WorkflowStatus::Failed
143 };
144
145 return Ok(WorkflowResult {
146 workflow_id: workflow.id.clone(),
147 workflow_name: workflow.name.clone(),
148 status,
149 stages: stage_results,
150 compensations,
151 duration_ms: start.elapsed().as_secs_f64() * 1000.0,
152 timestamp: chrono::Utc::now(),
153 final_state: wf_state,
154 });
155 }
156 }
157
158 let edges = workflow.outgoing_edges(¤t_id);
160 let next = edges
161 .iter()
162 .find(|e| check_conditions(&e.conditions, &wf_state));
163
164 match next {
165 Some(edge) => {
166 debug!(from = %current_id, to = %edge.to, label = %edge.label, "taking edge");
167 current_id = edge.to.clone();
168 }
169 None => {
170 info!(
172 workflow = %workflow.name,
173 stages_executed = stage_results.len(),
174 "workflow completed"
175 );
176 return Ok(WorkflowResult {
177 workflow_id: workflow.id.clone(),
178 workflow_name: workflow.name.clone(),
179 status: WorkflowStatus::Completed,
180 stages: stage_results,
181 compensations: vec![],
182 duration_ms: start.elapsed().as_secs_f64() * 1000.0,
183 timestamp: chrono::Utc::now(),
184 final_state: wf_state,
185 });
186 }
187 }
188 }
189 }
190
191 async fn execute_step(
193 &self,
194 step: &StageStep,
195 _wf_state: &HashMap<String, Value>,
196 ) -> Result<(StageOutput, String), WorkflowError> {
197 match step {
198 StageStep::Pattern(ps) => self.execute_pattern(ps).await,
199 StageStep::Proposal(ps) => self.execute_proposal(ps).await,
200 StageStep::SubWorkflow(sw) => self.execute_sub_workflow(sw).await,
201 }
202 }
203
204 async fn execute_pattern(
206 &self,
207 step: &PatternStep,
208 ) -> Result<(StageOutput, String), WorkflowError> {
209 let task = &step.task;
210 let runner = &self.runner;
211 let infra = &self.infra;
212
213 match step.pattern {
214 PatternKind::SwarmParallel => {
215 let mut swarm = Swarm::new(step.agents.clone(), SwarmMode::Parallel);
216 if let Some(synth) = extract_synthesizer(&step.config, &step.agents) {
217 swarm = swarm.with_synthesizer(synth);
218 }
219 let r = swarm.run(task, runner, infra).await?;
220 Ok((
221 StageOutput::Pattern {
222 outputs: r.outputs,
223 final_answer: r.final_summary.clone(),
224 },
225 r.final_summary,
226 ))
227 }
228 PatternKind::SwarmSequential => {
229 let swarm = Swarm::new(step.agents.clone(), SwarmMode::Sequential);
230 let r = swarm.run(task, runner, infra).await?;
231 Ok((
232 StageOutput::Pattern {
233 outputs: r.outputs,
234 final_answer: r.final_summary.clone(),
235 },
236 r.final_summary,
237 ))
238 }
239 PatternKind::SwarmDebate => {
240 let swarm = Swarm::new(step.agents.clone(), SwarmMode::Debate);
241 let r = swarm.run(task, runner, infra).await?;
242 Ok((
243 StageOutput::Pattern {
244 outputs: r.outputs,
245 final_answer: r.final_summary.clone(),
246 },
247 r.final_summary,
248 ))
249 }
250 PatternKind::Pipeline => {
251 let pipeline = Pipeline::new(step.agents.clone());
252 let r = pipeline.run(task, runner, infra).await?;
253 Ok((
254 StageOutput::Pattern {
255 outputs: r.stages,
256 final_answer: r.final_answer.clone(),
257 },
258 r.final_answer,
259 ))
260 }
261 PatternKind::Supervisor => {
262 let max_rounds = step
263 .config
264 .get("max_rounds")
265 .and_then(|v| v.as_u64())
266 .unwrap_or(3) as u32;
267 let (supervisor, workers) = split_supervisor_workers(&step.agents, &step.config);
268 let r = Supervisor::new(workers, supervisor)
269 .with_max_rounds(max_rounds)
270 .run(task, runner, infra)
271 .await?;
272 let all_outputs: Vec<_> = r.rounds.into_iter().flatten().collect();
273 Ok((
274 StageOutput::Pattern {
275 outputs: all_outputs,
276 final_answer: r.final_answer.clone(),
277 },
278 r.final_answer,
279 ))
280 }
281 PatternKind::Delegator => {
282 let (main_agent, specialists) = split_delegator(&step.agents, &step.config);
283 let delegator = car_multi::Delegator::new(main_agent, specialists);
284 let r = delegator.run(task, runner, infra).await?;
285 Ok((
286 StageOutput::Pattern {
287 outputs: vec![car_multi::AgentOutput {
288 name: "delegator".into(),
289 answer: r.final_answer.clone(),
290 turns: 0,
291 tool_calls: r.delegations.len() as u32,
292 duration_ms: 0.0,
293 error: None,
294 outcome: None,
295 tokens: None,
296 }],
297 final_answer: r.final_answer.clone(),
298 },
299 r.final_answer,
300 ))
301 }
302 PatternKind::MapReduce => {
303 let max_concurrent = step
304 .config
305 .get("max_concurrent")
306 .and_then(|v| v.as_u64())
307 .unwrap_or(5) as usize;
308 let items: Vec<String> = step
309 .config
310 .get("items")
311 .and_then(|v| serde_json::from_value(v.clone()).ok())
312 .unwrap_or_default();
313
314 if step.agents.len() < 2 {
315 return Err(WorkflowError::StageFailed(
316 "map_reduce".into(),
317 "requires at least 2 agents (mapper + reducer)".into(),
318 ));
319 }
320 let mapper = step.agents[0].clone();
321 let reducer = step.agents[1].clone();
322
323 let mr = MapReduce::new(mapper, reducer).with_max_concurrent(max_concurrent);
324 let r = mr
325 .run(
326 task,
327 &items
328 .iter()
329 .map(|s| s.as_str())
330 .collect::<Vec<_>>()
331 .iter()
332 .map(|s| s.to_string())
333 .collect::<Vec<_>>(),
334 runner,
335 infra,
336 )
337 .await?;
338 Ok((
339 StageOutput::Pattern {
340 outputs: r.map_outputs,
341 final_answer: r.reduced_answer.clone(),
342 },
343 r.reduced_answer,
344 ))
345 }
346 PatternKind::Vote => {
347 let mut vote = Vote::new(step.agents.clone());
348 if let Some(synth) = extract_synthesizer(&step.config, &step.agents) {
349 vote = vote.with_synthesizer(synth);
350 }
351 let r = vote.run(task, runner, infra).await?;
352 Ok((
353 StageOutput::Pattern {
354 outputs: r.votes,
355 final_answer: r.winner.clone(),
356 },
357 r.winner,
358 ))
359 }
360 PatternKind::Fleet => {
361 let mut fleet = Fleet::new(step.agents.clone());
362 if let Some(timeout) = step.config.get("timeout_secs").and_then(|v| v.as_u64()) {
363 fleet = fleet.with_timeout(timeout);
364 }
365 let r = fleet.run(runner, infra).await?;
366 let summary = format!("{} succeeded, {} failed", r.succeeded, r.failed);
367 Ok((
368 StageOutput::Pattern {
369 outputs: r.outputs,
370 final_answer: summary.clone(),
371 },
372 summary,
373 ))
374 }
375 }
376 }
377
378 async fn execute_proposal(
380 &self,
381 step: &ProposalStep,
382 ) -> Result<(StageOutput, String), WorkflowError> {
383 let runtime = self.infra.make_runtime();
384 let result = runtime.execute(&step.proposal).await;
385
386 if result.all_succeeded() {
387 let answer = result
388 .results
389 .last()
390 .and_then(|r| r.output.as_ref())
391 .map(|v| v.to_string())
392 .unwrap_or_default();
393 Ok((StageOutput::Proposal { result }, answer))
394 } else {
395 let errors: Vec<String> = result
396 .results
397 .iter()
398 .filter_map(|r| r.error.as_ref())
399 .cloned()
400 .collect();
401 Err(WorkflowError::StageFailed(
402 "proposal".into(),
403 errors.join("; "),
404 ))
405 }
406 }
407
408 async fn execute_sub_workflow(
410 &self,
411 step: &SubWorkflowStep,
412 ) -> Result<(StageOutput, String), WorkflowError> {
413 let result = self.run(&step.workflow).await?;
414 let answer = result
415 .stages
416 .last()
417 .and_then(|s| match &s.output {
418 StageOutput::Pattern { final_answer, .. } => Some(final_answer.clone()),
419 StageOutput::Proposal { result } => result
420 .results
421 .last()
422 .and_then(|r| r.output.as_ref())
423 .map(|v| v.to_string()),
424 StageOutput::SubWorkflow { result } => Some(format!(
425 "sub-workflow {} {}",
426 result.workflow_name,
427 if result.succeeded() {
428 "completed"
429 } else {
430 "failed"
431 }
432 )),
433 StageOutput::Empty => None,
434 })
435 .unwrap_or_default();
436
437 if result.succeeded() {
438 Ok((
439 StageOutput::SubWorkflow {
440 result: Box::new(result),
441 },
442 answer,
443 ))
444 } else {
445 Err(WorkflowError::StageFailed(
446 "sub_workflow".into(),
447 "sub-workflow failed".into(),
448 ))
449 }
450 }
451
452 async fn compensate(
454 &self,
455 workflow: &Workflow,
456 completed_stage_ids: &[String],
457 ) -> Vec<CompensationResult> {
458 let mut results = Vec::new();
459
460 for stage_id in completed_stage_ids.iter().rev() {
461 let stage = match workflow.stage(stage_id) {
462 Some(s) => s,
463 None => continue,
464 };
465
466 let handler = match &stage.compensation {
467 Some(h) => h,
468 None => continue,
469 };
470
471 debug!(stage = %stage_id, "running compensation");
472 let comp_start = Instant::now();
473
474 let comp_result = match handler {
475 CompensationHandler::Proposal(ps) => self.execute_proposal(ps).await,
476 CompensationHandler::StageRef { stage_id: ref_id } => {
477 if let Some(ref_stage) = workflow.stage(ref_id) {
478 self.execute_step(&ref_stage.step, &HashMap::new()).await
479 } else {
480 Err(WorkflowError::StageNotFound(ref_id.clone()))
481 }
482 }
483 };
484
485 let duration = comp_start.elapsed().as_secs_f64() * 1000.0;
486
487 match comp_result {
488 Ok(_) => {
489 results.push(CompensationResult {
490 for_stage_id: stage_id.clone(),
491 status: StageStatus::Succeeded,
492 duration_ms: duration,
493 error: None,
494 });
495 }
496 Err(e) => {
497 warn!(stage = %stage_id, error = %e, "compensation failed");
498 results.push(CompensationResult {
499 for_stage_id: stage_id.clone(),
500 status: StageStatus::Failed,
501 duration_ms: duration,
502 error: Some(e.to_string()),
503 });
504 }
505 }
506 }
507
508 results
509 }
510}
511
512fn check_conditions(conditions: &[car_ir::Precondition], state: &HashMap<String, Value>) -> bool {
516 conditions
517 .iter()
518 .all(|cond| evaluate_precondition(cond, state))
519}
520
521fn evaluate_precondition(cond: &car_ir::Precondition, state: &HashMap<String, Value>) -> bool {
523 let op = cond.operator.as_str();
524
525 match op {
526 "exists" => state.contains_key(&cond.key),
527 "not_exists" => !state.contains_key(&cond.key),
528 _ => {
529 let actual = match state.get(&cond.key) {
530 Some(v) => v,
531 None => return false, };
533 match op {
534 "eq" => actual == &cond.value,
535 "neq" => actual != &cond.value,
536 "gt" => compare_values(actual, &cond.value)
537 .map_or(false, |o| o == std::cmp::Ordering::Greater),
538 "gte" => compare_values(actual, &cond.value)
539 .map_or(false, |o| o != std::cmp::Ordering::Less),
540 "lt" => compare_values(actual, &cond.value)
541 .map_or(false, |o| o == std::cmp::Ordering::Less),
542 "lte" => compare_values(actual, &cond.value)
543 .map_or(false, |o| o != std::cmp::Ordering::Greater),
544 "contains" => {
545 if let (Some(haystack), Some(needle)) = (actual.as_str(), cond.value.as_str()) {
546 haystack.contains(needle)
547 } else {
548 false
549 }
550 }
551 _ => false,
552 }
553 }
554 }
555}
556
557fn compare_values(a: &Value, b: &Value) -> Option<std::cmp::Ordering> {
558 match (a.as_f64(), b.as_f64()) {
559 (Some(a), Some(b)) => a.partial_cmp(&b),
560 _ => match (a.as_str(), b.as_str()) {
561 (Some(a), Some(b)) => Some(a.cmp(b)),
562 _ => None,
563 },
564 }
565}
566
567fn extract_synthesizer(
571 config: &HashMap<String, Value>,
572 agents: &[car_multi::AgentSpec],
573) -> Option<car_multi::AgentSpec> {
574 config
575 .get("synthesizer_index")
576 .and_then(|v| v.as_u64())
577 .and_then(|i| agents.get(i as usize))
578 .cloned()
579}
580
581fn split_supervisor_workers(
583 agents: &[car_multi::AgentSpec],
584 config: &HashMap<String, Value>,
585) -> (car_multi::AgentSpec, Vec<car_multi::AgentSpec>) {
586 let idx = config
587 .get("supervisor_index")
588 .and_then(|v| v.as_u64())
589 .unwrap_or(agents.len().saturating_sub(1) as u64) as usize;
590
591 let supervisor = agents
592 .get(idx)
593 .cloned()
594 .unwrap_or_else(|| agents.last().unwrap().clone());
595 let workers: Vec<_> = agents
596 .iter()
597 .enumerate()
598 .filter(|(i, _)| *i != idx)
599 .map(|(_, a)| a.clone())
600 .collect();
601 (supervisor, workers)
602}
603
604fn split_delegator(
606 agents: &[car_multi::AgentSpec],
607 _config: &HashMap<String, Value>,
608) -> (car_multi::AgentSpec, HashMap<String, car_multi::AgentSpec>) {
609 let main = agents
610 .first()
611 .cloned()
612 .unwrap_or_else(|| car_multi::AgentSpec::new("main", ""));
613 let specialists: HashMap<String, car_multi::AgentSpec> = agents
614 .iter()
615 .skip(1)
616 .map(|a| (a.name.clone(), a.clone()))
617 .collect();
618 (main, specialists)
619}
620
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a precondition with an empty description.
    fn cond(key: &str, operator: &str, value: Value) -> car_ir::Precondition {
        car_ir::Precondition {
            key: key.into(),
            operator: operator.into(),
            value,
            description: String::new(),
        }
    }

    #[test]
    fn precondition_eq() {
        let mut state = HashMap::new();
        state.insert("x".into(), Value::Bool(true));

        assert!(evaluate_precondition(
            &cond("x", "eq", Value::Bool(true)),
            &state
        ));
        assert!(!evaluate_precondition(
            &cond("x", "eq", Value::Bool(false)),
            &state
        ));
    }

    #[test]
    fn precondition_exists() {
        let mut state = HashMap::new();
        state.insert("x".into(), Value::Null);

        assert!(evaluate_precondition(
            &cond("x", "exists", Value::Null),
            &state
        ));
        assert!(!evaluate_precondition(
            &cond("y", "exists", Value::Null),
            &state
        ));
    }

    #[test]
    fn precondition_numeric_comparison() {
        let mut state = HashMap::new();
        state.insert("count".into(), serde_json::json!(5));

        assert!(evaluate_precondition(
            &cond("count", "gt", serde_json::json!(3)),
            &state
        ));
        assert!(!evaluate_precondition(
            &cond("count", "lt", serde_json::json!(3)),
            &state
        ));
    }

    #[test]
    fn empty_conditions_always_pass() {
        let state = HashMap::new();
        assert!(check_conditions(&[], &state));
    }
}
696}