Skip to main content

car_workflow/
engine.rs

1//! Workflow execution engine — walks the stage graph, dispatches to car-multi
2//! patterns and car-engine proposals, manages state flow and saga compensation.
3
4use std::collections::HashMap;
5use std::sync::Arc;
6use std::time::Instant;
7
8use serde_json::Value;
9use tracing::{debug, info, warn};
10
11use car_multi::{
12    AdversarialReview, AgentRunner, Fleet, MapReduce, Pipeline, SharedInfra, Supervisor, Swarm,
13    SwarmMode, Tournament, Vote,
14};
15
16use crate::error::WorkflowError;
17use crate::result::*;
18use crate::types::*;
19
/// What executing one stage step yields, as a tuple in this order:
///
/// 1. the step's typed [`StageOutput`],
/// 2. an answer string — stored under `stage.<id>.answer` so edge conditions
///    can read it,
/// 3. the state deltas to merge into workflow state.
type StepOutcome = (StageOutput, String, HashMap<String, Value>);
23
/// Workflow execution engine.
///
/// Holds only the agent runner and the shared infrastructure handle; all
/// per-run state (workflow state, stage results, position in the graph) lives
/// in the `Cursor` that each run threads through the engine's methods.
pub struct WorkflowEngine {
    /// Agent runner, shared behind an `Arc`.
    runner: Arc<dyn AgentRunner>,
    /// Shared infrastructure handle from `car_multi`.
    infra: SharedInfra,
}
29
30impl WorkflowEngine {
31    pub fn new(runner: Arc<dyn AgentRunner>, infra: SharedInfra) -> Self {
32        Self { runner, infra }
33    }
34
35    /// Execute a workflow to completion, following the stage graph.
36    pub fn run<'a>(
37        &'a self,
38        workflow: &'a Workflow,
39    ) -> futures::future::BoxFuture<'a, Result<WorkflowResult, WorkflowError>> {
40        Box::pin(self.run_inner(workflow))
41    }
42
43    async fn run_inner(&self, workflow: &Workflow) -> Result<WorkflowResult, WorkflowError> {
44        // Validate start stage exists
45        if workflow.stage(&workflow.start).is_none() {
46            return Err(WorkflowError::NoStartStage);
47        }
48
49        // Fail closed on pathologically deep loop/foreach/sub-workflow nesting
50        // before recursing into it — guards execution even when the caller
51        // skipped `verify_workflow`.
52        for stage in &workflow.stages {
53            if crate::verify::exceeds_nesting(&stage.step, crate::verify::MAX_STEP_NESTING_DEPTH) {
54                return Err(WorkflowError::StageFailed(
55                    stage.id.clone(),
56                    format!(
57                        "loop/foreach/sub-workflow nesting exceeds the limit of {}",
58                        crate::verify::MAX_STEP_NESTING_DEPTH
59                    ),
60                ));
61            }
62        }
63
64        let mut wf_state = HashMap::new();
65        // Pin the overall goal into state so every stage (and edge condition) can
66        // re-anchor on it — a structural guard against goal drift.
67        if let Some(goal) = &workflow.goal {
68            wf_state.insert("goal".to_string(), Value::String(goal.clone()));
69        }
70        let cursor = Cursor {
71            wf_state,
72            stage_results: Vec::new(),
73            completed_stage_ids: Vec::new(),
74            iterations: 0,
75            prior_duration_ms: 0.0,
76            current_id: workflow.start.clone(),
77        };
78        self.drive(workflow, new_run_id(), cursor, &HashMap::new())
79            .await
80    }
81
82    /// Re-run a workflow, reusing the successful prefix of a `prior` result.
83    ///
84    /// Every stage that succeeded in `prior` is replayed from its cached result
85    /// (instantly, no agent/proposal call); the first stage that did not succeed —
86    /// and everything after it — runs live, seeded with `prior.final_state`. Same
87    /// workflow + same prior result → the same resume point, deterministically.
88    ///
89    /// Use this to recover a long run that failed partway, or to continue after
90    /// fixing the cause of a failure, without re-paying for completed work. Does
91    /// not resume an approval *pause* (use [`resume`](Self::resume) for that),
92    /// though it *can* newly pause at an approval gate the prior run never reached.
93    ///
94    /// Replayed stages keep their original recorded `duration_ms`, so summing
95    /// per-stage durations across the original and cached runs would double-count
96    /// the replayed prefix.
97    pub async fn run_cached(
98        &self,
99        workflow: &Workflow,
100        prior: &WorkflowResult,
101    ) -> Result<WorkflowResult, WorkflowError> {
102        if workflow.stage(&workflow.start).is_none() {
103            return Err(WorkflowError::NoStartStage);
104        }
105
106        // Cache only the stages that actually succeeded.
107        let cache: HashMap<String, StageResult> = prior
108            .stages
109            .iter()
110            .filter(|s| s.status == StageStatus::Succeeded)
111            .map(|s| (s.stage_id.clone(), s.clone()))
112            .collect();
113
114        // Seed state from the prior run so cached stages' effects are visible to
115        // edge conditions.
116        let mut wf_state = prior.final_state.clone();
117
118        // Strip the per-stage bookkeeping (`stage.<id>.{succeeded,error,answer,
119        // ...}`) of any stage NOT in the cache. The prior run's failed/partial
120        // stages left these behind; carrying them would let an edge that
121        // forward-references a not-yet-rerun stage's slot route differently than
122        // the original run saw at this point. Genuine data deltas are untouched.
123        let stage_ids: Vec<String> = workflow.stages.iter().map(|s| s.id.clone()).collect();
124        wf_state.retain(|k, _| {
125            match k.strip_prefix("stage.") {
126                // Keep keys for cached stages; drop bookkeeping for the rest.
127                Some(rest) => {
128                    let id = rest.split('.').next().unwrap_or("");
129                    cache.contains_key(id) || !stage_ids.iter().any(|s| s == id)
130                }
131                None => true,
132            }
133        });
134
135        // Re-pin the goal authoritatively (after seeding, which may carry a stale
136        // or clobbered value).
137        if let Some(goal) = &workflow.goal {
138            wf_state.insert("goal".to_string(), Value::String(goal.clone()));
139        } else {
140            wf_state.remove("goal");
141        }
142
143        let cursor = Cursor {
144            wf_state,
145            stage_results: Vec::new(),
146            completed_stage_ids: Vec::new(),
147            iterations: 0,
148            prior_duration_ms: 0.0,
149            current_id: workflow.start.clone(),
150        };
151        self.drive(workflow, new_run_id(), cursor, &cache).await
152    }
153
154    /// Resume a run that was parked at an approval gate.
155    ///
156    /// Applies `input` as the approval stage's response (written to its
157    /// `output_key` and mirrored to `stage.<id>.answer`), records the stage as
158    /// succeeded, then continues the stage graph. Chained approval gates pause
159    /// again, returning a fresh [`PausedWorkflow`] each time.
160    pub async fn resume(
161        &self,
162        paused: PausedWorkflow,
163        input: HashMap<String, Value>,
164    ) -> Result<WorkflowResult, WorkflowError> {
165        let PausedWorkflow {
166            run_id,
167            workflow,
168            paused_stage_id,
169            mut wf_state,
170            mut stage_results,
171            mut completed_stage_ids,
172            iterations,
173            prior_duration_ms,
174            ..
175        } = paused;
176
177        // The checkpoint is serializable and crosses the FFI/persistence
178        // boundary, so treat it as untrusted: re-derive everything we can from
179        // the embedded definition rather than the carried copies. Confirm the
180        // parked stage really is an approval gate and read its declared
181        // `output_key` from the step itself (ignoring the checkpoint's copy).
182        let approval = match workflow.stage(&paused_stage_id).map(|s| &s.step) {
183            Some(StageStep::Approval(ap)) => ap.clone(),
184            Some(_) => {
185                return Err(WorkflowError::InvalidResume(format!(
186                    "stage '{paused_stage_id}' is not an approval gate"
187                )))
188            }
189            None => {
190                return Err(WorkflowError::InvalidResume(format!(
191                    "paused stage '{paused_stage_id}' not found in workflow"
192                )))
193            }
194        };
195        let output_key = approval.output_key.clone();
196        if output_key.trim().is_empty() {
197            return Err(WorkflowError::InvalidResume(format!(
198                "approval stage '{paused_stage_id}' has empty output_key"
199            )));
200        }
201        if output_key == "goal" {
202            return Err(WorkflowError::InvalidResume(
203                "approval output_key 'goal' is reserved (it is the drift anchor)".into(),
204            ));
205        }
206
207        // `goal` is a reserved anchor: re-derive it from the authoritative
208        // workflow definition rather than trusting the checkpoint's wf_state
209        // (which is treated as untrusted, like output_key above).
210        if let Some(goal) = &workflow.goal {
211            wf_state.insert("goal".to_string(), Value::String(goal.clone()));
212        } else {
213            wf_state.remove("goal");
214        }
215
216        // Enforce the form contract the human was shown: required fields present,
217        // `options` values in range. Lets a caller turn a rejection into a
218        // re-prompt instead of silently mis-routing on a failed edge condition.
219        validate_approval_input(&approval.fields, &input)?;
220
221        // The human's response, written three ways:
222        //   1. the whole object at `output_key`,
223        //   2. each top-level field flattened to `output_key.<field>` so edge
224        //      conditions can branch on individual scalar answers (approve/revise).
225        //      Flattening is top-level only — nested objects stay opaque.
226        //   3. `stage.<id>.answer` / `.succeeded` for the standard stage slots.
227        let response = Value::Object(input.into_iter().collect());
228        if let Value::Object(map) = &response {
229            for (k, v) in map {
230                wf_state.insert(format!("{output_key}.{k}"), v.clone());
231            }
232        }
233        wf_state.insert(output_key.clone(), response.clone());
234        wf_state.insert(
235            format!("stage.{paused_stage_id}.succeeded"),
236            Value::Bool(true),
237        );
238        wf_state.insert(
239            format!("stage.{paused_stage_id}.answer"),
240            Value::String(response.to_string()),
241        );
242
243        let stage_name = workflow
244            .stage(&paused_stage_id)
245            .map(|s| s.name.clone())
246            .unwrap_or_else(|| paused_stage_id.clone());
247        stage_results.push(StageResult {
248            stage_id: paused_stage_id.clone(),
249            stage_name,
250            status: StageStatus::Succeeded,
251            output: StageOutput::Approval { response },
252            duration_ms: 0.0,
253            error: None,
254        });
255        completed_stage_ids.push(paused_stage_id.clone());
256
257        info!(stage = %paused_stage_id, run_id = %run_id, "approval resumed");
258
259        // Where does control flow after the gate?
260        match next_stage(&workflow, &paused_stage_id, &wf_state) {
261            Some(next_id) => {
262                let cursor = Cursor {
263                    wf_state,
264                    stage_results,
265                    completed_stage_ids,
266                    iterations,
267                    prior_duration_ms,
268                    current_id: next_id,
269                };
270                self.drive(&workflow, run_id, cursor, &HashMap::new()).await
271            }
272            None => {
273                // The gate was terminal — the run completes on approval.
274                info!(workflow = %workflow.name, "workflow completed (terminal approval gate)");
275                Ok(completed_result(
276                    &workflow,
277                    stage_results,
278                    wf_state,
279                    prior_duration_ms,
280                ))
281            }
282        }
283    }
284
285    /// Drive the stage graph forward from a cursor position until the run
286    /// completes, fails, or pauses at an approval gate.
287    ///
288    /// `cache` maps stage id → a prior successful `StageResult`. A stage found in
289    /// the cache is **replayed from cache** (not re-executed) — the prefix of an
290    /// already-completed run is reused instantly, and the first uncached stage
291    /// (and everything after it) runs live. Pass an empty map for a normal run.
292    async fn drive(
293        &self,
294        workflow: &Workflow,
295        run_id: String,
296        mut cur: Cursor,
297        cache: &HashMap<String, StageResult>,
298    ) -> Result<WorkflowResult, WorkflowError> {
299        let start = Instant::now();
300
301        loop {
302            cur.iterations += 1;
303            if cur.iterations > workflow.max_iterations {
304                return Err(WorkflowError::CycleLimitReached(workflow.max_iterations));
305            }
306
307            let stage = workflow
308                .stage(&cur.current_id)
309                .ok_or_else(|| WorkflowError::StageNotFound(cur.current_id.clone()))?;
310
311            // Prefix cache: a stage that already succeeded in a prior run is
312            // replayed from its cached result rather than re-executed. The
313            // seeded wf_state already carries that stage's state, so edge
314            // conditions still evaluate correctly. Checked before the approval
315            // gate so a previously-answered gate is not re-paused.
316            if let Some(cached) = cache.get(&cur.current_id) {
317                debug!(stage = %cur.current_id, "replaying cached stage");
318                cur.wf_state
319                    .insert(format!("stage.{}.succeeded", stage.id), Value::Bool(true));
320                cur.stage_results.push(cached.clone());
321                cur.completed_stage_ids.push(stage.id.clone());
322                match next_stage(workflow, &cur.current_id, &cur.wf_state) {
323                    Some(next_id) => {
324                        cur.current_id = next_id;
325                        continue;
326                    }
327                    None => {
328                        let duration =
329                            cur.prior_duration_ms + start.elapsed().as_secs_f64() * 1000.0;
330                        return Ok(completed_result(
331                            workflow,
332                            cur.stage_results,
333                            cur.wf_state,
334                            duration,
335                        ));
336                    }
337                }
338            }
339
340            // Approval gate: pause BEFORE doing any work, snapshot, and return.
341            // The run resumes only via `resume()` with the human's response.
342            if let StageStep::Approval(ap) = &stage.step {
343                // Validated by `verify_workflow`, but guard here too: an empty
344                // output_key would silently drop the human's response.
345                if ap.output_key.trim().is_empty() {
346                    return Err(WorkflowError::StageFailed(
347                        stage.id.clone(),
348                        "approval stage has empty output_key".into(),
349                    ));
350                }
351                info!(stage = %stage.id, run_id = %run_id, "workflow paused at approval gate");
352                // Wall time accumulated across pause/resume segments; the human
353                // wait between pause and resume is deliberately not counted.
354                let elapsed = cur.prior_duration_ms + start.elapsed().as_secs_f64() * 1000.0;
355                let now = chrono::Utc::now();
356                let paused = PausedWorkflow {
357                    run_id: run_id.clone(),
358                    workflow: workflow.clone(),
359                    paused_stage_id: stage.id.clone(),
360                    prompt: ap.prompt.clone(),
361                    fields: ap.fields.clone(),
362                    output_key: ap.output_key.clone(),
363                    wf_state: cur.wf_state.clone(),
364                    stage_results: cur.stage_results.clone(),
365                    completed_stage_ids: cur.completed_stage_ids.clone(),
366                    iterations: cur.iterations,
367                    prior_duration_ms: elapsed,
368                    created_at: now,
369                };
370                return Ok(WorkflowResult {
371                    workflow_id: workflow.id.clone(),
372                    workflow_name: workflow.name.clone(),
373                    status: WorkflowStatus::Paused,
374                    stages: cur.stage_results,
375                    compensations: vec![],
376                    duration_ms: elapsed,
377                    timestamp: now,
378                    final_state: cur.wf_state,
379                    paused: Some(paused),
380                });
381            }
382
383            debug!(stage = %stage.id, name = %stage.name, iteration = cur.iterations, "executing stage");
384
385            // Execute the stage
386            let stage_start = Instant::now();
387            let result = if let Some(timeout_ms) = stage.timeout_ms {
388                match tokio::time::timeout(
389                    std::time::Duration::from_millis(timeout_ms),
390                    self.execute_step(&stage.id, &stage.step, &cur.wf_state),
391                )
392                .await
393                {
394                    Ok(r) => r,
395                    Err(_) => Err(WorkflowError::Timeout(stage.id.clone(), timeout_ms)),
396                }
397            } else {
398                self.execute_step(&stage.id, &stage.step, &cur.wf_state).await
399            };
400            let stage_duration = stage_start.elapsed().as_secs_f64() * 1000.0;
401
402            match result {
403                Ok((output, answer, deltas)) => {
404                    // Merge the state deltas the step produced (proposal
405                    // state_changes, loop/foreach per-iteration slots) FIRST, so
406                    // the runtime-reserved slots written below intentionally win
407                    // over any colliding delta.
408                    for (k, v) in deltas {
409                        cur.wf_state.insert(k, v);
410                    }
411                    // `goal` is a reserved anchor: re-assert it so no stage delta
412                    // (a proposal writing a key named "goal", etc.) can repoint
413                    // the drift guard mid-run.
414                    if let Some(goal) = &workflow.goal {
415                        cur.wf_state
416                            .insert("goal".to_string(), Value::String(goal.clone()));
417                    }
418                    cur.wf_state
419                        .insert(format!("stage.{}.succeeded", stage.id), Value::Bool(true));
420                    cur.wf_state
421                        .insert(format!("stage.{}.answer", stage.id), Value::String(answer));
422
423                    cur.stage_results.push(StageResult {
424                        stage_id: stage.id.clone(),
425                        stage_name: stage.name.clone(),
426                        status: StageStatus::Succeeded,
427                        output,
428                        duration_ms: stage_duration,
429                        error: None,
430                    });
431                    cur.completed_stage_ids.push(stage.id.clone());
432
433                    info!(stage = %stage.id, duration_ms = stage_duration, "stage succeeded");
434                }
435                Err(e) => {
436                    let error_msg = e.to_string();
437                    cur.wf_state
438                        .insert(format!("stage.{}.succeeded", stage.id), Value::Bool(false));
439                    cur.wf_state.insert(
440                        format!("stage.{}.error", stage.id),
441                        Value::String(error_msg.clone()),
442                    );
443
444                    cur.stage_results.push(StageResult {
445                        stage_id: stage.id.clone(),
446                        stage_name: stage.name.clone(),
447                        status: StageStatus::Failed,
448                        output: StageOutput::Empty,
449                        duration_ms: stage_duration,
450                        error: Some(error_msg.clone()),
451                    });
452
453                    warn!(stage = %stage.id, error = %error_msg, "stage failed, running compensation");
454
455                    // Run saga compensation in reverse order
456                    let compensations = self.compensate(workflow, &cur.completed_stage_ids).await;
457
458                    let all_compensated = compensations
459                        .iter()
460                        .all(|c| c.status == StageStatus::Succeeded);
461                    let any_compensated = !compensations.is_empty();
462
463                    let status = if any_compensated && all_compensated {
464                        WorkflowStatus::Compensated
465                    } else if any_compensated {
466                        WorkflowStatus::PartiallyCompensated
467                    } else {
468                        WorkflowStatus::Failed
469                    };
470
471                    return Ok(WorkflowResult {
472                        workflow_id: workflow.id.clone(),
473                        workflow_name: workflow.name.clone(),
474                        status,
475                        stages: cur.stage_results,
476                        compensations,
477                        duration_ms: cur.prior_duration_ms + start.elapsed().as_secs_f64() * 1000.0,
478                        timestamp: chrono::Utc::now(),
479                        final_state: cur.wf_state,
480                        paused: None,
481                    });
482                }
483            }
484
485            // Evaluate outgoing edges to find the next stage
486            match next_stage(workflow, &cur.current_id, &cur.wf_state) {
487                Some(next_id) => {
488                    debug!(from = %cur.current_id, to = %next_id, "taking edge");
489                    cur.current_id = next_id;
490                }
491                None => {
492                    // Terminal stage — workflow complete
493                    info!(
494                        workflow = %workflow.name,
495                        stages_executed = cur.stage_results.len(),
496                        "workflow completed"
497                    );
498                    let duration = cur.prior_duration_ms + start.elapsed().as_secs_f64() * 1000.0;
499                    return Ok(completed_result(
500                        workflow,
501                        cur.stage_results,
502                        cur.wf_state,
503                        duration,
504                    ));
505                }
506            }
507        }
508    }
509
510    /// Execute a single stage step, returning `(output, answer_string,
511    /// state_deltas)`. `stage_id` namespaces any state the step writes;
512    /// `wf_state` is the read-only state visible to the step. Boxed because
513    /// `LoopUntil`/`ForEach` recurse into their inner body step.
514    fn execute_step<'a>(
515        &'a self,
516        stage_id: &'a str,
517        step: &'a StageStep,
518        wf_state: &'a HashMap<String, Value>,
519    ) -> futures::future::BoxFuture<'a, Result<StepOutcome, WorkflowError>> {
520        Box::pin(async move {
521            match step {
522                StageStep::Pattern(ps) => {
523                    let (out, answer) = self.execute_pattern(ps, wf_state).await?;
524                    // Surface a review verdict as a typed state slot so edges can
525                    // branch on `stage.<id>.review_passed` (bool), not prose.
526                    let mut deltas = HashMap::new();
527                    if let StageOutput::Review {
528                        passed,
529                        blocker_count,
530                        ..
531                    } = &out
532                    {
533                        deltas.insert(
534                            format!("stage.{stage_id}.review_passed"),
535                            Value::Bool(*passed),
536                        );
537                        deltas.insert(
538                            format!("stage.{stage_id}.review_blockers"),
539                            Value::from(*blocker_count),
540                        );
541                    }
542                    Ok((out, answer, deltas))
543                }
544                StageStep::Proposal(ps) => {
545                    let (out, answer) = self.execute_proposal(ps).await?;
546                    // Surface proposal state_changes as deltas so the caller
547                    // merges them into workflow state uniformly.
548                    let mut deltas = HashMap::new();
549                    if let StageOutput::Proposal { ref result } = out {
550                        for ar in &result.results {
551                            for (k, v) in &ar.state_changes {
552                                deltas.insert(k.clone(), v.clone());
553                            }
554                        }
555                    }
556                    Ok((out, answer, deltas))
557                }
558                StageStep::SubWorkflow(sw) => {
559                    let (out, answer) = self.execute_sub_workflow(sw).await?;
560                    Ok((out, answer, HashMap::new()))
561                }
562                StageStep::LoopUntil(ls) => self.execute_loop_until(stage_id, ls, wf_state).await,
563                StageStep::ForEach(fe) => self.execute_for_each(stage_id, fe, wf_state).await,
564                // Approval gates are intercepted by `drive`/`resume` before
565                // reaching here; this arm only fires if one is nested inside a
566                // loop/foreach body or referenced as a compensation handler,
567                // neither of which is meaningful.
568                StageStep::Approval(_) => Err(WorkflowError::StageFailed(
569                    "approval".into(),
570                    "an approval gate cannot be executed as a step, loop/foreach body, or compensation".into(),
571                )),
572            }
573        })
574    }
575
576    /// Run a `LoopUntil` step: repeat the body until `until` holds or the cap is
577    /// hit. Body state is threaded forward across iterations and exposed to
578    /// `until` via `stage.<id>.answer` and `stage.<id>.iteration`.
579    async fn execute_loop_until(
580        &self,
581        stage_id: &str,
582        ls: &LoopUntilStep,
583        wf_state: &HashMap<String, Value>,
584    ) -> Result<StepOutcome, WorkflowError> {
585        if ls.max_iterations == 0 {
586            return Err(WorkflowError::StageFailed(
587                stage_id.to_string(),
588                "loop_until requires max_iterations >= 1".into(),
589            ));
590        }
591
592        let body_id = format!("{stage_id}.body");
593        let mut working = wf_state.clone();
594        let mut accumulated: HashMap<String, Value> = HashMap::new();
595        let mut outputs: Vec<Box<StageOutput>> = Vec::new();
596        let mut last_answer = String::new();
597        let mut satisfied = false;
598        let mut ran: u32 = 0;
599
600        for _ in 0..ls.max_iterations {
601            let (out, answer, deltas) = self
602                .execute_step(&body_id, &ls.body, &working)
603                .await?;
604            ran += 1;
605
606            // Thread body deltas + the conventional loop slots forward so the
607            // next iteration and the `until` check can observe them.
608            for (k, v) in deltas {
609                working.insert(k.clone(), v.clone());
610                accumulated.insert(k, v);
611            }
612            let answer_val = Value::String(answer.clone());
613            let iter_val = Value::from(ran);
614            working.insert(format!("stage.{stage_id}.answer"), answer_val.clone());
615            working.insert(format!("stage.{stage_id}.iteration"), iter_val.clone());
616            accumulated.insert(format!("stage.{stage_id}.answer"), answer_val);
617            accumulated.insert(format!("stage.{stage_id}.iteration"), iter_val);
618
619            last_answer = answer;
620            outputs.push(Box::new(out));
621
622            // An empty `until` means "run exactly max_iterations" — never
623            // satisfied early.
624            if !ls.until.is_empty() && check_conditions(&ls.until, &working) {
625                satisfied = true;
626                break;
627            }
628        }
629
630        Ok((
631            StageOutput::Loop {
632                iterations: ran,
633                satisfied,
634                iterations_output: outputs,
635            },
636            last_answer,
637            accumulated,
638        ))
639    }
640
641    /// Run a `ForEach` step: resolve the item list from state at runtime and run
642    /// the (item-templated) body once per item, sequentially or with bounded
643    /// concurrency. Bodies read the pre-loop state; per-item answers are exposed
644    /// under `foreach.<id>.<index>.{item,answer}`.
645    async fn execute_for_each(
646        &self,
647        stage_id: &str,
648        fe: &ForEachStep,
649        wf_state: &HashMap<String, Value>,
650    ) -> Result<StepOutcome, WorkflowError> {
651        // Resolve items at runtime. A missing key or non-array is a no-op.
652        let items: Vec<Value> = wf_state
653            .get(&fe.items_from)
654            .and_then(|v| v.as_array().cloned())
655            .unwrap_or_default();
656        let item_strs: Vec<String> = items.iter().map(render_item).collect();
657
658        // Build a templated body per item up front (pure, cheap) so the async
659        // bodies below borrow owned copies.
660        let mut templated: Vec<StageStep> = Vec::with_capacity(items.len());
661        for (i, item) in item_strs.iter().enumerate() {
662            templated.push(template_step(&fe.body, item, i)?);
663        }
664
665        let concurrency = fe.max_concurrent.max(1);
666
667        // Build the per-item futures up front (avoids a higher-ranked lifetime
668        // snag with `stream::iter(...).map(closure)`).
669        let mut futs = Vec::with_capacity(templated.len());
670        for (i, body) in templated.iter().enumerate() {
671            futs.push(self.run_foreach_body(stage_id, i, body, wf_state));
672        }
673
674        // Drive the per-item bodies, preserving item order in the results.
675        let results: Vec<Result<StepOutcome, WorkflowError>> = if concurrency <= 1 {
676            let mut acc = Vec::with_capacity(futs.len());
677            for f in futs {
678                acc.push(f.await);
679            }
680            acc
681        } else {
682            use futures::stream::StreamExt;
683            futures::stream::iter(futs)
684                .buffered(concurrency)
685                .collect()
686                .await
687        };
688
689        let mut outputs: Vec<Box<StageOutput>> = Vec::with_capacity(results.len());
690        let mut deltas: HashMap<String, Value> = HashMap::new();
691        for (i, r) in results.into_iter().enumerate() {
692            let (out, answer, body_deltas) = r?;
693            deltas.insert(
694                format!("foreach.{stage_id}.{i}.item"),
695                Value::String(item_strs[i].clone()),
696            );
697            deltas.insert(
698                format!("foreach.{stage_id}.{i}.answer"),
699                Value::String(answer),
700            );
701            // Re-namespace each body's state deltas per item so they don't
702            // clobber across concurrent items but stay recoverable downstream.
703            for (k, v) in body_deltas {
704                deltas.insert(format!("foreach.{stage_id}.{i}.state.{k}"), v);
705            }
706            outputs.push(Box::new(out));
707        }
708        deltas.insert(
709            format!("foreach.{stage_id}.count"),
710            Value::from(item_strs.len()),
711        );
712
713        let answer = format!("{} item(s) processed", item_strs.len());
714        Ok((
715            StageOutput::ForEach {
716                items: item_strs,
717                outputs,
718            },
719            answer,
720            deltas,
721        ))
722    }
723
724    /// Dispatch to the appropriate car-multi pattern.
725    async fn execute_pattern(
726        &self,
727        step: &PatternStep,
728        wf_state: &HashMap<String, Value>,
729    ) -> Result<(StageOutput, String), WorkflowError> {
730        // Re-anchor the step on the pinned overall goal so a long run can't drift.
731        let anchored_task = match wf_state.get("goal").and_then(|g| g.as_str()) {
732            Some(goal) if !goal.trim().is_empty() => {
733                format!("Overall goal: {goal}\n\nCurrent step: {}", step.task)
734            }
735            _ => step.task.clone(),
736        };
737        let task = anchored_task.as_str();
738        let runner = &self.runner;
739        let infra = &self.infra;
740
741        match step.pattern {
742            PatternKind::SwarmParallel => {
743                let mut swarm = Swarm::new(step.agents.clone(), SwarmMode::Parallel);
744                if let Some(synth) = extract_synthesizer(&step.config, &step.agents) {
745                    swarm = swarm.with_synthesizer(synth);
746                }
747                let r = swarm.run(task, runner, infra).await?;
748                Ok((
749                    StageOutput::Pattern {
750                        outputs: r.outputs,
751                        final_answer: r.final_summary.clone(),
752                    },
753                    r.final_summary,
754                ))
755            }
756            PatternKind::SwarmSequential => {
757                let swarm = Swarm::new(step.agents.clone(), SwarmMode::Sequential);
758                let r = swarm.run(task, runner, infra).await?;
759                Ok((
760                    StageOutput::Pattern {
761                        outputs: r.outputs,
762                        final_answer: r.final_summary.clone(),
763                    },
764                    r.final_summary,
765                ))
766            }
767            PatternKind::SwarmDebate => {
768                let swarm = Swarm::new(step.agents.clone(), SwarmMode::Debate);
769                let r = swarm.run(task, runner, infra).await?;
770                Ok((
771                    StageOutput::Pattern {
772                        outputs: r.outputs,
773                        final_answer: r.final_summary.clone(),
774                    },
775                    r.final_summary,
776                ))
777            }
778            PatternKind::Pipeline => {
779                let pipeline = Pipeline::new(step.agents.clone());
780                let r = pipeline.run(task, runner, infra).await?;
781                Ok((
782                    StageOutput::Pattern {
783                        outputs: r.stages,
784                        final_answer: r.final_answer.clone(),
785                    },
786                    r.final_answer,
787                ))
788            }
789            PatternKind::Supervisor => {
790                let max_rounds = step
791                    .config
792                    .get("max_rounds")
793                    .and_then(|v| v.as_u64())
794                    .unwrap_or(3) as u32;
795                let (supervisor, workers) = split_supervisor_workers(&step.agents, &step.config);
796                let r = Supervisor::new(workers, supervisor)
797                    .with_max_rounds(max_rounds)
798                    .run(task, runner, infra)
799                    .await?;
800                let all_outputs: Vec<_> = r.rounds.into_iter().flatten().collect();
801                Ok((
802                    StageOutput::Pattern {
803                        outputs: all_outputs,
804                        final_answer: r.final_answer.clone(),
805                    },
806                    r.final_answer,
807                ))
808            }
809            PatternKind::Delegator => {
810                let (main_agent, specialists) = split_delegator(&step.agents, &step.config);
811                let delegator = car_multi::Delegator::new(main_agent, specialists);
812                let r = delegator.run(task, runner, infra).await?;
813                Ok((
814                    StageOutput::Pattern {
815                        outputs: vec![car_multi::AgentOutput {
816                            name: "delegator".into(),
817                            answer: r.final_answer.clone(),
818                            turns: 0,
819                            tool_calls: r.delegations.len() as u32,
820                            duration_ms: 0.0,
821                            error: None,
822                            outcome: None,
823                            tokens: None,
824                        }],
825                        final_answer: r.final_answer.clone(),
826                    },
827                    r.final_answer,
828                ))
829            }
830            PatternKind::MapReduce => {
831                let max_concurrent = step
832                    .config
833                    .get("max_concurrent")
834                    .and_then(|v| v.as_u64())
835                    .unwrap_or(5) as usize;
836                let items: Vec<String> = step
837                    .config
838                    .get("items")
839                    .and_then(|v| serde_json::from_value(v.clone()).ok())
840                    .unwrap_or_default();
841
842                if step.agents.len() < 2 {
843                    return Err(WorkflowError::StageFailed(
844                        "map_reduce".into(),
845                        "requires at least 2 agents (mapper + reducer)".into(),
846                    ));
847                }
848                let mapper = step.agents[0].clone();
849                let reducer = step.agents[1].clone();
850
851                let mr = MapReduce::new(mapper, reducer).with_max_concurrent(max_concurrent);
852                let r = mr
853                    .run(
854                        task,
855                        &items
856                            .iter()
857                            .map(|s| s.as_str())
858                            .collect::<Vec<_>>()
859                            .iter()
860                            .map(|s| s.to_string())
861                            .collect::<Vec<_>>(),
862                        runner,
863                        infra,
864                    )
865                    .await?;
866                Ok((
867                    StageOutput::Pattern {
868                        outputs: r.map_outputs,
869                        final_answer: r.reduced_answer.clone(),
870                    },
871                    r.reduced_answer,
872                ))
873            }
874            PatternKind::Vote => {
875                let mut vote = Vote::new(step.agents.clone());
876                if let Some(synth) = extract_synthesizer(&step.config, &step.agents) {
877                    vote = vote.with_synthesizer(synth);
878                }
879                let r = vote.run(task, runner, infra).await?;
880                Ok((
881                    StageOutput::Pattern {
882                        outputs: r.votes,
883                        final_answer: r.winner.clone(),
884                    },
885                    r.winner,
886                ))
887            }
888            PatternKind::Fleet => {
889                let mut fleet = Fleet::new(step.agents.clone());
890                if let Some(timeout) = step.config.get("timeout_secs").and_then(|v| v.as_u64()) {
891                    fleet = fleet.with_timeout(timeout);
892                }
893                let r = fleet.run(runner, infra).await?;
894                let summary = format!("{} succeeded, {} failed", r.succeeded, r.failed);
895                Ok((
896                    StageOutput::Pattern {
897                        outputs: r.outputs,
898                        final_answer: summary.clone(),
899                    },
900                    summary,
901                ))
902            }
903            PatternKind::AdversarialReview => {
904                if step.agents.is_empty() {
905                    return Err(WorkflowError::StageFailed(
906                        "adversarial_review".into(),
907                        "requires a reviewer agent (agents[0])".into(),
908                    ));
909                }
910                let reviewer = step.agents[0].clone();
911                let criteria: Vec<String> = step
912                    .config
913                    .get("criteria")
914                    .and_then(|v| serde_json::from_value(v.clone()).ok())
915                    .unwrap_or_default();
916                // The work to review comes from state (produced by a prior stage).
917                let work = step
918                    .config
919                    .get("review_key")
920                    .and_then(|v| v.as_str())
921                    .and_then(|key| wf_state.get(key))
922                    .map(|v| match v {
923                        Value::String(s) => s.clone(),
924                        other => other.to_string(),
925                    })
926                    .unwrap_or_default();
927
928                // Fail closed on empty work: a missing/empty review_key must NOT
929                // produce a vacuous PASS (the very "premature done" failure this
930                // gate defends against), and skips a wasted reviewer call.
931                if work.trim().is_empty() {
932                    let answer = "FAIL (no work to review — missing or empty review_key)".to_string();
933                    return Ok((
934                        StageOutput::Review {
935                            passed: false,
936                            blocker_count: 1,
937                            findings: vec![],
938                            reviewer: car_multi::AgentOutput {
939                                name: "reviewer".into(),
940                                answer: answer.clone(),
941                                turns: 0,
942                                tool_calls: 0,
943                                duration_ms: 0.0,
944                                error: None,
945                                outcome: None,
946                                tokens: None,
947                            },
948                        },
949                        answer,
950                    ));
951                }
952
953                let r = AdversarialReview::new(reviewer, criteria)
954                    .run(&work, runner, infra)
955                    .await?;
956                // Answer carries a human-readable summary; the typed verdict is
957                // surfaced as `stage.<id>.review_passed` by the caller.
958                let answer = if r.passed {
959                    format!("PASS ({} finding(s))", r.findings.len())
960                } else {
961                    format!("FAIL ({} blocker(s))", r.blocker_count)
962                };
963                Ok((
964                    StageOutput::Review {
965                        passed: r.passed,
966                        blocker_count: r.blocker_count,
967                        findings: r.findings,
968                        reviewer: r.reviewer_output,
969                    },
970                    answer,
971                ))
972            }
973            PatternKind::Tournament => {
974                if step.agents.len() < 3 {
975                    return Err(WorkflowError::StageFailed(
976                        "tournament".into(),
977                        "requires at least 3 agents (>=2 competitors + 1 judge)".into(),
978                    ));
979                }
980                let judge_idx = step
981                    .config
982                    .get("judge_index")
983                    .and_then(|v| v.as_u64())
984                    .unwrap_or(step.agents.len() as u64 - 1) as usize;
985                let judge = step
986                    .agents
987                    .get(judge_idx)
988                    .cloned()
989                    .unwrap_or_else(|| step.agents.last().unwrap().clone());
990                let competitors: Vec<_> = step
991                    .agents
992                    .iter()
993                    .enumerate()
994                    .filter(|(i, _)| *i != judge_idx)
995                    .map(|(_, a)| a.clone())
996                    .collect();
997
998                let r = Tournament::new(competitors, judge)
999                    .run(task, runner, infra)
1000                    .await?;
1001                Ok((
1002                    StageOutput::Pattern {
1003                        outputs: r.candidates,
1004                        final_answer: r.winner_answer.clone(),
1005                    },
1006                    r.winner_answer,
1007                ))
1008            }
1009        }
1010    }
1011
1012    /// Run one ForEach body iteration. Returns the body's output, answer, and
1013    /// state deltas. The deltas are NOT merged into shared state under their own
1014    /// keys — concurrent items would clobber each other — but the caller
1015    /// re-namespaces them under `foreach.<id>.<index>.<key>` so per-item
1016    /// structured output is still recoverable downstream.
1017    async fn run_foreach_body(
1018        &self,
1019        stage_id: &str,
1020        index: usize,
1021        body: &StageStep,
1022        wf_state: &HashMap<String, Value>,
1023    ) -> Result<StepOutcome, WorkflowError> {
1024        let child_id = format!("{stage_id}.{index}");
1025        self.execute_step(&child_id, body, wf_state).await
1026    }
1027
1028    /// Execute a proposal step via car-engine.
1029    async fn execute_proposal(
1030        &self,
1031        step: &ProposalStep,
1032    ) -> Result<(StageOutput, String), WorkflowError> {
1033        let runtime = self.infra.make_runtime();
1034        let result = runtime.execute(&step.proposal).await;
1035
1036        if result.all_succeeded() {
1037            let answer = result
1038                .results
1039                .last()
1040                .and_then(|r| r.output.as_ref())
1041                .map(|v| v.to_string())
1042                .unwrap_or_default();
1043            Ok((StageOutput::Proposal { result }, answer))
1044        } else {
1045            let errors: Vec<String> = result
1046                .results
1047                .iter()
1048                .filter_map(|r| r.error.as_ref())
1049                .cloned()
1050                .collect();
1051            Err(WorkflowError::StageFailed(
1052                "proposal".into(),
1053                errors.join("; "),
1054            ))
1055        }
1056    }
1057
1058    /// Execute a nested sub-workflow.
1059    async fn execute_sub_workflow(
1060        &self,
1061        step: &SubWorkflowStep,
1062    ) -> Result<(StageOutput, String), WorkflowError> {
1063        let result = self.run(&step.workflow).await?;
1064
1065        // Approval gates inside sub-workflows would require nested pause/resume
1066        // plumbing the parent run cannot yet thread back to the caller. Fail
1067        // explicitly rather than silently treating a pause as a failure.
1068        // NOTE: surfacing a nested pause to the parent run would need a
1069        // checkpoint *stack* (parent position + child checkpoint). Until that
1070        // exists we fail explicitly. This currently flows as a stage failure,
1071        // which triggers the parent's saga compensation — i.e. a sub-workflow
1072        // gate rolls the parent back. Documented limitation.
1073        if result.is_paused() {
1074            return Err(WorkflowError::ApprovalInSubWorkflow(
1075                step.workflow.id.clone(),
1076            ));
1077        }
1078
1079        let answer = result
1080            .stages
1081            .last()
1082            .and_then(|s| match &s.output {
1083                StageOutput::Pattern { final_answer, .. } => Some(final_answer.clone()),
1084                StageOutput::Proposal { result } => result
1085                    .results
1086                    .last()
1087                    .and_then(|r| r.output.as_ref())
1088                    .map(|v| v.to_string()),
1089                StageOutput::SubWorkflow { result } => Some(format!(
1090                    "sub-workflow {} {}",
1091                    result.workflow_name,
1092                    if result.succeeded() {
1093                        "completed"
1094                    } else {
1095                        "failed"
1096                    }
1097                )),
1098                StageOutput::Approval { response } => Some(response.to_string()),
1099                StageOutput::Review {
1100                    passed,
1101                    blocker_count,
1102                    ..
1103                } => Some(format!(
1104                    "review {}",
1105                    if *passed {
1106                        "passed".to_string()
1107                    } else {
1108                        format!("failed ({blocker_count} blocker(s))")
1109                    }
1110                )),
1111                StageOutput::Loop {
1112                    iterations,
1113                    satisfied,
1114                    ..
1115                } => Some(format!(
1116                    "loop ran {} iteration(s), until {}satisfied",
1117                    iterations,
1118                    if *satisfied { "" } else { "not " }
1119                )),
1120                StageOutput::ForEach { items, .. } => {
1121                    Some(format!("foreach over {} item(s)", items.len()))
1122                }
1123                StageOutput::Empty => None,
1124            })
1125            .unwrap_or_default();
1126
1127        if result.succeeded() {
1128            Ok((
1129                StageOutput::SubWorkflow {
1130                    result: Box::new(result),
1131                },
1132                answer,
1133            ))
1134        } else {
1135            Err(WorkflowError::StageFailed(
1136                "sub_workflow".into(),
1137                "sub-workflow failed".into(),
1138            ))
1139        }
1140    }
1141
1142    /// Run saga compensation in reverse order of completed stages.
1143    async fn compensate(
1144        &self,
1145        workflow: &Workflow,
1146        completed_stage_ids: &[String],
1147    ) -> Vec<CompensationResult> {
1148        let mut results = Vec::new();
1149
1150        for stage_id in completed_stage_ids.iter().rev() {
1151            let stage = match workflow.stage(stage_id) {
1152                Some(s) => s,
1153                None => continue,
1154            };
1155
1156            let handler = match &stage.compensation {
1157                Some(h) => h,
1158                None => continue,
1159            };
1160
1161            debug!(stage = %stage_id, "running compensation");
1162            let comp_start = Instant::now();
1163
1164            let comp_result = match handler {
1165                CompensationHandler::Proposal(ps) => self.execute_proposal(ps).await,
1166                CompensationHandler::StageRef { stage_id: ref_id } => {
1167                    if let Some(ref_stage) = workflow.stage(ref_id) {
1168                        self.execute_step(ref_id, &ref_stage.step, &HashMap::new())
1169                            .await
1170                            .map(|(out, answer, _deltas)| (out, answer))
1171                    } else {
1172                        Err(WorkflowError::StageNotFound(ref_id.clone()))
1173                    }
1174                }
1175            };
1176
1177            let duration = comp_start.elapsed().as_secs_f64() * 1000.0;
1178
1179            match comp_result {
1180                Ok(_) => {
1181                    results.push(CompensationResult {
1182                        for_stage_id: stage_id.clone(),
1183                        status: StageStatus::Succeeded,
1184                        duration_ms: duration,
1185                        error: None,
1186                    });
1187                }
1188                Err(e) => {
1189                    warn!(stage = %stage_id, error = %e, "compensation failed");
1190                    results.push(CompensationResult {
1191                        for_stage_id: stage_id.clone(),
1192                        status: StageStatus::Failed,
1193                        duration_ms: duration,
1194                        error: Some(e.to_string()),
1195                    });
1196                }
1197            }
1198        }
1199
1200        results
1201    }
1202}
1203
/// Mutable position while driving the stage graph. Captured into a
/// [`PausedWorkflow`] at an approval gate and restored on resume.
struct Cursor {
    /// Workflow state: string keys mapped to JSON values.
    wf_state: HashMap<String, Value>,
    /// Stage results accumulated so far.
    /// NOTE(review): presumably one entry per executed stage — confirm against
    /// the driver loop.
    stage_results: Vec<StageResult>,
    /// Ids of stages that have completed.
    /// NOTE(review): looks like the list handed to `compensate`, which walks it
    /// in reverse — confirm against the caller.
    completed_stage_ids: Vec<String>,
    /// NOTE(review): presumably the number of stage executions performed so
    /// far (a runaway-loop guard) — confirm against the driver loop.
    iterations: u32,
    /// Wall time of executed segments before the current `drive` call (carried
    /// across pause/resume so `duration_ms` reflects total compute, not just the
    /// post-resume segment). Excludes the human wait.
    prior_duration_ms: f64,
    /// NOTE(review): presumably the id of the stage to execute next — confirm
    /// against the driver loop.
    current_id: String,
}
1217
1218/// Generate a stable run identifier. This is both the resume token and the
1219/// checkpoint filename, so use the full 122-bit UUID — no truncation.
1220fn new_run_id() -> String {
1221    uuid::Uuid::new_v4().simple().to_string()
1222}
1223
1224/// Enforce an [`ApprovalStep`]'s form contract against the human's response.
1225fn validate_approval_input(
1226    fields: &[ApprovalField],
1227    input: &HashMap<String, Value>,
1228) -> Result<(), WorkflowError> {
1229    for field in fields {
1230        match input.get(&field.name) {
1231            None | Some(Value::Null) => {
1232                if field.required {
1233                    return Err(WorkflowError::InvalidApprovalInput(format!(
1234                        "required field '{}' is missing",
1235                        field.name
1236                    )));
1237                }
1238            }
1239            Some(value) => {
1240                if field.field_type == "options" && !field.options.is_empty() {
1241                    let ok = value
1242                        .as_str()
1243                        .map(|s| field.options.iter().any(|o| o == s))
1244                        .unwrap_or(false);
1245                    if !ok {
1246                        return Err(WorkflowError::InvalidApprovalInput(format!(
1247                            "field '{}' value {} is not one of {:?}",
1248                            field.name, value, field.options
1249                        )));
1250                    }
1251                }
1252            }
1253        }
1254    }
1255    Ok(())
1256}
1257
1258/// Pick the next stage after `from`: the first outgoing edge whose conditions
1259/// all pass. `None` means `from` is terminal.
1260fn next_stage(workflow: &Workflow, from: &str, state: &HashMap<String, Value>) -> Option<String> {
1261    workflow
1262        .outgoing_edges(from)
1263        .iter()
1264        .find(|e| check_conditions(&e.conditions, state))
1265        .map(|e| e.to.clone())
1266}
1267
1268/// Build a `Completed` workflow result.
1269fn completed_result(
1270    workflow: &Workflow,
1271    stage_results: Vec<StageResult>,
1272    final_state: HashMap<String, Value>,
1273    duration_ms: f64,
1274) -> WorkflowResult {
1275    WorkflowResult {
1276        workflow_id: workflow.id.clone(),
1277        workflow_name: workflow.name.clone(),
1278        status: WorkflowStatus::Completed,
1279        stages: stage_results,
1280        compensations: vec![],
1281        duration_ms,
1282        timestamp: chrono::Utc::now(),
1283        final_state,
1284        paused: None,
1285    }
1286}
1287
1288// --- ForEach item templating ---
1289
1290/// Render a ForEach item to the string substituted for `{{item}}`. A JSON
1291/// string uses its raw value (no quotes); anything else uses its compact JSON.
1292fn render_item(item: &Value) -> String {
1293    match item {
1294        Value::String(s) => s.clone(),
1295        other => other.to_string(),
1296    }
1297}
1298
1299/// Clone `body` with `{{item}}`/`{{index}}` substituted in every string. Done
1300/// via a serde round-trip so it covers pattern tasks, agent prompts, and
1301/// proposal parameters uniformly without per-variant code.
1302fn template_step(body: &StageStep, item: &str, index: usize) -> Result<StageStep, WorkflowError> {
1303    let mut v = serde_json::to_value(body)
1304        .map_err(|e| WorkflowError::StageFailed("foreach".into(), format!("serialize body: {e}")))?;
1305    substitute_in_value(&mut v, item, index);
1306    serde_json::from_value(v).map_err(|e| {
1307        WorkflowError::StageFailed("foreach".into(), format!("rebuild templated body: {e}"))
1308    })
1309}
1310
1311fn substitute_in_value(v: &mut Value, item: &str, index: usize) {
1312    match v {
1313        Value::String(s) => {
1314            if s.contains("{{item}}") || s.contains("{{index}}") {
1315                *s = s
1316                    .replace("{{item}}", item)
1317                    .replace("{{index}}", &index.to_string());
1318            }
1319        }
1320        Value::Array(a) => a.iter_mut().for_each(|e| substitute_in_value(e, item, index)),
1321        Value::Object(m) => m
1322            .values_mut()
1323            .for_each(|e| substitute_in_value(e, item, index)),
1324        _ => {}
1325    }
1326}
1327
1328// --- Precondition evaluation ---
1329
1330/// Evaluate edge conditions against workflow state. Returns true if all conditions pass.
1331fn check_conditions(conditions: &[car_ir::Precondition], state: &HashMap<String, Value>) -> bool {
1332    conditions
1333        .iter()
1334        .all(|cond| evaluate_precondition(cond, state))
1335}
1336
1337/// Evaluate a single precondition against a state map.
1338fn evaluate_precondition(cond: &car_ir::Precondition, state: &HashMap<String, Value>) -> bool {
1339    let op = cond.operator.as_str();
1340
1341    match op {
1342        "exists" => state.contains_key(&cond.key),
1343        "not_exists" => !state.contains_key(&cond.key),
1344        _ => {
1345            let actual = match state.get(&cond.key) {
1346                Some(v) => v,
1347                None => return false, // key missing, condition fails
1348            };
1349            match op {
1350                "eq" => actual == &cond.value,
1351                "neq" => actual != &cond.value,
1352                "gt" => compare_values(actual, &cond.value)
1353                    .map_or(false, |o| o == std::cmp::Ordering::Greater),
1354                "gte" => compare_values(actual, &cond.value)
1355                    .map_or(false, |o| o != std::cmp::Ordering::Less),
1356                "lt" => compare_values(actual, &cond.value)
1357                    .map_or(false, |o| o == std::cmp::Ordering::Less),
1358                "lte" => compare_values(actual, &cond.value)
1359                    .map_or(false, |o| o != std::cmp::Ordering::Greater),
1360                "contains" => {
1361                    if let (Some(haystack), Some(needle)) = (actual.as_str(), cond.value.as_str()) {
1362                        haystack.contains(needle)
1363                    } else {
1364                        false
1365                    }
1366                }
1367                _ => false,
1368            }
1369        }
1370    }
1371}
1372
1373fn compare_values(a: &Value, b: &Value) -> Option<std::cmp::Ordering> {
1374    match (a.as_f64(), b.as_f64()) {
1375        (Some(a), Some(b)) => a.partial_cmp(&b),
1376        _ => match (a.as_str(), b.as_str()) {
1377            (Some(a), Some(b)) => Some(a.cmp(b)),
1378            _ => None,
1379        },
1380    }
1381}
1382
1383// --- Pattern config helpers ---
1384
1385/// Extract synthesizer agent from config (used by Swarm and Vote).
1386fn extract_synthesizer(
1387    config: &HashMap<String, Value>,
1388    agents: &[car_multi::AgentSpec],
1389) -> Option<car_multi::AgentSpec> {
1390    config
1391        .get("synthesizer_index")
1392        .and_then(|v| v.as_u64())
1393        .and_then(|i| agents.get(i as usize))
1394        .cloned()
1395}
1396
1397/// Split agents into supervisor + workers. By default, last agent is supervisor.
1398fn split_supervisor_workers(
1399    agents: &[car_multi::AgentSpec],
1400    config: &HashMap<String, Value>,
1401) -> (car_multi::AgentSpec, Vec<car_multi::AgentSpec>) {
1402    let idx = config
1403        .get("supervisor_index")
1404        .and_then(|v| v.as_u64())
1405        .unwrap_or(agents.len().saturating_sub(1) as u64) as usize;
1406
1407    let supervisor = agents
1408        .get(idx)
1409        .cloned()
1410        .unwrap_or_else(|| agents.last().unwrap().clone());
1411    let workers: Vec<_> = agents
1412        .iter()
1413        .enumerate()
1414        .filter(|(i, _)| *i != idx)
1415        .map(|(_, a)| a.clone())
1416        .collect();
1417    (supervisor, workers)
1418}
1419
1420/// Split agents into main + specialists map for Delegator.
1421fn split_delegator(
1422    agents: &[car_multi::AgentSpec],
1423    _config: &HashMap<String, Value>,
1424) -> (car_multi::AgentSpec, HashMap<String, car_multi::AgentSpec>) {
1425    let main = agents
1426        .first()
1427        .cloned()
1428        .unwrap_or_else(|| car_multi::AgentSpec::new("main", ""));
1429    let specialists: HashMap<String, car_multi::AgentSpec> = agents
1430        .iter()
1431        .skip(1)
1432        .map(|a| (a.name.clone(), a.clone()))
1433        .collect();
1434    (main, specialists)
1435}
1436
1437#[cfg(test)]
1438mod tests {
1439    use super::*;
1440
1441    #[test]
1442    fn precondition_eq() {
1443        let mut state = HashMap::new();
1444        state.insert("x".into(), Value::Bool(true));
1445
1446        let cond = car_ir::Precondition {
1447            key: "x".into(),
1448            operator: "eq".into(),
1449            value: Value::Bool(true),
1450            description: String::new(),
1451        };
1452        assert!(evaluate_precondition(&cond, &state));
1453
1454        let cond_false = car_ir::Precondition {
1455            key: "x".into(),
1456            operator: "eq".into(),
1457            value: Value::Bool(false),
1458            description: String::new(),
1459        };
1460        assert!(!evaluate_precondition(&cond_false, &state));
1461    }
1462
1463    #[test]
1464    fn precondition_exists() {
1465        let mut state = HashMap::new();
1466        state.insert("x".into(), Value::Null);
1467
1468        let exists = car_ir::Precondition {
1469            key: "x".into(),
1470            operator: "exists".into(),
1471            value: Value::Null,
1472            description: String::new(),
1473        };
1474        assert!(evaluate_precondition(&exists, &state));
1475
1476        let not_exists = car_ir::Precondition {
1477            key: "y".into(),
1478            operator: "exists".into(),
1479            value: Value::Null,
1480            description: String::new(),
1481        };
1482        assert!(!evaluate_precondition(&not_exists, &state));
1483    }
1484
1485    #[test]
1486    fn precondition_numeric_comparison() {
1487        let mut state = HashMap::new();
1488        state.insert("count".into(), serde_json::json!(5));
1489
1490        let gt = car_ir::Precondition {
1491            key: "count".into(),
1492            operator: "gt".into(),
1493            value: serde_json::json!(3),
1494            description: String::new(),
1495        };
1496        assert!(evaluate_precondition(&gt, &state));
1497
1498        let lt = car_ir::Precondition {
1499            key: "count".into(),
1500            operator: "lt".into(),
1501            value: serde_json::json!(3),
1502            description: String::new(),
1503        };
1504        assert!(!evaluate_precondition(&lt, &state));
1505    }
1506
1507    #[test]
1508    fn empty_conditions_always_pass() {
1509        let state = HashMap::new();
1510        assert!(check_conditions(&[], &state));
1511    }
1512
1513    // --- Approval gate (HITL pause/resume) ---
1514
1515    use car_ir::ActionProposal;
1516
1517    /// A runner that must never be invoked by these tests (approval gates and
1518    /// empty proposals don't drive agents).
1519    struct NoopRunner;
1520
1521    #[async_trait::async_trait]
1522    impl car_multi::AgentRunner for NoopRunner {
1523        async fn run(
1524            &self,
1525            _spec: &car_multi::AgentSpec,
1526            _task: &str,
1527            _runtime: &car_engine::Runtime,
1528            _mailbox: &car_multi::Mailbox,
1529        ) -> Result<car_multi::AgentOutput, car_multi::MultiError> {
1530            Err(car_multi::MultiError::NoOutput)
1531        }
1532    }
1533
1534    fn test_engine() -> WorkflowEngine {
1535        WorkflowEngine::new(Arc::new(NoopRunner), car_multi::SharedInfra::new())
1536    }
1537
1538    fn approval_stage(id: &str, output_key: &str) -> Stage {
1539        Stage {
1540            id: id.into(),
1541            name: id.into(),
1542            step: StageStep::Approval(ApprovalStep {
1543                prompt: "approve?".into(),
1544                fields: vec![],
1545                output_key: output_key.into(),
1546            }),
1547            compensation: None,
1548            timeout_ms: None,
1549            metadata: HashMap::new(),
1550        }
1551    }
1552
1553    fn empty_proposal_stage(id: &str) -> Stage {
1554        Stage {
1555            id: id.into(),
1556            name: id.into(),
1557            step: StageStep::Proposal(ProposalStep {
1558                proposal: ActionProposal {
1559                    id: format!("p-{id}"),
1560                    source: "test".into(),
1561                    actions: vec![],
1562                    timestamp: chrono::Utc::now(),
1563                    context: HashMap::new(),
1564                },
1565            }),
1566            compensation: None,
1567            timeout_ms: None,
1568            metadata: HashMap::new(),
1569        }
1570    }
1571
1572    fn edge(from: &str, to: &str, conditions: Vec<car_ir::Precondition>) -> Edge {
1573        Edge {
1574            from: from.into(),
1575            to: to.into(),
1576            conditions,
1577            label: String::new(),
1578        }
1579    }
1580
1581    #[tokio::test]
1582    async fn pauses_at_approval_gate_without_executing_it() {
1583        let wf = Workflow {
1584            id: "wf".into(),
1585            name: "WF".into(),
1586            start: "gate".into(),
1587            goal: None,
1588            stages: vec![approval_stage("gate", "approval"), empty_proposal_stage("done")],
1589            edges: vec![edge("gate", "done", vec![])],
1590            max_iterations: 100,
1591            metadata: HashMap::new(),
1592        };
1593
1594        let res = test_engine().run(&wf).await.unwrap();
1595
1596        assert_eq!(res.status, WorkflowStatus::Paused);
1597        assert!(res.is_paused());
1598        assert!(res.stages.is_empty(), "gate body must not run before resume");
1599
1600        let paused = res.paused.expect("checkpoint present when paused");
1601        assert_eq!(paused.paused_stage_id, "gate");
1602        assert_eq!(paused.output_key, "approval");
1603        assert_eq!(paused.prompt, "approve?");
1604        assert!(!paused.run_id.is_empty());
1605    }
1606
1607    #[tokio::test]
1608    async fn resume_records_response_and_completes() {
1609        let wf = Workflow {
1610            id: "wf".into(),
1611            name: "WF".into(),
1612            start: "gate".into(),
1613            goal: None,
1614            stages: vec![approval_stage("gate", "approval"), empty_proposal_stage("done")],
1615            edges: vec![edge("gate", "done", vec![])],
1616            max_iterations: 100,
1617            metadata: HashMap::new(),
1618        };
1619        let eng = test_engine();
1620        let paused = eng.run(&wf).await.unwrap().paused.unwrap();
1621
1622        let mut input = HashMap::new();
1623        input.insert("decision".to_string(), Value::String("approve".into()));
1624        let res = eng.resume(paused, input).await.unwrap();
1625
1626        assert_eq!(res.status, WorkflowStatus::Completed);
1627        assert!(res.succeeded());
1628        // Both the gate and the downstream stage are recorded, gate first.
1629        let ran: Vec<&str> = res.stages.iter().map(|s| s.stage_id.as_str()).collect();
1630        assert_eq!(ran, vec!["gate", "done"]);
1631        assert!(matches!(res.stages[0].output, StageOutput::Approval { .. }));
1632        // Response is flattened for edge conditions and mirrored to the answer slot.
1633        assert_eq!(
1634            res.final_state.get("approval.decision"),
1635            Some(&Value::String("approve".into()))
1636        );
1637        assert!(res.final_state.contains_key("stage.gate.answer"));
1638        assert!(res.final_state.contains_key("approval"));
1639    }
1640
1641    #[tokio::test]
1642    async fn resume_branches_on_answer_after_checkpoint_roundtrip() {
1643        // gate routes to `approved` when approval.decision == "approve",
1644        // otherwise falls through the unconditional edge to `revise`.
1645        let wf = Workflow {
1646            id: "wf".into(),
1647            name: "WF".into(),
1648            start: "gate".into(),
1649            goal: None,
1650            stages: vec![
1651                approval_stage("gate", "approval"),
1652                empty_proposal_stage("approved"),
1653                empty_proposal_stage("revise"),
1654            ],
1655            edges: vec![
1656                edge(
1657                    "gate",
1658                    "approved",
1659                    vec![car_ir::Precondition {
1660                        key: "approval.decision".into(),
1661                        operator: "eq".into(),
1662                        value: Value::String("approve".into()),
1663                        description: String::new(),
1664                    }],
1665                ),
1666                edge("gate", "revise", vec![]),
1667            ],
1668            max_iterations: 100,
1669            metadata: HashMap::new(),
1670        };
1671        let eng = test_engine();
1672        let paused = eng.run(&wf).await.unwrap().paused.unwrap();
1673
1674        // Simulate a process restart: persist and reload the checkpoint.
1675        let dir = std::env::temp_dir().join(format!(
1676            "car-wf-resume-{}",
1677            uuid::Uuid::new_v4().simple()
1678        ));
1679        let store = crate::CheckpointStore::open(&dir).unwrap();
1680        store.save(&paused).unwrap();
1681        let run_id = paused.run_id.clone();
1682        let reloaded = store.load(&run_id).unwrap().expect("checkpoint reloads");
1683
1684        let mut input = HashMap::new();
1685        input.insert("decision".to_string(), Value::String("approve".into()));
1686        let res = eng.resume(reloaded, input).await.unwrap();
1687
1688        assert_eq!(res.status, WorkflowStatus::Completed);
1689        let ran: Vec<&str> = res.stages.iter().map(|s| s.stage_id.as_str()).collect();
1690        assert!(ran.contains(&"approved"), "approve answer routes to approved");
1691        assert!(!ran.contains(&"revise"), "revise branch must be skipped");
1692
1693        store.remove(&run_id).unwrap();
1694        let _ = std::fs::remove_dir_all(&dir);
1695    }
1696
1697    #[tokio::test]
1698    async fn iteration_guard_counts_each_stage_once_across_pause() {
1699        // A (proposal) -> gate. Reaching the gate should cost exactly 2 ticks,
1700        // not 4: resume advances past the gate before re-driving, so the gate is
1701        // not double-counted. (Refutes the "gate costs two ticks" concern.)
1702        let wf = Workflow {
1703            id: "wf".into(),
1704            name: "WF".into(),
1705            start: "a".into(),
1706            goal: None,
1707            stages: vec![
1708                empty_proposal_stage("a"),
1709                approval_stage("gate", "approval"),
1710                empty_proposal_stage("done"),
1711            ],
1712            edges: vec![edge("a", "gate", vec![]), edge("gate", "done", vec![])],
1713            max_iterations: 100,
1714            metadata: HashMap::new(),
1715        };
1716        let eng = test_engine();
1717        let paused = eng.run(&wf).await.unwrap().paused.unwrap();
1718        // a = tick 1, gate = tick 2.
1719        assert_eq!(paused.iterations, 2);
1720
1721        let mut input = HashMap::new();
1722        input.insert("decision".to_string(), Value::String("ok".into()));
1723        let res = eng.resume(paused, input).await.unwrap();
1724        assert_eq!(res.status, WorkflowStatus::Completed);
1725        // a, gate, done all executed exactly once.
1726        let ran: Vec<&str> = res.stages.iter().map(|s| s.stage_id.as_str()).collect();
1727        assert_eq!(ran, vec!["a", "gate", "done"]);
1728    }
1729
1730    #[tokio::test]
1731    async fn resume_rejects_missing_required_field() {
1732        let mut gate = approval_stage("gate", "approval");
1733        if let StageStep::Approval(ap) = &mut gate.step {
1734            ap.fields = vec![ApprovalField {
1735                name: "decision".into(),
1736                label: "Decision".into(),
1737                field_type: "text".into(),
1738                options: vec![],
1739                required: true,
1740            }];
1741        }
1742        let wf = Workflow {
1743            id: "wf".into(),
1744            name: "WF".into(),
1745            start: "gate".into(),
1746            goal: None,
1747            stages: vec![gate, empty_proposal_stage("done")],
1748            edges: vec![edge("gate", "done", vec![])],
1749            max_iterations: 100,
1750            metadata: HashMap::new(),
1751        };
1752        let eng = test_engine();
1753        let paused = eng.run(&wf).await.unwrap().paused.unwrap();
1754        // Empty input omits the required field.
1755        let err = eng.resume(paused, HashMap::new()).await.unwrap_err();
1756        assert!(matches!(err, WorkflowError::InvalidApprovalInput(_)));
1757    }
1758
1759    #[tokio::test]
1760    async fn resume_rejects_checkpoint_pointing_at_non_approval_stage() {
1761        // Forge a checkpoint whose paused_stage_id names a proposal stage.
1762        let wf = Workflow {
1763            id: "wf".into(),
1764            name: "WF".into(),
1765            start: "work".into(),
1766            goal: None,
1767            stages: vec![empty_proposal_stage("work")],
1768            edges: vec![],
1769            max_iterations: 100,
1770            metadata: HashMap::new(),
1771        };
1772        let forged = PausedWorkflow {
1773            run_id: "forged".into(),
1774            workflow: wf,
1775            paused_stage_id: "work".into(),
1776            prompt: String::new(),
1777            fields: vec![],
1778            output_key: "x".into(),
1779            wf_state: HashMap::new(),
1780            stage_results: vec![],
1781            completed_stage_ids: vec![],
1782            iterations: 1,
1783            prior_duration_ms: 0.0,
1784            created_at: chrono::Utc::now(),
1785        };
1786        let eng = test_engine();
1787        let err = eng.resume(forged, HashMap::new()).await.unwrap_err();
1788        assert!(matches!(err, WorkflowError::InvalidResume(_)));
1789    }
1790
1791    // --- LoopUntil / ForEach (dynamic workflow IR) ---
1792
1793    fn state_write_action(key: &str, value: Value) -> car_ir::Action {
1794        car_ir::Action {
1795            id: format!("sw-{key}"),
1796            action_type: car_ir::ActionType::StateWrite,
1797            tool: None,
1798            parameters: [
1799                ("key".to_string(), Value::from(key)),
1800                ("value".to_string(), value),
1801            ]
1802            .into(),
1803            preconditions: vec![],
1804            expected_effects: HashMap::new(),
1805            state_dependencies: vec![],
1806            idempotent: false,
1807            max_retries: 0,
1808            failure_behavior: car_ir::FailureBehavior::Abort,
1809            timeout_ms: None,
1810            metadata: HashMap::new(),
1811        }
1812    }
1813
1814    fn proposal_step_writing(key: &str, value: Value) -> StageStep {
1815        StageStep::Proposal(ProposalStep {
1816            proposal: ActionProposal {
1817                id: format!("p-{key}"),
1818                source: "test".into(),
1819                actions: vec![state_write_action(key, value)],
1820                timestamp: chrono::Utc::now(),
1821                context: HashMap::new(),
1822            },
1823        })
1824    }
1825
1826    fn single_stage_wf(id: &str, step: StageStep) -> Workflow {
1827        Workflow {
1828            id: "wf".into(),
1829            name: "WF".into(),
1830            start: id.into(),
1831            goal: None,
1832            stages: vec![Stage {
1833                id: id.into(),
1834                name: id.into(),
1835                step,
1836                compensation: None,
1837                timeout_ms: None,
1838                metadata: HashMap::new(),
1839            }],
1840            edges: vec![],
1841            max_iterations: 100,
1842            metadata: HashMap::new(),
1843        }
1844    }
1845
1846    #[tokio::test]
1847    async fn loop_until_empty_predicate_runs_to_cap() {
1848        // Empty `until` => runs exactly max_iterations; satisfied stays false.
1849        let step = StageStep::LoopUntil(LoopUntilStep {
1850            body: Box::new(empty_proposal_stage("ignored").step),
1851            until: vec![],
1852            max_iterations: 3,
1853        });
1854        let wf = single_stage_wf("loop", step);
1855        let res = test_engine().run(&wf).await.unwrap();
1856
1857        assert_eq!(res.status, WorkflowStatus::Completed);
1858        match &res.stages[0].output {
1859            StageOutput::Loop {
1860                iterations,
1861                satisfied,
1862                iterations_output,
1863            } => {
1864                assert_eq!(*iterations, 3);
1865                assert!(!*satisfied);
1866                assert_eq!(iterations_output.len(), 3);
1867            }
1868            other => panic!("expected Loop output, got {other:?}"),
1869        }
1870        assert_eq!(
1871            res.final_state.get("stage.loop.iteration"),
1872            Some(&Value::from(3u32))
1873        );
1874    }
1875
1876    #[tokio::test]
1877    async fn loop_until_stops_when_predicate_satisfied() {
1878        // Body writes done=true; until checks it => stops after one iteration.
1879        let step = StageStep::LoopUntil(LoopUntilStep {
1880            body: Box::new(proposal_step_writing("done", Value::Bool(true))),
1881            until: vec![car_ir::Precondition {
1882                key: "done".into(),
1883                operator: "eq".into(),
1884                value: Value::Bool(true),
1885                description: String::new(),
1886            }],
1887            max_iterations: 10,
1888        });
1889        let wf = single_stage_wf("loop", step);
1890        let res = test_engine().run(&wf).await.unwrap();
1891
1892        match &res.stages[0].output {
1893            StageOutput::Loop {
1894                iterations,
1895                satisfied,
1896                ..
1897            } => {
1898                assert_eq!(*iterations, 1, "predicate true after first body run");
1899                assert!(*satisfied);
1900            }
1901            other => panic!("expected Loop output, got {other:?}"),
1902        }
1903        // The body's state write is merged into workflow state.
1904        assert_eq!(res.final_state.get("done"), Some(&Value::Bool(true)));
1905    }
1906
1907    #[tokio::test]
1908    async fn for_each_runs_body_per_runtime_item() {
1909        // An upstream stage publishes the item list; ForEach fans over it.
1910        let seed = Stage {
1911            id: "seed".into(),
1912            name: "seed".into(),
1913            step: proposal_step_writing(
1914                "files",
1915                serde_json::json!(["a.rs", "b.rs", "c.rs"]),
1916            ),
1917            compensation: None,
1918            timeout_ms: None,
1919            metadata: HashMap::new(),
1920        };
1921        let fan = Stage {
1922            id: "fan".into(),
1923            name: "fan".into(),
1924            step: StageStep::ForEach(ForEachStep {
1925                items_from: "files".into(),
1926                // Body writes the item into a per-run key, proving templating.
1927                body: Box::new(proposal_step_writing(
1928                    "seen_{{index}}",
1929                    Value::String("{{item}}".into()),
1930                )),
1931                max_concurrent: 2,
1932            }),
1933            compensation: None,
1934            timeout_ms: None,
1935            metadata: HashMap::new(),
1936        };
1937        let wf = Workflow {
1938            id: "wf".into(),
1939            name: "WF".into(),
1940            start: "seed".into(),
1941            goal: None,
1942            stages: vec![seed, fan],
1943            edges: vec![edge("seed", "fan", vec![])],
1944            max_iterations: 100,
1945            metadata: HashMap::new(),
1946        };
1947        let res = test_engine().run(&wf).await.unwrap();
1948
1949        assert_eq!(res.status, WorkflowStatus::Completed);
1950        let fan_out = &res.stages[1].output;
1951        match fan_out {
1952            StageOutput::ForEach { items, outputs } => {
1953                assert_eq!(items, &vec!["a.rs", "b.rs", "c.rs"]);
1954                assert_eq!(outputs.len(), 3);
1955            }
1956            other => panic!("expected ForEach output, got {other:?}"),
1957        }
1958        assert_eq!(
1959            res.final_state.get("foreach.fan.count"),
1960            Some(&Value::from(3usize))
1961        );
1962        // Per-item answer slots are exposed.
1963        assert_eq!(
1964            res.final_state.get("foreach.fan.0.item"),
1965            Some(&Value::String("a.rs".into()))
1966        );
1967        // The body's own state delta is recoverable, namespaced per item.
1968        assert_eq!(
1969            res.final_state.get("foreach.fan.0.state.seen_0"),
1970            Some(&Value::String("a.rs".into()))
1971        );
1972        assert_eq!(
1973            res.final_state.get("foreach.fan.2.state.seen_2"),
1974            Some(&Value::String("c.rs".into()))
1975        );
1976    }
1977
1978    #[tokio::test]
1979    async fn for_each_missing_key_is_noop() {
1980        let step = StageStep::ForEach(ForEachStep {
1981            items_from: "nonexistent".into(),
1982            body: Box::new(empty_proposal_stage("b").step),
1983            max_concurrent: 0,
1984        });
1985        let wf = single_stage_wf("fan", step);
1986        let res = test_engine().run(&wf).await.unwrap();
1987        assert_eq!(res.status, WorkflowStatus::Completed);
1988        match &res.stages[0].output {
1989            StageOutput::ForEach { items, outputs } => {
1990                assert!(items.is_empty());
1991                assert!(outputs.is_empty());
1992            }
1993            other => panic!("expected ForEach output, got {other:?}"),
1994        }
1995    }
1996
1997    // --- Goal pinning (drift defense) ---
1998
1999    /// Captures the task string the agent was handed, so we can assert the goal
2000    /// was anchored into it.
2001    struct CapturingRunner {
2002        last_task: std::sync::Arc<std::sync::Mutex<String>>,
2003    }
2004
2005    #[async_trait::async_trait]
2006    impl car_multi::AgentRunner for CapturingRunner {
2007        async fn run(
2008            &self,
2009            spec: &car_multi::AgentSpec,
2010            task: &str,
2011            _runtime: &car_engine::Runtime,
2012            _mailbox: &car_multi::Mailbox,
2013        ) -> Result<car_multi::AgentOutput, car_multi::MultiError> {
2014            *self.last_task.lock().unwrap() = task.to_string();
2015            Ok(car_multi::AgentOutput {
2016                name: spec.name.clone(),
2017                answer: "ok".into(),
2018                turns: 1,
2019                tool_calls: 0,
2020                duration_ms: 1.0,
2021                error: None,
2022                outcome: None,
2023                tokens: None,
2024            })
2025        }
2026    }
2027
2028    #[tokio::test]
2029    async fn goal_is_pinned_and_anchored_into_agent_task() {
2030        let last_task = std::sync::Arc::new(std::sync::Mutex::new(String::new()));
2031        let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(CapturingRunner {
2032            last_task: last_task.clone(),
2033        });
2034        let eng = WorkflowEngine::new(runner, car_multi::SharedInfra::new());
2035
2036        let wf = Workflow {
2037            id: "wf".into(),
2038            name: "WF".into(),
2039            start: "s".into(),
2040            goal: Some("ship the release safely".into()),
2041            stages: vec![Stage {
2042                id: "s".into(),
2043                name: "s".into(),
2044                step: StageStep::Pattern(PatternStep {
2045                    pattern: PatternKind::SwarmParallel,
2046                    task: "draft the notes".into(),
2047                    agents: vec![car_multi::AgentSpec::new("a", "")],
2048                    config: HashMap::new(),
2049                }),
2050                compensation: None,
2051                timeout_ms: None,
2052                metadata: HashMap::new(),
2053            }],
2054            edges: vec![],
2055            max_iterations: 100,
2056            metadata: HashMap::new(),
2057        };
2058
2059        let res = eng.run(&wf).await.unwrap();
2060        assert_eq!(res.status, WorkflowStatus::Completed);
2061        // Goal pinned into final state.
2062        assert_eq!(
2063            res.final_state.get("goal"),
2064            Some(&Value::String("ship the release safely".into()))
2065        );
2066        // And re-anchored into the agent's task.
2067        let seen = last_task.lock().unwrap().clone();
2068        assert!(seen.contains("Overall goal: ship the release safely"), "got: {seen}");
2069        assert!(seen.contains("Current step: draft the notes"), "got: {seen}");
2070    }
2071
2072    /// Reviewer returns a structured PASS verdict; other agents echo their name.
2073    struct ReviewRunner;
2074
2075    #[async_trait::async_trait]
2076    impl car_multi::AgentRunner for ReviewRunner {
2077        async fn run(
2078            &self,
2079            spec: &car_multi::AgentSpec,
2080            _task: &str,
2081            _runtime: &car_engine::Runtime,
2082            _mailbox: &car_multi::Mailbox,
2083        ) -> Result<car_multi::AgentOutput, car_multi::MultiError> {
2084            let answer = if spec.name.contains("review") {
2085                r#"{"passed": true, "findings": [{"criterion":"complete","passed":true,"evidence":"all there","severity":"info"}]}"#.to_string()
2086            } else {
2087                spec.name.clone()
2088            };
2089            Ok(car_multi::AgentOutput {
2090                name: spec.name.clone(),
2091                answer,
2092                turns: 1,
2093                tool_calls: 0,
2094                duration_ms: 1.0,
2095                error: None,
2096                outcome: None,
2097                tokens: None,
2098            })
2099        }
2100    }
2101
2102    #[tokio::test]
2103    async fn adversarial_review_stage_gates_prior_work() {
2104        let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(ReviewRunner);
2105        let eng = WorkflowEngine::new(runner, car_multi::SharedInfra::new());
2106
2107        // Stage 1 writes the "work" to state; stage 2 reviews it with a fresh agent.
2108        let seed = Stage {
2109            id: "produce".into(),
2110            name: "produce".into(),
2111            step: proposal_step_writing("draft", Value::String("the finished work".into())),
2112            compensation: None,
2113            timeout_ms: None,
2114            metadata: HashMap::new(),
2115        };
2116        let review = Stage {
2117            id: "review".into(),
2118            name: "review".into(),
2119            step: StageStep::Pattern(PatternStep {
2120                pattern: PatternKind::AdversarialReview,
2121                task: "verify".into(),
2122                agents: vec![car_multi::AgentSpec::new("reviewer", "be strict")],
2123                config: [
2124                    (
2125                        "criteria".to_string(),
2126                        serde_json::json!(["work is complete"]),
2127                    ),
2128                    ("review_key".to_string(), Value::String("draft".into())),
2129                ]
2130                .into(),
2131            }),
2132            compensation: None,
2133            timeout_ms: None,
2134            metadata: HashMap::new(),
2135        };
2136        let wf = Workflow {
2137            id: "wf".into(),
2138            name: "WF".into(),
2139            start: "produce".into(),
2140            goal: None,
2141            stages: vec![seed, review],
2142            edges: vec![edge("produce", "review", vec![])],
2143            max_iterations: 100,
2144            metadata: HashMap::new(),
2145        };
2146
2147        let res = eng.run(&wf).await.unwrap();
2148        assert_eq!(res.status, WorkflowStatus::Completed);
2149        // The typed verdict is the robust branching signal.
2150        assert_eq!(
2151            res.final_state.get("stage.review.review_passed"),
2152            Some(&Value::Bool(true))
2153        );
2154        assert!(matches!(
2155            res.stages[1].output,
2156            StageOutput::Review { passed: true, .. }
2157        ));
2158    }
2159
2160    #[tokio::test]
2161    async fn adversarial_review_fails_closed_on_missing_work() {
2162        let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(ReviewRunner);
2163        let eng = WorkflowEngine::new(runner, car_multi::SharedInfra::new());
2164        // review_key points at a key no stage produced => empty work.
2165        let wf = single_stage_wf(
2166            "review",
2167            StageStep::Pattern(PatternStep {
2168                pattern: PatternKind::AdversarialReview,
2169                task: "verify".into(),
2170                agents: vec![car_multi::AgentSpec::new("reviewer", "")],
2171                config: [("review_key".to_string(), Value::String("nothing".into()))].into(),
2172            }),
2173        );
2174        let res = eng.run(&wf).await.unwrap();
2175        assert_eq!(res.status, WorkflowStatus::Completed);
2176        assert_eq!(
2177            res.final_state.get("stage.review.review_passed"),
2178            Some(&Value::Bool(false)),
2179            "empty work must fail closed, not vacuously pass"
2180        );
2181    }
2182
2183    #[tokio::test]
2184    async fn stage_delta_cannot_clobber_pinned_goal() {
2185        // A proposal stage that writes a key literally named "goal" must not
2186        // repoint the drift anchor used by the following pattern stage.
2187        let last_task = std::sync::Arc::new(std::sync::Mutex::new(String::new()));
2188        let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(CapturingRunner {
2189            last_task: last_task.clone(),
2190        });
2191        let eng = WorkflowEngine::new(runner, car_multi::SharedInfra::new());
2192
2193        let hijack = Stage {
2194            id: "hijack".into(),
2195            name: "hijack".into(),
2196            step: proposal_step_writing("goal", Value::String("DO SOMETHING ELSE".into())),
2197            compensation: None,
2198            timeout_ms: None,
2199            metadata: HashMap::new(),
2200        };
2201        let work = Stage {
2202            id: "work".into(),
2203            name: "work".into(),
2204            step: StageStep::Pattern(PatternStep {
2205                pattern: PatternKind::SwarmParallel,
2206                task: "do the thing".into(),
2207                agents: vec![car_multi::AgentSpec::new("a", "")],
2208                config: HashMap::new(),
2209            }),
2210            compensation: None,
2211            timeout_ms: None,
2212            metadata: HashMap::new(),
2213        };
2214        let wf = Workflow {
2215            id: "wf".into(),
2216            name: "WF".into(),
2217            start: "hijack".into(),
2218            goal: Some("the real goal".into()),
2219            stages: vec![hijack, work],
2220            edges: vec![edge("hijack", "work", vec![])],
2221            max_iterations: 100,
2222            metadata: HashMap::new(),
2223        };
2224
2225        let res = eng.run(&wf).await.unwrap();
2226        assert_eq!(res.status, WorkflowStatus::Completed);
2227        // The anchor survived the clobber attempt.
2228        assert_eq!(
2229            res.final_state.get("goal"),
2230            Some(&Value::String("the real goal".into()))
2231        );
2232        let seen = last_task.lock().unwrap().clone();
2233        assert!(seen.contains("Overall goal: the real goal"), "got: {seen}");
2234    }
2235
2236    // --- Prefix-cache re-run ---
2237
2238    #[tokio::test]
2239    async fn run_cached_replays_prefix_and_runs_rest_live() {
        // Scenario: the prior result only covers stage "a". A cached re-run is
        // expected to reuse that recorded result for "a" and execute the
        // remaining stage "b" live.
        //
        // `last_task` is shared with the runner so the test can observe whether
        // an agent was invoked. Presumably `CapturingRunner` stores the task text
        // it is handed; it is defined outside this excerpt — confirm there.
2240        let last_task = std::sync::Arc::new(std::sync::Mutex::new(String::new()));
2241        let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(CapturingRunner {
2242            last_task: last_task.clone(),
2243        });
2244        let eng = WorkflowEngine::new(runner, car_multi::SharedInfra::new());
2245
2246        // A (proposal) -> B (pattern). Prior run completed A but not B.
2247        let a = Stage {
2248            id: "a".into(),
2249            name: "a".into(),
            // Presumably a proposal step that writes state key "k" = "v"; the
            // helper is defined elsewhere in this module — verify.
2250            step: proposal_step_writing("k", Value::String("v".into())),
2251            compensation: None,
2252            timeout_ms: None,
2253            metadata: HashMap::new(),
2254        };
2255        let b = Stage {
2256            id: "b".into(),
2257            name: "b".into(),
            // A pattern step with a single agent: this is the stage that must
            // reach the runner when the workflow is re-run.
2258            step: StageStep::Pattern(PatternStep {
2259                pattern: PatternKind::SwarmParallel,
2260                task: "do b".into(),
2261                agents: vec![car_multi::AgentSpec::new("agent", "")],
2262                config: HashMap::new(),
2263            }),
2264            compensation: None,
2265            timeout_ms: None,
2266            metadata: HashMap::new(),
2267        };
2268        let wf = Workflow {
2269            id: "wf".into(),
2270            name: "WF".into(),
2271            start: "a".into(),
2272            goal: None,
2273            stages: vec![a, b],
2274            edges: vec![edge("a", "b", vec![])],
2275            max_iterations: 100,
2276            metadata: HashMap::new(),
2277        };
2278
2279        // Prior result: A recorded as Succeeded with an `Empty` output; B has no entry.
        // The seeded state mirrors what A left behind: its success flag plus the
        // same "k" = "v" pair handed to `proposal_step_writing` above.
2280        let mut final_state = HashMap::new();
2281        final_state.insert("stage.a.succeeded".to_string(), Value::Bool(true));
2282        final_state.insert("k".to_string(), Value::String("v".into()));
2283        let prior = WorkflowResult {
2284            workflow_id: "wf".into(),
2285            workflow_name: "WF".into(),
            // Failed overall, matching the scenario where B never completed.
2286            status: WorkflowStatus::Failed,
2287            stages: vec![StageResult {
2288                stage_id: "a".into(),
2289                stage_name: "a".into(),
2290                status: StageStatus::Succeeded,
                // `Empty` is the marker the final assertion checks for.
2291                output: StageOutput::Empty,
2292                duration_ms: 1.0,
2293                error: None,
2294            }],
2295            compensations: vec![],
2296            duration_ms: 1.0,
2297            timestamp: chrono::Utc::now(),
2298            final_state,
2299            paused: None,
2300        };
2301
2302        let res = eng.run_cached(&wf, &prior).await.unwrap();
2303        assert_eq!(res.status, WorkflowStatus::Completed);
2304        // A replayed from cache (no re-execution), B ran live.
        // Both stages must appear in the result, in graph order.
2305        let ran: Vec<&str> = res.stages.iter().map(|s| s.stage_id.as_str()).collect();
2306        assert_eq!(ran, vec!["a", "b"]);
        // A non-empty `last_task` means the runner was handed a task; B is the
        // only pattern stage in this workflow.
2307        assert!(
2308            !last_task.lock().unwrap().is_empty(),
2309            "stage B should have executed live"
2310        );
2311        // Stage A's recorded result is the cached one (Empty output), not a re-run.
2312        assert!(matches!(res.stages[0].output, StageOutput::Empty));
2313    }
2314
2315    #[tokio::test]
2316    async fn run_cached_strips_stale_uncached_stage_bookkeeping() {
2317        // Prior state carries a failed (uncached) stage's error key; on re-run
2318        // that stale bookkeeping must not survive to poison edge routing.
        // `test_engine` and `empty_proposal_stage` are helpers defined elsewhere
        // in this module; presumably they build a default engine and a no-op
        // proposal stage with the given id — verify.
2319        let eng = test_engine();
2320        let wf = Workflow {
2321            id: "wf".into(),
2322            name: "WF".into(),
2323            start: "a".into(),
2324            goal: None,
2325            // "a" is terminal (no edges); "ghost" exists but is unreachable.
2326            stages: vec![empty_proposal_stage("a"), empty_proposal_stage("ghost")],
2327            edges: vec![],
2328            max_iterations: 100,
2329            metadata: HashMap::new(),
2330        };
        // Three kinds of key are seeded into the prior state:
        //   - "stage.a.succeeded": bookkeeping for "a", which has a result below;
        //   - "stage.ghost.error": bookkeeping for "ghost", which has no entry in
        //     `prior.stages` and is therefore not covered by the cache;
        //   - "real_data": a plain data key outside the "stage." namespace.
2331        let mut final_state = HashMap::new();
2332        final_state.insert("stage.a.succeeded".to_string(), Value::Bool(true));
2333        final_state.insert("stage.ghost.error".to_string(), Value::String("stale".into()));
2334        final_state.insert("real_data".to_string(), Value::from(42));
2335        let prior = WorkflowResult {
2336            workflow_id: "wf".into(),
2337            workflow_name: "WF".into(),
2338            status: WorkflowStatus::Failed,
            // Only "a" has a recorded result; "ghost" is deliberately absent.
2339            stages: vec![StageResult {
2340                stage_id: "a".into(),
2341                stage_name: "a".into(),
2342                status: StageStatus::Succeeded,
2343                output: StageOutput::Empty,
2344                duration_ms: 1.0,
2345                error: None,
2346            }],
2347            compensations: vec![],
2348            duration_ms: 1.0,
2349            timestamp: chrono::Utc::now(),
2350            final_state,
2351            paused: None,
2352        };
2353
2354        let res = eng.run_cached(&wf, &prior).await.unwrap();
2355        assert_eq!(res.status, WorkflowStatus::Completed);
2356        // Stale uncached-stage bookkeeping was stripped...
2357        assert!(!res.final_state.contains_key("stage.ghost.error"));
2358        // ...while genuine data and cached-stage keys survive.
2359        assert_eq!(res.final_state.get("real_data"), Some(&Value::from(42)));
2360        assert_eq!(
2361            res.final_state.get("stage.a.succeeded"),
2362            Some(&Value::Bool(true))
2363        );
2364    }
2365
2366    #[tokio::test]
2367    async fn run_cached_with_full_cache_executes_nothing() {
        // Scenario: the prior result already covers the workflow's only stage,
        // so a cached re-run must complete without handing any task to the
        // agent runner.
        //
        // `last_task` is shared with the runner and starts out empty; the final
        // assertion checks it is still empty. Presumably `CapturingRunner` writes
        // the task text it receives into it (defined outside this excerpt).
2368        let last_task = std::sync::Arc::new(std::sync::Mutex::new(String::new()));
2369        let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(CapturingRunner {
2370            last_task: last_task.clone(),
2371        });
2372        let eng = WorkflowEngine::new(runner, car_multi::SharedInfra::new());
2373
        // A pattern step would need the runner if executed live, which is what
        // makes the "nothing ran" assertion meaningful. `single_stage_wf` is a
        // helper defined elsewhere; presumably it wraps the step in a
        // one-stage workflow whose stage id is "only" — verify.
2374        let wf = single_stage_wf(
2375            "only",
2376            StageStep::Pattern(PatternStep {
2377                pattern: PatternKind::SwarmParallel,
2378                task: "x".into(),
2379                agents: vec![car_multi::AgentSpec::new("agent", "")],
2380                config: HashMap::new(),
2381            }),
2382        );
        // Prior run: Completed, with a Succeeded result recorded for "only".
2383        let prior = WorkflowResult {
2384            workflow_id: "wf".into(),
2385            workflow_name: "WF".into(),
2386            status: WorkflowStatus::Completed,
2387            stages: vec![StageResult {
2388                stage_id: "only".into(),
2389                stage_name: "only".into(),
2390                status: StageStatus::Succeeded,
2391                output: StageOutput::Empty,
2392                duration_ms: 1.0,
2393                error: None,
2394            }],
2395            compensations: vec![],
2396            duration_ms: 1.0,
2397            timestamp: chrono::Utc::now(),
            // No prior state is seeded; the test only inspects status and the runner.
2398            final_state: HashMap::new(),
2399            paused: None,
2400        };
2401
2402        let res = eng.run_cached(&wf, &prior).await.unwrap();
2403        assert_eq!(res.status, WorkflowStatus::Completed);
2404        assert!(
2405            last_task.lock().unwrap().is_empty(),
2406            "fully-cached run must not execute any agent"
2407        );
2408    }
2409}