Skip to main content

car_workflow/
engine.rs

1//! Workflow execution engine — walks the stage graph, dispatches to car-multi
2//! patterns and car-engine proposals, manages state flow and saga compensation.
3
4use std::collections::HashMap;
5use std::sync::Arc;
6use std::time::Instant;
7
8use serde_json::Value;
9use tracing::{debug, info, warn};
10
11use car_multi::{
12    AgentRunner, Fleet, MapReduce, Pipeline, SharedInfra, Supervisor, Swarm, SwarmMode,
13    Vote,
14};
15
16use crate::error::WorkflowError;
17use crate::result::*;
18use crate::types::*;
19
20/// Workflow execution engine.
21pub struct WorkflowEngine {
22    runner: Arc<dyn AgentRunner>,
23    infra: SharedInfra,
24}
25
26impl WorkflowEngine {
27    pub fn new(runner: Arc<dyn AgentRunner>, infra: SharedInfra) -> Self {
28        Self { runner, infra }
29    }
30
31    /// Execute a workflow to completion, following the stage graph.
32    pub fn run<'a>(&'a self, workflow: &'a Workflow) -> futures::future::BoxFuture<'a, Result<WorkflowResult, WorkflowError>> {
33        Box::pin(self.run_inner(workflow))
34    }
35
36    async fn run_inner(&self, workflow: &Workflow) -> Result<WorkflowResult, WorkflowError> {
37        let start = Instant::now();
38
39        // Validate start stage exists
40        if workflow.stage(&workflow.start).is_none() {
41            return Err(WorkflowError::NoStartStage);
42        }
43
44        // Workflow state: accumulates across stages for edge condition evaluation
45        let mut wf_state: HashMap<String, Value> = HashMap::new();
46        let mut stage_results: Vec<StageResult> = Vec::new();
47        let mut completed_stage_ids: Vec<String> = Vec::new();
48        let mut iterations: u32 = 0;
49
50        let mut current_id = workflow.start.clone();
51
52        loop {
53            iterations += 1;
54            if iterations > workflow.max_iterations {
55                return Err(WorkflowError::CycleLimitReached(workflow.max_iterations));
56            }
57
58            let stage = workflow.stage(&current_id)
59                .ok_or_else(|| WorkflowError::StageNotFound(current_id.clone()))?;
60
61            debug!(stage = %stage.id, name = %stage.name, iteration = iterations, "executing stage");
62
63            // Execute the stage
64            let stage_start = Instant::now();
65            let result = if let Some(timeout_ms) = stage.timeout_ms {
66                match tokio::time::timeout(
67                    std::time::Duration::from_millis(timeout_ms),
68                    self.execute_step(&stage.step, &wf_state),
69                ).await {
70                    Ok(r) => r,
71                    Err(_) => Err(WorkflowError::Timeout(stage.id.clone(), timeout_ms)),
72                }
73            } else {
74                self.execute_step(&stage.step, &wf_state).await
75            };
76            let stage_duration = stage_start.elapsed().as_secs_f64() * 1000.0;
77
78            match result {
79                Ok((output, answer)) => {
80                    // Merge state from this stage
81                    wf_state.insert(
82                        format!("stage.{}.succeeded", stage.id),
83                        Value::Bool(true),
84                    );
85                    wf_state.insert(
86                        format!("stage.{}.answer", stage.id),
87                        Value::String(answer),
88                    );
89
90                    // Merge proposal state changes if applicable
91                    if let StageOutput::Proposal { ref result } = output {
92                        for ar in &result.results {
93                            for (k, v) in &ar.state_changes {
94                                wf_state.insert(k.clone(), v.clone());
95                            }
96                        }
97                    }
98
99                    stage_results.push(StageResult {
100                        stage_id: stage.id.clone(),
101                        stage_name: stage.name.clone(),
102                        status: StageStatus::Succeeded,
103                        output,
104                        duration_ms: stage_duration,
105                        error: None,
106                    });
107                    completed_stage_ids.push(stage.id.clone());
108
109                    info!(stage = %stage.id, duration_ms = stage_duration, "stage succeeded");
110                }
111                Err(e) => {
112                    let error_msg = e.to_string();
113                    wf_state.insert(
114                        format!("stage.{}.succeeded", stage.id),
115                        Value::Bool(false),
116                    );
117                    wf_state.insert(
118                        format!("stage.{}.error", stage.id),
119                        Value::String(error_msg.clone()),
120                    );
121
122                    stage_results.push(StageResult {
123                        stage_id: stage.id.clone(),
124                        stage_name: stage.name.clone(),
125                        status: StageStatus::Failed,
126                        output: StageOutput::Empty,
127                        duration_ms: stage_duration,
128                        error: Some(error_msg.clone()),
129                    });
130
131                    warn!(stage = %stage.id, error = %error_msg, "stage failed, running compensation");
132
133                    // Run saga compensation in reverse order
134                    let compensations = self
135                        .compensate(workflow, &completed_stage_ids)
136                        .await;
137
138                    let all_compensated = compensations.iter().all(|c| c.status == StageStatus::Succeeded);
139                    let any_compensated = !compensations.is_empty();
140
141                    let status = if any_compensated && all_compensated {
142                        WorkflowStatus::Compensated
143                    } else if any_compensated {
144                        WorkflowStatus::PartiallyCompensated
145                    } else {
146                        WorkflowStatus::Failed
147                    };
148
149                    return Ok(WorkflowResult {
150                        workflow_id: workflow.id.clone(),
151                        workflow_name: workflow.name.clone(),
152                        status,
153                        stages: stage_results,
154                        compensations,
155                        duration_ms: start.elapsed().as_secs_f64() * 1000.0,
156                        timestamp: chrono::Utc::now(),
157                        final_state: wf_state,
158                    });
159                }
160            }
161
162            // Evaluate outgoing edges to find the next stage
163            let edges = workflow.outgoing_edges(&current_id);
164            let next = edges.iter().find(|e| check_conditions(&e.conditions, &wf_state));
165
166            match next {
167                Some(edge) => {
168                    debug!(from = %current_id, to = %edge.to, label = %edge.label, "taking edge");
169                    current_id = edge.to.clone();
170                }
171                None => {
172                    // Terminal stage — workflow complete
173                    info!(
174                        workflow = %workflow.name,
175                        stages_executed = stage_results.len(),
176                        "workflow completed"
177                    );
178                    return Ok(WorkflowResult {
179                        workflow_id: workflow.id.clone(),
180                        workflow_name: workflow.name.clone(),
181                        status: WorkflowStatus::Completed,
182                        stages: stage_results,
183                        compensations: vec![],
184                        duration_ms: start.elapsed().as_secs_f64() * 1000.0,
185                        timestamp: chrono::Utc::now(),
186                        final_state: wf_state,
187                    });
188                }
189            }
190        }
191    }
192
193    /// Execute a single stage step, returning (output, answer_string).
194    async fn execute_step(
195        &self,
196        step: &StageStep,
197        _wf_state: &HashMap<String, Value>,
198    ) -> Result<(StageOutput, String), WorkflowError> {
199        match step {
200            StageStep::Pattern(ps) => self.execute_pattern(ps).await,
201            StageStep::Proposal(ps) => self.execute_proposal(ps).await,
202            StageStep::SubWorkflow(sw) => self.execute_sub_workflow(sw).await,
203        }
204    }
205
206    /// Dispatch to the appropriate car-multi pattern.
207    async fn execute_pattern(
208        &self,
209        step: &PatternStep,
210    ) -> Result<(StageOutput, String), WorkflowError> {
211        let task = &step.task;
212        let runner = &self.runner;
213        let infra = &self.infra;
214
215        match step.pattern {
216            PatternKind::SwarmParallel => {
217                let mut swarm = Swarm::new(step.agents.clone(), SwarmMode::Parallel);
218                if let Some(synth) = extract_synthesizer(&step.config, &step.agents) {
219                    swarm = swarm.with_synthesizer(synth);
220                }
221                let r = swarm.run(task, runner, infra).await?;
222                Ok((
223                    StageOutput::Pattern { outputs: r.outputs, final_answer: r.final_summary.clone() },
224                    r.final_summary,
225                ))
226            }
227            PatternKind::SwarmSequential => {
228                let swarm = Swarm::new(step.agents.clone(), SwarmMode::Sequential);
229                let r = swarm.run(task, runner, infra).await?;
230                Ok((
231                    StageOutput::Pattern { outputs: r.outputs, final_answer: r.final_summary.clone() },
232                    r.final_summary,
233                ))
234            }
235            PatternKind::SwarmDebate => {
236                let swarm = Swarm::new(step.agents.clone(), SwarmMode::Debate);
237                let r = swarm.run(task, runner, infra).await?;
238                Ok((
239                    StageOutput::Pattern { outputs: r.outputs, final_answer: r.final_summary.clone() },
240                    r.final_summary,
241                ))
242            }
243            PatternKind::Pipeline => {
244                let pipeline = Pipeline::new(step.agents.clone());
245                let r = pipeline.run(task, runner, infra).await?;
246                Ok((
247                    StageOutput::Pattern { outputs: r.stages, final_answer: r.final_answer.clone() },
248                    r.final_answer,
249                ))
250            }
251            PatternKind::Supervisor => {
252                let max_rounds = step.config.get("max_rounds")
253                    .and_then(|v| v.as_u64())
254                    .unwrap_or(3) as u32;
255                let (supervisor, workers) = split_supervisor_workers(&step.agents, &step.config);
256                let r = Supervisor::new(workers, supervisor)
257                    .with_max_rounds(max_rounds)
258                    .run(task, runner, infra)
259                    .await?;
260                let all_outputs: Vec<_> = r.rounds.into_iter().flatten().collect();
261                Ok((
262                    StageOutput::Pattern { outputs: all_outputs, final_answer: r.final_answer.clone() },
263                    r.final_answer,
264                ))
265            }
266            PatternKind::Delegator => {
267                let (main_agent, specialists) = split_delegator(&step.agents, &step.config);
268                let delegator = car_multi::Delegator::new(main_agent, specialists);
269                let r = delegator.run(task, runner, infra).await?;
270                Ok((
271                    StageOutput::Pattern {
272                        outputs: vec![car_multi::AgentOutput {
273                            name: "delegator".into(),
274                            answer: r.final_answer.clone(),
275                            turns: 0,
276                            tool_calls: r.delegations.len() as u32,
277                            duration_ms: 0.0,
278                            error: None,
279                            outcome: None,
280                            tokens: None,
281                        }],
282                        final_answer: r.final_answer.clone(),
283                    },
284                    r.final_answer,
285                ))
286            }
287            PatternKind::MapReduce => {
288                let max_concurrent = step.config.get("max_concurrent")
289                    .and_then(|v| v.as_u64())
290                    .unwrap_or(5) as usize;
291                let items: Vec<String> = step.config.get("items")
292                    .and_then(|v| serde_json::from_value(v.clone()).ok())
293                    .unwrap_or_default();
294
295                if step.agents.len() < 2 {
296                    return Err(WorkflowError::StageFailed(
297                        "map_reduce".into(),
298                        "requires at least 2 agents (mapper + reducer)".into(),
299                    ));
300                }
301                let mapper = step.agents[0].clone();
302                let reducer = step.agents[1].clone();
303
304                let mr = MapReduce::new(mapper, reducer).with_max_concurrent(max_concurrent);
305                let r = mr.run(task, &items.iter().map(|s| s.as_str()).collect::<Vec<_>>()
306                    .iter().map(|s| s.to_string()).collect::<Vec<_>>(),
307                    runner, infra).await?;
308                Ok((
309                    StageOutput::Pattern { outputs: r.map_outputs, final_answer: r.reduced_answer.clone() },
310                    r.reduced_answer,
311                ))
312            }
313            PatternKind::Vote => {
314                let mut vote = Vote::new(step.agents.clone());
315                if let Some(synth) = extract_synthesizer(&step.config, &step.agents) {
316                    vote = vote.with_synthesizer(synth);
317                }
318                let r = vote.run(task, runner, infra).await?;
319                Ok((
320                    StageOutput::Pattern { outputs: r.votes, final_answer: r.winner.clone() },
321                    r.winner,
322                ))
323            }
324            PatternKind::Fleet => {
325                let mut fleet = Fleet::new(step.agents.clone());
326                if let Some(timeout) = step.config.get("timeout_secs").and_then(|v| v.as_u64()) {
327                    fleet = fleet.with_timeout(timeout);
328                }
329                let r = fleet.run(runner, infra).await?;
330                let summary = format!("{} succeeded, {} failed", r.succeeded, r.failed);
331                Ok((
332                    StageOutput::Pattern { outputs: r.outputs, final_answer: summary.clone() },
333                    summary,
334                ))
335            }
336        }
337    }
338
339    /// Execute a proposal step via car-engine.
340    async fn execute_proposal(
341        &self,
342        step: &ProposalStep,
343    ) -> Result<(StageOutput, String), WorkflowError> {
344        let runtime = self.infra.make_runtime();
345        let result = runtime.execute(&step.proposal).await;
346
347        if result.all_succeeded() {
348            let answer = result.results.last()
349                .and_then(|r| r.output.as_ref())
350                .map(|v| v.to_string())
351                .unwrap_or_default();
352            Ok((StageOutput::Proposal { result }, answer))
353        } else {
354            let errors: Vec<String> = result.results.iter()
355                .filter_map(|r| r.error.as_ref())
356                .cloned()
357                .collect();
358            Err(WorkflowError::StageFailed(
359                "proposal".into(),
360                errors.join("; "),
361            ))
362        }
363    }
364
365    /// Execute a nested sub-workflow.
366    async fn execute_sub_workflow(
367        &self,
368        step: &SubWorkflowStep,
369    ) -> Result<(StageOutput, String), WorkflowError> {
370        let result = self.run(&step.workflow).await?;
371        let answer = result.stages.last()
372            .and_then(|s| match &s.output {
373                StageOutput::Pattern { final_answer, .. } => Some(final_answer.clone()),
374                StageOutput::Proposal { result } => result.results.last()
375                    .and_then(|r| r.output.as_ref())
376                    .map(|v| v.to_string()),
377                StageOutput::SubWorkflow { result } => {
378                    Some(format!("sub-workflow {} {}", result.workflow_name,
379                        if result.succeeded() { "completed" } else { "failed" }))
380                }
381                StageOutput::Empty => None,
382            })
383            .unwrap_or_default();
384
385        if result.succeeded() {
386            Ok((StageOutput::SubWorkflow { result: Box::new(result) }, answer))
387        } else {
388            Err(WorkflowError::StageFailed(
389                "sub_workflow".into(),
390                "sub-workflow failed".into(),
391            ))
392        }
393    }
394
395    /// Run saga compensation in reverse order of completed stages.
396    async fn compensate(
397        &self,
398        workflow: &Workflow,
399        completed_stage_ids: &[String],
400    ) -> Vec<CompensationResult> {
401        let mut results = Vec::new();
402
403        for stage_id in completed_stage_ids.iter().rev() {
404            let stage = match workflow.stage(stage_id) {
405                Some(s) => s,
406                None => continue,
407            };
408
409            let handler = match &stage.compensation {
410                Some(h) => h,
411                None => continue,
412            };
413
414            debug!(stage = %stage_id, "running compensation");
415            let comp_start = Instant::now();
416
417            let comp_result = match handler {
418                CompensationHandler::Proposal(ps) => {
419                    self.execute_proposal(ps).await
420                }
421                CompensationHandler::StageRef { stage_id: ref_id } => {
422                    if let Some(ref_stage) = workflow.stage(ref_id) {
423                        self.execute_step(&ref_stage.step, &HashMap::new()).await
424                    } else {
425                        Err(WorkflowError::StageNotFound(ref_id.clone()))
426                    }
427                }
428            };
429
430            let duration = comp_start.elapsed().as_secs_f64() * 1000.0;
431
432            match comp_result {
433                Ok(_) => {
434                    results.push(CompensationResult {
435                        for_stage_id: stage_id.clone(),
436                        status: StageStatus::Succeeded,
437                        duration_ms: duration,
438                        error: None,
439                    });
440                }
441                Err(e) => {
442                    warn!(stage = %stage_id, error = %e, "compensation failed");
443                    results.push(CompensationResult {
444                        for_stage_id: stage_id.clone(),
445                        status: StageStatus::Failed,
446                        duration_ms: duration,
447                        error: Some(e.to_string()),
448                    });
449                }
450            }
451        }
452
453        results
454    }
455}
456
457// --- Precondition evaluation ---
458
459/// Evaluate edge conditions against workflow state. Returns true if all conditions pass.
460fn check_conditions(conditions: &[car_ir::Precondition], state: &HashMap<String, Value>) -> bool {
461    conditions.iter().all(|cond| evaluate_precondition(cond, state))
462}
463
464/// Evaluate a single precondition against a state map.
465fn evaluate_precondition(cond: &car_ir::Precondition, state: &HashMap<String, Value>) -> bool {
466    let op = cond.operator.as_str();
467
468    match op {
469        "exists" => state.contains_key(&cond.key),
470        "not_exists" => !state.contains_key(&cond.key),
471        _ => {
472            let actual = match state.get(&cond.key) {
473                Some(v) => v,
474                None => return false, // key missing, condition fails
475            };
476            match op {
477                "eq" => actual == &cond.value,
478                "neq" => actual != &cond.value,
479                "gt" => compare_values(actual, &cond.value).map_or(false, |o| o == std::cmp::Ordering::Greater),
480                "gte" => compare_values(actual, &cond.value).map_or(false, |o| o != std::cmp::Ordering::Less),
481                "lt" => compare_values(actual, &cond.value).map_or(false, |o| o == std::cmp::Ordering::Less),
482                "lte" => compare_values(actual, &cond.value).map_or(false, |o| o != std::cmp::Ordering::Greater),
483                "contains" => {
484                    if let (Some(haystack), Some(needle)) = (actual.as_str(), cond.value.as_str()) {
485                        haystack.contains(needle)
486                    } else {
487                        false
488                    }
489                }
490                _ => false,
491            }
492        }
493    }
494}
495
496fn compare_values(a: &Value, b: &Value) -> Option<std::cmp::Ordering> {
497    match (a.as_f64(), b.as_f64()) {
498        (Some(a), Some(b)) => a.partial_cmp(&b),
499        _ => match (a.as_str(), b.as_str()) {
500            (Some(a), Some(b)) => Some(a.cmp(b)),
501            _ => None,
502        },
503    }
504}
505
506// --- Pattern config helpers ---
507
508/// Extract synthesizer agent from config (used by Swarm and Vote).
509fn extract_synthesizer(
510    config: &HashMap<String, Value>,
511    agents: &[car_multi::AgentSpec],
512) -> Option<car_multi::AgentSpec> {
513    config.get("synthesizer_index")
514        .and_then(|v| v.as_u64())
515        .and_then(|i| agents.get(i as usize))
516        .cloned()
517}
518
519/// Split agents into supervisor + workers. By default, last agent is supervisor.
520fn split_supervisor_workers(
521    agents: &[car_multi::AgentSpec],
522    config: &HashMap<String, Value>,
523) -> (car_multi::AgentSpec, Vec<car_multi::AgentSpec>) {
524    let idx = config.get("supervisor_index")
525        .and_then(|v| v.as_u64())
526        .unwrap_or(agents.len().saturating_sub(1) as u64) as usize;
527
528    let supervisor = agents.get(idx).cloned().unwrap_or_else(|| agents.last().unwrap().clone());
529    let workers: Vec<_> = agents.iter().enumerate()
530        .filter(|(i, _)| *i != idx)
531        .map(|(_, a)| a.clone())
532        .collect();
533    (supervisor, workers)
534}
535
536/// Split agents into main + specialists map for Delegator.
537fn split_delegator(
538    agents: &[car_multi::AgentSpec],
539    _config: &HashMap<String, Value>,
540) -> (car_multi::AgentSpec, HashMap<String, car_multi::AgentSpec>) {
541    let main = agents.first().cloned().unwrap_or_else(|| car_multi::AgentSpec::new("main", ""));
542    let specialists: HashMap<String, car_multi::AgentSpec> = agents.iter()
543        .skip(1)
544        .map(|a| (a.name.clone(), a.clone()))
545        .collect();
546    (main, specialists)
547}
548
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Build a precondition with an empty description.
    fn cond(key: &str, operator: &str, value: Value) -> car_ir::Precondition {
        car_ir::Precondition {
            key: key.into(),
            operator: operator.into(),
            value,
            description: String::new(),
        }
    }

    /// Build a state map from `(key, value)` pairs.
    fn state_of(pairs: &[(&str, Value)]) -> HashMap<String, Value> {
        pairs
            .iter()
            .map(|(k, v)| (k.to_string(), v.clone()))
            .collect()
    }

    #[test]
    fn precondition_eq() {
        let state = state_of(&[("x", Value::Bool(true))]);
        assert!(evaluate_precondition(&cond("x", "eq", Value::Bool(true)), &state));
        assert!(!evaluate_precondition(&cond("x", "eq", Value::Bool(false)), &state));
    }

    #[test]
    fn precondition_neq() {
        let state = state_of(&[("x", json!("a"))]);
        assert!(evaluate_precondition(&cond("x", "neq", json!("b")), &state));
        assert!(!evaluate_precondition(&cond("x", "neq", json!("a")), &state));
    }

    #[test]
    fn precondition_exists() {
        // A key holding JSON null still counts as present.
        let state = state_of(&[("x", Value::Null)]);
        assert!(evaluate_precondition(&cond("x", "exists", Value::Null), &state));
        assert!(!evaluate_precondition(&cond("y", "exists", Value::Null), &state));
    }

    #[test]
    fn precondition_not_exists() {
        let state = state_of(&[("x", Value::Null)]);
        assert!(evaluate_precondition(&cond("y", "not_exists", Value::Null), &state));
        assert!(!evaluate_precondition(&cond("x", "not_exists", Value::Null), &state));
    }

    #[test]
    fn precondition_numeric_comparison() {
        let state = state_of(&[("count", json!(5))]);
        let check = |op: &str, v: i64| evaluate_precondition(&cond("count", op, json!(v)), &state);
        assert!(check("gt", 3));
        assert!(!check("lt", 3));
        // Boundaries: equal values satisfy gte/lte but not gt/lt.
        assert!(check("gte", 5));
        assert!(!check("gt", 5));
        assert!(check("lte", 5));
        assert!(!check("lt", 5));
    }

    #[test]
    fn precondition_string_comparison_is_lexicographic() {
        let state = state_of(&[("v", json!("10"))]);
        assert!(evaluate_precondition(&cond("v", "lt", json!("9")), &state));
        assert!(!evaluate_precondition(&cond("v", "gt", json!("9")), &state));
    }

    #[test]
    fn precondition_contains() {
        let state = state_of(&[("msg", json!("build succeeded")), ("n", json!(42))]);
        assert!(evaluate_precondition(&cond("msg", "contains", json!("succeeded")), &state));
        assert!(!evaluate_precondition(&cond("msg", "contains", json!("failed")), &state));
        // Non-string operands never match.
        assert!(!evaluate_precondition(&cond("n", "contains", json!("4")), &state));
    }

    #[test]
    fn comparisons_on_missing_key_fail() {
        let state = HashMap::new();
        assert!(!evaluate_precondition(&cond("missing", "eq", Value::Null), &state));
        assert!(!evaluate_precondition(&cond("missing", "neq", Value::Null), &state));
    }

    #[test]
    fn unknown_operator_fails() {
        let state = state_of(&[("x", Value::Bool(true))]);
        assert!(!evaluate_precondition(&cond("x", "matches", Value::Bool(true)), &state));
    }

    #[test]
    fn empty_conditions_always_pass() {
        let state = HashMap::new();
        assert!(check_conditions(&[], &state));
    }

    #[test]
    fn all_conditions_must_pass() {
        let state = state_of(&[("a", json!(1)), ("b", json!(2))]);
        let pass = cond("a", "eq", json!(1));
        let fail = cond("b", "eq", json!(3));
        assert!(check_conditions(std::slice::from_ref(&pass), &state));
        assert!(!check_conditions(&[pass, fail], &state));
    }
}