Skip to main content

car_workflow/
engine.rs

//! Workflow execution engine — walks the stage graph, dispatches stages to car-multi
//! patterns and car-engine proposals, threads workflow state between stages, and runs
//! saga compensation when a stage fails.
3
4use std::collections::HashMap;
5use std::sync::Arc;
6use std::time::Instant;
7
8use serde_json::Value;
9use tracing::{debug, info, warn};
10
11use car_multi::{
12    AgentRunner, Fleet, MapReduce, Pipeline, SharedInfra, Supervisor, Swarm, SwarmMode, Vote,
13};
14
15use crate::error::WorkflowError;
16use crate::result::*;
17use crate::types::*;
18
19/// Workflow execution engine.
20pub struct WorkflowEngine {
21    runner: Arc<dyn AgentRunner>,
22    infra: SharedInfra,
23}
24
25impl WorkflowEngine {
26    pub fn new(runner: Arc<dyn AgentRunner>, infra: SharedInfra) -> Self {
27        Self { runner, infra }
28    }
29
30    /// Execute a workflow to completion, following the stage graph.
31    pub fn run<'a>(
32        &'a self,
33        workflow: &'a Workflow,
34    ) -> futures::future::BoxFuture<'a, Result<WorkflowResult, WorkflowError>> {
35        Box::pin(self.run_inner(workflow))
36    }
37
38    async fn run_inner(&self, workflow: &Workflow) -> Result<WorkflowResult, WorkflowError> {
39        let start = Instant::now();
40
41        // Validate start stage exists
42        if workflow.stage(&workflow.start).is_none() {
43            return Err(WorkflowError::NoStartStage);
44        }
45
46        // Workflow state: accumulates across stages for edge condition evaluation
47        let mut wf_state: HashMap<String, Value> = HashMap::new();
48        let mut stage_results: Vec<StageResult> = Vec::new();
49        let mut completed_stage_ids: Vec<String> = Vec::new();
50        let mut iterations: u32 = 0;
51
52        let mut current_id = workflow.start.clone();
53
54        loop {
55            iterations += 1;
56            if iterations > workflow.max_iterations {
57                return Err(WorkflowError::CycleLimitReached(workflow.max_iterations));
58            }
59
60            let stage = workflow
61                .stage(&current_id)
62                .ok_or_else(|| WorkflowError::StageNotFound(current_id.clone()))?;
63
64            debug!(stage = %stage.id, name = %stage.name, iteration = iterations, "executing stage");
65
66            // Execute the stage
67            let stage_start = Instant::now();
68            let result = if let Some(timeout_ms) = stage.timeout_ms {
69                match tokio::time::timeout(
70                    std::time::Duration::from_millis(timeout_ms),
71                    self.execute_step(&stage.step, &wf_state),
72                )
73                .await
74                {
75                    Ok(r) => r,
76                    Err(_) => Err(WorkflowError::Timeout(stage.id.clone(), timeout_ms)),
77                }
78            } else {
79                self.execute_step(&stage.step, &wf_state).await
80            };
81            let stage_duration = stage_start.elapsed().as_secs_f64() * 1000.0;
82
83            match result {
84                Ok((output, answer)) => {
85                    // Merge state from this stage
86                    wf_state.insert(format!("stage.{}.succeeded", stage.id), Value::Bool(true));
87                    wf_state.insert(format!("stage.{}.answer", stage.id), Value::String(answer));
88
89                    // Merge proposal state changes if applicable
90                    if let StageOutput::Proposal { ref result } = output {
91                        for ar in &result.results {
92                            for (k, v) in &ar.state_changes {
93                                wf_state.insert(k.clone(), v.clone());
94                            }
95                        }
96                    }
97
98                    stage_results.push(StageResult {
99                        stage_id: stage.id.clone(),
100                        stage_name: stage.name.clone(),
101                        status: StageStatus::Succeeded,
102                        output,
103                        duration_ms: stage_duration,
104                        error: None,
105                    });
106                    completed_stage_ids.push(stage.id.clone());
107
108                    info!(stage = %stage.id, duration_ms = stage_duration, "stage succeeded");
109                }
110                Err(e) => {
111                    let error_msg = e.to_string();
112                    wf_state.insert(format!("stage.{}.succeeded", stage.id), Value::Bool(false));
113                    wf_state.insert(
114                        format!("stage.{}.error", stage.id),
115                        Value::String(error_msg.clone()),
116                    );
117
118                    stage_results.push(StageResult {
119                        stage_id: stage.id.clone(),
120                        stage_name: stage.name.clone(),
121                        status: StageStatus::Failed,
122                        output: StageOutput::Empty,
123                        duration_ms: stage_duration,
124                        error: Some(error_msg.clone()),
125                    });
126
127                    warn!(stage = %stage.id, error = %error_msg, "stage failed, running compensation");
128
129                    // Run saga compensation in reverse order
130                    let compensations = self.compensate(workflow, &completed_stage_ids).await;
131
132                    let all_compensated = compensations
133                        .iter()
134                        .all(|c| c.status == StageStatus::Succeeded);
135                    let any_compensated = !compensations.is_empty();
136
137                    let status = if any_compensated && all_compensated {
138                        WorkflowStatus::Compensated
139                    } else if any_compensated {
140                        WorkflowStatus::PartiallyCompensated
141                    } else {
142                        WorkflowStatus::Failed
143                    };
144
145                    return Ok(WorkflowResult {
146                        workflow_id: workflow.id.clone(),
147                        workflow_name: workflow.name.clone(),
148                        status,
149                        stages: stage_results,
150                        compensations,
151                        duration_ms: start.elapsed().as_secs_f64() * 1000.0,
152                        timestamp: chrono::Utc::now(),
153                        final_state: wf_state,
154                    });
155                }
156            }
157
158            // Evaluate outgoing edges to find the next stage
159            let edges = workflow.outgoing_edges(&current_id);
160            let next = edges
161                .iter()
162                .find(|e| check_conditions(&e.conditions, &wf_state));
163
164            match next {
165                Some(edge) => {
166                    debug!(from = %current_id, to = %edge.to, label = %edge.label, "taking edge");
167                    current_id = edge.to.clone();
168                }
169                None => {
170                    // Terminal stage — workflow complete
171                    info!(
172                        workflow = %workflow.name,
173                        stages_executed = stage_results.len(),
174                        "workflow completed"
175                    );
176                    return Ok(WorkflowResult {
177                        workflow_id: workflow.id.clone(),
178                        workflow_name: workflow.name.clone(),
179                        status: WorkflowStatus::Completed,
180                        stages: stage_results,
181                        compensations: vec![],
182                        duration_ms: start.elapsed().as_secs_f64() * 1000.0,
183                        timestamp: chrono::Utc::now(),
184                        final_state: wf_state,
185                    });
186                }
187            }
188        }
189    }
190
191    /// Execute a single stage step, returning (output, answer_string).
192    async fn execute_step(
193        &self,
194        step: &StageStep,
195        _wf_state: &HashMap<String, Value>,
196    ) -> Result<(StageOutput, String), WorkflowError> {
197        match step {
198            StageStep::Pattern(ps) => self.execute_pattern(ps).await,
199            StageStep::Proposal(ps) => self.execute_proposal(ps).await,
200            StageStep::SubWorkflow(sw) => self.execute_sub_workflow(sw).await,
201        }
202    }
203
204    /// Dispatch to the appropriate car-multi pattern.
205    async fn execute_pattern(
206        &self,
207        step: &PatternStep,
208    ) -> Result<(StageOutput, String), WorkflowError> {
209        let task = &step.task;
210        let runner = &self.runner;
211        let infra = &self.infra;
212
213        match step.pattern {
214            PatternKind::SwarmParallel => {
215                let mut swarm = Swarm::new(step.agents.clone(), SwarmMode::Parallel);
216                if let Some(synth) = extract_synthesizer(&step.config, &step.agents) {
217                    swarm = swarm.with_synthesizer(synth);
218                }
219                let r = swarm.run(task, runner, infra).await?;
220                Ok((
221                    StageOutput::Pattern {
222                        outputs: r.outputs,
223                        final_answer: r.final_summary.clone(),
224                    },
225                    r.final_summary,
226                ))
227            }
228            PatternKind::SwarmSequential => {
229                let swarm = Swarm::new(step.agents.clone(), SwarmMode::Sequential);
230                let r = swarm.run(task, runner, infra).await?;
231                Ok((
232                    StageOutput::Pattern {
233                        outputs: r.outputs,
234                        final_answer: r.final_summary.clone(),
235                    },
236                    r.final_summary,
237                ))
238            }
239            PatternKind::SwarmDebate => {
240                let swarm = Swarm::new(step.agents.clone(), SwarmMode::Debate);
241                let r = swarm.run(task, runner, infra).await?;
242                Ok((
243                    StageOutput::Pattern {
244                        outputs: r.outputs,
245                        final_answer: r.final_summary.clone(),
246                    },
247                    r.final_summary,
248                ))
249            }
250            PatternKind::Pipeline => {
251                let pipeline = Pipeline::new(step.agents.clone());
252                let r = pipeline.run(task, runner, infra).await?;
253                Ok((
254                    StageOutput::Pattern {
255                        outputs: r.stages,
256                        final_answer: r.final_answer.clone(),
257                    },
258                    r.final_answer,
259                ))
260            }
261            PatternKind::Supervisor => {
262                let max_rounds = step
263                    .config
264                    .get("max_rounds")
265                    .and_then(|v| v.as_u64())
266                    .unwrap_or(3) as u32;
267                let (supervisor, workers) = split_supervisor_workers(&step.agents, &step.config);
268                let r = Supervisor::new(workers, supervisor)
269                    .with_max_rounds(max_rounds)
270                    .run(task, runner, infra)
271                    .await?;
272                let all_outputs: Vec<_> = r.rounds.into_iter().flatten().collect();
273                Ok((
274                    StageOutput::Pattern {
275                        outputs: all_outputs,
276                        final_answer: r.final_answer.clone(),
277                    },
278                    r.final_answer,
279                ))
280            }
281            PatternKind::Delegator => {
282                let (main_agent, specialists) = split_delegator(&step.agents, &step.config);
283                let delegator = car_multi::Delegator::new(main_agent, specialists);
284                let r = delegator.run(task, runner, infra).await?;
285                Ok((
286                    StageOutput::Pattern {
287                        outputs: vec![car_multi::AgentOutput {
288                            name: "delegator".into(),
289                            answer: r.final_answer.clone(),
290                            turns: 0,
291                            tool_calls: r.delegations.len() as u32,
292                            duration_ms: 0.0,
293                            error: None,
294                            outcome: None,
295                            tokens: None,
296                        }],
297                        final_answer: r.final_answer.clone(),
298                    },
299                    r.final_answer,
300                ))
301            }
302            PatternKind::MapReduce => {
303                let max_concurrent = step
304                    .config
305                    .get("max_concurrent")
306                    .and_then(|v| v.as_u64())
307                    .unwrap_or(5) as usize;
308                let items: Vec<String> = step
309                    .config
310                    .get("items")
311                    .and_then(|v| serde_json::from_value(v.clone()).ok())
312                    .unwrap_or_default();
313
314                if step.agents.len() < 2 {
315                    return Err(WorkflowError::StageFailed(
316                        "map_reduce".into(),
317                        "requires at least 2 agents (mapper + reducer)".into(),
318                    ));
319                }
320                let mapper = step.agents[0].clone();
321                let reducer = step.agents[1].clone();
322
323                let mr = MapReduce::new(mapper, reducer).with_max_concurrent(max_concurrent);
324                let r = mr
325                    .run(
326                        task,
327                        &items
328                            .iter()
329                            .map(|s| s.as_str())
330                            .collect::<Vec<_>>()
331                            .iter()
332                            .map(|s| s.to_string())
333                            .collect::<Vec<_>>(),
334                        runner,
335                        infra,
336                    )
337                    .await?;
338                Ok((
339                    StageOutput::Pattern {
340                        outputs: r.map_outputs,
341                        final_answer: r.reduced_answer.clone(),
342                    },
343                    r.reduced_answer,
344                ))
345            }
346            PatternKind::Vote => {
347                let mut vote = Vote::new(step.agents.clone());
348                if let Some(synth) = extract_synthesizer(&step.config, &step.agents) {
349                    vote = vote.with_synthesizer(synth);
350                }
351                let r = vote.run(task, runner, infra).await?;
352                Ok((
353                    StageOutput::Pattern {
354                        outputs: r.votes,
355                        final_answer: r.winner.clone(),
356                    },
357                    r.winner,
358                ))
359            }
360            PatternKind::Fleet => {
361                let mut fleet = Fleet::new(step.agents.clone());
362                if let Some(timeout) = step.config.get("timeout_secs").and_then(|v| v.as_u64()) {
363                    fleet = fleet.with_timeout(timeout);
364                }
365                let r = fleet.run(runner, infra).await?;
366                let summary = format!("{} succeeded, {} failed", r.succeeded, r.failed);
367                Ok((
368                    StageOutput::Pattern {
369                        outputs: r.outputs,
370                        final_answer: summary.clone(),
371                    },
372                    summary,
373                ))
374            }
375        }
376    }
377
378    /// Execute a proposal step via car-engine.
379    async fn execute_proposal(
380        &self,
381        step: &ProposalStep,
382    ) -> Result<(StageOutput, String), WorkflowError> {
383        let runtime = self.infra.make_runtime();
384        let result = runtime.execute(&step.proposal).await;
385
386        if result.all_succeeded() {
387            let answer = result
388                .results
389                .last()
390                .and_then(|r| r.output.as_ref())
391                .map(|v| v.to_string())
392                .unwrap_or_default();
393            Ok((StageOutput::Proposal { result }, answer))
394        } else {
395            let errors: Vec<String> = result
396                .results
397                .iter()
398                .filter_map(|r| r.error.as_ref())
399                .cloned()
400                .collect();
401            Err(WorkflowError::StageFailed(
402                "proposal".into(),
403                errors.join("; "),
404            ))
405        }
406    }
407
408    /// Execute a nested sub-workflow.
409    async fn execute_sub_workflow(
410        &self,
411        step: &SubWorkflowStep,
412    ) -> Result<(StageOutput, String), WorkflowError> {
413        let result = self.run(&step.workflow).await?;
414        let answer = result
415            .stages
416            .last()
417            .and_then(|s| match &s.output {
418                StageOutput::Pattern { final_answer, .. } => Some(final_answer.clone()),
419                StageOutput::Proposal { result } => result
420                    .results
421                    .last()
422                    .and_then(|r| r.output.as_ref())
423                    .map(|v| v.to_string()),
424                StageOutput::SubWorkflow { result } => Some(format!(
425                    "sub-workflow {} {}",
426                    result.workflow_name,
427                    if result.succeeded() {
428                        "completed"
429                    } else {
430                        "failed"
431                    }
432                )),
433                StageOutput::Empty => None,
434            })
435            .unwrap_or_default();
436
437        if result.succeeded() {
438            Ok((
439                StageOutput::SubWorkflow {
440                    result: Box::new(result),
441                },
442                answer,
443            ))
444        } else {
445            Err(WorkflowError::StageFailed(
446                "sub_workflow".into(),
447                "sub-workflow failed".into(),
448            ))
449        }
450    }
451
452    /// Run saga compensation in reverse order of completed stages.
453    async fn compensate(
454        &self,
455        workflow: &Workflow,
456        completed_stage_ids: &[String],
457    ) -> Vec<CompensationResult> {
458        let mut results = Vec::new();
459
460        for stage_id in completed_stage_ids.iter().rev() {
461            let stage = match workflow.stage(stage_id) {
462                Some(s) => s,
463                None => continue,
464            };
465
466            let handler = match &stage.compensation {
467                Some(h) => h,
468                None => continue,
469            };
470
471            debug!(stage = %stage_id, "running compensation");
472            let comp_start = Instant::now();
473
474            let comp_result = match handler {
475                CompensationHandler::Proposal(ps) => self.execute_proposal(ps).await,
476                CompensationHandler::StageRef { stage_id: ref_id } => {
477                    if let Some(ref_stage) = workflow.stage(ref_id) {
478                        self.execute_step(&ref_stage.step, &HashMap::new()).await
479                    } else {
480                        Err(WorkflowError::StageNotFound(ref_id.clone()))
481                    }
482                }
483            };
484
485            let duration = comp_start.elapsed().as_secs_f64() * 1000.0;
486
487            match comp_result {
488                Ok(_) => {
489                    results.push(CompensationResult {
490                        for_stage_id: stage_id.clone(),
491                        status: StageStatus::Succeeded,
492                        duration_ms: duration,
493                        error: None,
494                    });
495                }
496                Err(e) => {
497                    warn!(stage = %stage_id, error = %e, "compensation failed");
498                    results.push(CompensationResult {
499                        for_stage_id: stage_id.clone(),
500                        status: StageStatus::Failed,
501                        duration_ms: duration,
502                        error: Some(e.to_string()),
503                    });
504                }
505            }
506        }
507
508        results
509    }
510}
511
512// --- Precondition evaluation ---
513
514/// Evaluate edge conditions against workflow state. Returns true if all conditions pass.
515fn check_conditions(conditions: &[car_ir::Precondition], state: &HashMap<String, Value>) -> bool {
516    conditions
517        .iter()
518        .all(|cond| evaluate_precondition(cond, state))
519}
520
521/// Evaluate a single precondition against a state map.
522fn evaluate_precondition(cond: &car_ir::Precondition, state: &HashMap<String, Value>) -> bool {
523    let op = cond.operator.as_str();
524
525    match op {
526        "exists" => state.contains_key(&cond.key),
527        "not_exists" => !state.contains_key(&cond.key),
528        _ => {
529            let actual = match state.get(&cond.key) {
530                Some(v) => v,
531                None => return false, // key missing, condition fails
532            };
533            match op {
534                "eq" => actual == &cond.value,
535                "neq" => actual != &cond.value,
536                "gt" => compare_values(actual, &cond.value)
537                    .map_or(false, |o| o == std::cmp::Ordering::Greater),
538                "gte" => compare_values(actual, &cond.value)
539                    .map_or(false, |o| o != std::cmp::Ordering::Less),
540                "lt" => compare_values(actual, &cond.value)
541                    .map_or(false, |o| o == std::cmp::Ordering::Less),
542                "lte" => compare_values(actual, &cond.value)
543                    .map_or(false, |o| o != std::cmp::Ordering::Greater),
544                "contains" => {
545                    if let (Some(haystack), Some(needle)) = (actual.as_str(), cond.value.as_str()) {
546                        haystack.contains(needle)
547                    } else {
548                        false
549                    }
550                }
551                _ => false,
552            }
553        }
554    }
555}
556
557fn compare_values(a: &Value, b: &Value) -> Option<std::cmp::Ordering> {
558    match (a.as_f64(), b.as_f64()) {
559        (Some(a), Some(b)) => a.partial_cmp(&b),
560        _ => match (a.as_str(), b.as_str()) {
561            (Some(a), Some(b)) => Some(a.cmp(b)),
562            _ => None,
563        },
564    }
565}
566
567// --- Pattern config helpers ---
568
569/// Extract synthesizer agent from config (used by Swarm and Vote).
570fn extract_synthesizer(
571    config: &HashMap<String, Value>,
572    agents: &[car_multi::AgentSpec],
573) -> Option<car_multi::AgentSpec> {
574    config
575        .get("synthesizer_index")
576        .and_then(|v| v.as_u64())
577        .and_then(|i| agents.get(i as usize))
578        .cloned()
579}
580
581/// Split agents into supervisor + workers. By default, last agent is supervisor.
582fn split_supervisor_workers(
583    agents: &[car_multi::AgentSpec],
584    config: &HashMap<String, Value>,
585) -> (car_multi::AgentSpec, Vec<car_multi::AgentSpec>) {
586    let idx = config
587        .get("supervisor_index")
588        .and_then(|v| v.as_u64())
589        .unwrap_or(agents.len().saturating_sub(1) as u64) as usize;
590
591    let supervisor = agents
592        .get(idx)
593        .cloned()
594        .unwrap_or_else(|| agents.last().unwrap().clone());
595    let workers: Vec<_> = agents
596        .iter()
597        .enumerate()
598        .filter(|(i, _)| *i != idx)
599        .map(|(_, a)| a.clone())
600        .collect();
601    (supervisor, workers)
602}
603
604/// Split agents into main + specialists map for Delegator.
605fn split_delegator(
606    agents: &[car_multi::AgentSpec],
607    _config: &HashMap<String, Value>,
608) -> (car_multi::AgentSpec, HashMap<String, car_multi::AgentSpec>) {
609    let main = agents
610        .first()
611        .cloned()
612        .unwrap_or_else(|| car_multi::AgentSpec::new("main", ""));
613    let specialists: HashMap<String, car_multi::AgentSpec> = agents
614        .iter()
615        .skip(1)
616        .map(|a| (a.name.clone(), a.clone()))
617        .collect();
618    (main, specialists)
619}
620
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a precondition with an empty description.
    fn pre(key: &str, operator: &str, value: Value) -> car_ir::Precondition {
        car_ir::Precondition {
            key: key.into(),
            operator: operator.into(),
            value,
            description: String::new(),
        }
    }

    /// Build a workflow-state map from `(key, value)` pairs.
    fn state_of(pairs: &[(&str, Value)]) -> HashMap<String, Value> {
        pairs
            .iter()
            .map(|(k, v)| (k.to_string(), v.clone()))
            .collect()
    }

    fn agent(name: &str) -> car_multi::AgentSpec {
        car_multi::AgentSpec::new(name, "")
    }

    #[test]
    fn precondition_eq() {
        let state = state_of(&[("x", Value::Bool(true))]);
        assert!(evaluate_precondition(&pre("x", "eq", Value::Bool(true)), &state));
        assert!(!evaluate_precondition(&pre("x", "eq", Value::Bool(false)), &state));
    }

    #[test]
    fn precondition_exists() {
        // A key holding JSON null still exists.
        let state = state_of(&[("x", Value::Null)]);
        assert!(evaluate_precondition(&pre("x", "exists", Value::Null), &state));
        assert!(!evaluate_precondition(&pre("y", "exists", Value::Null), &state));
    }

    #[test]
    fn precondition_not_exists() {
        let state = state_of(&[("x", Value::Null)]);
        assert!(evaluate_precondition(&pre("y", "not_exists", Value::Null), &state));
        assert!(!evaluate_precondition(&pre("x", "not_exists", Value::Null), &state));
    }

    #[test]
    fn precondition_missing_key_fails_value_operators() {
        let state = HashMap::new();
        assert!(!evaluate_precondition(&pre("x", "eq", Value::Null), &state));
        assert!(!evaluate_precondition(&pre("x", "neq", serde_json::json!(1)), &state));
        assert!(!evaluate_precondition(&pre("x", "gt", serde_json::json!(0)), &state));
    }

    #[test]
    fn precondition_numeric_comparison() {
        let state = state_of(&[("count", serde_json::json!(5))]);
        assert!(evaluate_precondition(&pre("count", "gt", serde_json::json!(3)), &state));
        assert!(!evaluate_precondition(&pre("count", "lt", serde_json::json!(3)), &state));

        // Equality satisfies gte/lte but not gt/lt.
        assert!(evaluate_precondition(&pre("count", "gte", serde_json::json!(5)), &state));
        assert!(evaluate_precondition(&pre("count", "lte", serde_json::json!(5)), &state));
        assert!(!evaluate_precondition(&pre("count", "gt", serde_json::json!(5)), &state));
        assert!(!evaluate_precondition(&pre("count", "lt", serde_json::json!(5)), &state));

        // Integers and floats compare numerically.
        assert!(evaluate_precondition(&pre("count", "lt", serde_json::json!(5.5)), &state));
    }

    #[test]
    fn precondition_string_ordering_and_contains() {
        let state = state_of(&[("name", serde_json::json!("banana"))]);
        assert!(evaluate_precondition(&pre("name", "gt", serde_json::json!("apple")), &state));
        assert!(evaluate_precondition(&pre("name", "contains", serde_json::json!("nan")), &state));
        assert!(!evaluate_precondition(&pre("name", "contains", serde_json::json!("kiwi")), &state));
        // `contains` only applies to string/string pairs.
        assert!(!evaluate_precondition(&pre("name", "contains", serde_json::json!(1)), &state));
    }

    #[test]
    fn precondition_mismatched_types_and_unknown_operators_fail() {
        let state = state_of(&[("count", serde_json::json!(5))]);
        // A number and a string are unordered, so every ordering operator fails.
        for op in &["gt", "gte", "lt", "lte"] {
            assert!(
                !evaluate_precondition(&pre("count", op, serde_json::json!("5")), &state),
                "operator {}",
                op
            );
        }
        assert!(!evaluate_precondition(&pre("count", "approximately", serde_json::json!(5)), &state));
    }

    #[test]
    fn empty_conditions_always_pass() {
        let state = HashMap::new();
        assert!(check_conditions(&[], &state));
    }

    #[test]
    fn all_conditions_must_pass() {
        let state = state_of(&[("a", serde_json::json!(1)), ("b", serde_json::json!(2))]);
        let both = [
            pre("a", "eq", serde_json::json!(1)),
            pre("b", "eq", serde_json::json!(2)),
        ];
        assert!(check_conditions(&both, &state));

        let one_fails = [
            pre("a", "eq", serde_json::json!(1)),
            pre("b", "eq", serde_json::json!(3)),
        ];
        assert!(!check_conditions(&one_fails, &state));
    }

    #[test]
    fn supervisor_defaults_to_last_agent() {
        let agents = vec![agent("a"), agent("b"), agent("c")];
        let (supervisor, workers) = split_supervisor_workers(&agents, &HashMap::new());
        assert_eq!(supervisor.name, agents[2].name);
        let worker_names: Vec<String> = workers.iter().map(|w| w.name.clone()).collect();
        assert_eq!(worker_names, vec![agents[0].name.clone(), agents[1].name.clone()]);
    }

    #[test]
    fn supervisor_index_selects_supervisor() {
        let agents = vec![agent("a"), agent("b"), agent("c")];
        let mut config = HashMap::new();
        config.insert("supervisor_index".to_string(), serde_json::json!(0));
        let (supervisor, workers) = split_supervisor_workers(&agents, &config);
        assert_eq!(supervisor.name, agents[0].name);
        assert_eq!(workers.len(), 2);
    }

    #[test]
    fn delegator_splits_main_from_specialists() {
        let agents = vec![agent("lead"), agent("sql")];
        let (main, specialists) = split_delegator(&agents, &HashMap::new());
        assert_eq!(main.name, agents[0].name);
        assert_eq!(specialists.len(), 1);
        assert!(specialists.contains_key(&agents[1].name));

        let (_, none) = split_delegator(&[], &HashMap::new());
        assert!(none.is_empty());
    }
}