1use std::collections::HashMap;
5use std::sync::Arc;
6use std::time::Instant;
7
8use serde_json::Value;
9use tracing::{debug, info, warn};
10
11use car_multi::{
12 AdversarialReview, AgentRunner, Fleet, MapReduce, Pipeline, SharedInfra, Supervisor, Swarm,
13 SwarmMode, Tournament, Vote,
14};
15
16use crate::error::WorkflowError;
17use crate::result::*;
18use crate::types::*;
19
20type StepOutcome = (StageOutput, String, HashMap<String, Value>);
23
24pub struct WorkflowEngine {
26 runner: Arc<dyn AgentRunner>,
27 infra: SharedInfra,
28}
29
30impl WorkflowEngine {
31 pub fn new(runner: Arc<dyn AgentRunner>, infra: SharedInfra) -> Self {
32 Self { runner, infra }
33 }
34
35 pub fn run<'a>(
37 &'a self,
38 workflow: &'a Workflow,
39 ) -> futures::future::BoxFuture<'a, Result<WorkflowResult, WorkflowError>> {
40 Box::pin(self.run_inner(workflow))
41 }
42
43 async fn run_inner(&self, workflow: &Workflow) -> Result<WorkflowResult, WorkflowError> {
44 if workflow.stage(&workflow.start).is_none() {
46 return Err(WorkflowError::NoStartStage);
47 }
48
49 for stage in &workflow.stages {
53 if crate::verify::exceeds_nesting(&stage.step, crate::verify::MAX_STEP_NESTING_DEPTH) {
54 return Err(WorkflowError::StageFailed(
55 stage.id.clone(),
56 format!(
57 "loop/foreach/sub-workflow nesting exceeds the limit of {}",
58 crate::verify::MAX_STEP_NESTING_DEPTH
59 ),
60 ));
61 }
62 }
63
64 let mut wf_state = HashMap::new();
65 if let Some(goal) = &workflow.goal {
68 wf_state.insert("goal".to_string(), Value::String(goal.clone()));
69 }
70 let cursor = Cursor {
71 wf_state,
72 stage_results: Vec::new(),
73 completed_stage_ids: Vec::new(),
74 iterations: 0,
75 prior_duration_ms: 0.0,
76 current_id: workflow.start.clone(),
77 };
78 self.drive(workflow, new_run_id(), cursor, &HashMap::new())
79 .await
80 }
81
82 pub async fn run_cached(
98 &self,
99 workflow: &Workflow,
100 prior: &WorkflowResult,
101 ) -> Result<WorkflowResult, WorkflowError> {
102 if workflow.stage(&workflow.start).is_none() {
103 return Err(WorkflowError::NoStartStage);
104 }
105
106 let cache: HashMap<String, StageResult> = prior
108 .stages
109 .iter()
110 .filter(|s| s.status == StageStatus::Succeeded)
111 .map(|s| (s.stage_id.clone(), s.clone()))
112 .collect();
113
114 let mut wf_state = prior.final_state.clone();
117
118 let stage_ids: Vec<String> = workflow.stages.iter().map(|s| s.id.clone()).collect();
124 wf_state.retain(|k, _| {
125 match k.strip_prefix("stage.") {
126 Some(rest) => {
128 let id = rest.split('.').next().unwrap_or("");
129 cache.contains_key(id) || !stage_ids.iter().any(|s| s == id)
130 }
131 None => true,
132 }
133 });
134
135 if let Some(goal) = &workflow.goal {
138 wf_state.insert("goal".to_string(), Value::String(goal.clone()));
139 } else {
140 wf_state.remove("goal");
141 }
142
143 let cursor = Cursor {
144 wf_state,
145 stage_results: Vec::new(),
146 completed_stage_ids: Vec::new(),
147 iterations: 0,
148 prior_duration_ms: 0.0,
149 current_id: workflow.start.clone(),
150 };
151 self.drive(workflow, new_run_id(), cursor, &cache).await
152 }
153
154 pub async fn resume(
161 &self,
162 paused: PausedWorkflow,
163 input: HashMap<String, Value>,
164 ) -> Result<WorkflowResult, WorkflowError> {
165 let PausedWorkflow {
166 run_id,
167 workflow,
168 paused_stage_id,
169 mut wf_state,
170 mut stage_results,
171 mut completed_stage_ids,
172 iterations,
173 prior_duration_ms,
174 ..
175 } = paused;
176
177 let approval = match workflow.stage(&paused_stage_id).map(|s| &s.step) {
183 Some(StageStep::Approval(ap)) => ap.clone(),
184 Some(_) => {
185 return Err(WorkflowError::InvalidResume(format!(
186 "stage '{paused_stage_id}' is not an approval gate"
187 )))
188 }
189 None => {
190 return Err(WorkflowError::InvalidResume(format!(
191 "paused stage '{paused_stage_id}' not found in workflow"
192 )))
193 }
194 };
195 let output_key = approval.output_key.clone();
196 if output_key.trim().is_empty() {
197 return Err(WorkflowError::InvalidResume(format!(
198 "approval stage '{paused_stage_id}' has empty output_key"
199 )));
200 }
201 if output_key == "goal" {
202 return Err(WorkflowError::InvalidResume(
203 "approval output_key 'goal' is reserved (it is the drift anchor)".into(),
204 ));
205 }
206
207 if let Some(goal) = &workflow.goal {
211 wf_state.insert("goal".to_string(), Value::String(goal.clone()));
212 } else {
213 wf_state.remove("goal");
214 }
215
216 validate_approval_input(&approval.fields, &input)?;
220
221 let response = Value::Object(input.into_iter().collect());
228 if let Value::Object(map) = &response {
229 for (k, v) in map {
230 wf_state.insert(format!("{output_key}.{k}"), v.clone());
231 }
232 }
233 wf_state.insert(output_key.clone(), response.clone());
234 wf_state.insert(
235 format!("stage.{paused_stage_id}.succeeded"),
236 Value::Bool(true),
237 );
238 wf_state.insert(
239 format!("stage.{paused_stage_id}.answer"),
240 Value::String(response.to_string()),
241 );
242
243 let stage_name = workflow
244 .stage(&paused_stage_id)
245 .map(|s| s.name.clone())
246 .unwrap_or_else(|| paused_stage_id.clone());
247 stage_results.push(StageResult {
248 stage_id: paused_stage_id.clone(),
249 stage_name,
250 status: StageStatus::Succeeded,
251 output: StageOutput::Approval { response },
252 duration_ms: 0.0,
253 error: None,
254 });
255 completed_stage_ids.push(paused_stage_id.clone());
256
257 info!(stage = %paused_stage_id, run_id = %run_id, "approval resumed");
258
259 match next_stage(&workflow, &paused_stage_id, &wf_state) {
261 Some(next_id) => {
262 let cursor = Cursor {
263 wf_state,
264 stage_results,
265 completed_stage_ids,
266 iterations,
267 prior_duration_ms,
268 current_id: next_id,
269 };
270 self.drive(&workflow, run_id, cursor, &HashMap::new()).await
271 }
272 None => {
273 info!(workflow = %workflow.name, "workflow completed (terminal approval gate)");
275 Ok(completed_result(
276 &workflow,
277 stage_results,
278 wf_state,
279 prior_duration_ms,
280 ))
281 }
282 }
283 }
284
285 async fn drive(
293 &self,
294 workflow: &Workflow,
295 run_id: String,
296 mut cur: Cursor,
297 cache: &HashMap<String, StageResult>,
298 ) -> Result<WorkflowResult, WorkflowError> {
299 let start = Instant::now();
300
301 loop {
302 cur.iterations += 1;
303 if cur.iterations > workflow.max_iterations {
304 return Err(WorkflowError::CycleLimitReached(workflow.max_iterations));
305 }
306
307 let stage = workflow
308 .stage(&cur.current_id)
309 .ok_or_else(|| WorkflowError::StageNotFound(cur.current_id.clone()))?;
310
311 if let Some(cached) = cache.get(&cur.current_id) {
317 debug!(stage = %cur.current_id, "replaying cached stage");
318 cur.wf_state
319 .insert(format!("stage.{}.succeeded", stage.id), Value::Bool(true));
320 cur.stage_results.push(cached.clone());
321 cur.completed_stage_ids.push(stage.id.clone());
322 match next_stage(workflow, &cur.current_id, &cur.wf_state) {
323 Some(next_id) => {
324 cur.current_id = next_id;
325 continue;
326 }
327 None => {
328 let duration =
329 cur.prior_duration_ms + start.elapsed().as_secs_f64() * 1000.0;
330 return Ok(completed_result(
331 workflow,
332 cur.stage_results,
333 cur.wf_state,
334 duration,
335 ));
336 }
337 }
338 }
339
340 if let StageStep::Approval(ap) = &stage.step {
343 if ap.output_key.trim().is_empty() {
346 return Err(WorkflowError::StageFailed(
347 stage.id.clone(),
348 "approval stage has empty output_key".into(),
349 ));
350 }
351 info!(stage = %stage.id, run_id = %run_id, "workflow paused at approval gate");
352 let elapsed = cur.prior_duration_ms + start.elapsed().as_secs_f64() * 1000.0;
355 let now = chrono::Utc::now();
356 let paused = PausedWorkflow {
357 run_id: run_id.clone(),
358 workflow: workflow.clone(),
359 paused_stage_id: stage.id.clone(),
360 prompt: ap.prompt.clone(),
361 fields: ap.fields.clone(),
362 output_key: ap.output_key.clone(),
363 wf_state: cur.wf_state.clone(),
364 stage_results: cur.stage_results.clone(),
365 completed_stage_ids: cur.completed_stage_ids.clone(),
366 iterations: cur.iterations,
367 prior_duration_ms: elapsed,
368 created_at: now,
369 };
370 return Ok(WorkflowResult {
371 workflow_id: workflow.id.clone(),
372 workflow_name: workflow.name.clone(),
373 status: WorkflowStatus::Paused,
374 stages: cur.stage_results,
375 compensations: vec![],
376 duration_ms: elapsed,
377 timestamp: now,
378 final_state: cur.wf_state,
379 paused: Some(paused),
380 });
381 }
382
383 debug!(stage = %stage.id, name = %stage.name, iteration = cur.iterations, "executing stage");
384
385 let stage_start = Instant::now();
387 let result = if let Some(timeout_ms) = stage.timeout_ms {
388 match tokio::time::timeout(
389 std::time::Duration::from_millis(timeout_ms),
390 self.execute_step(&stage.id, &stage.step, &cur.wf_state),
391 )
392 .await
393 {
394 Ok(r) => r,
395 Err(_) => Err(WorkflowError::Timeout(stage.id.clone(), timeout_ms)),
396 }
397 } else {
398 self.execute_step(&stage.id, &stage.step, &cur.wf_state).await
399 };
400 let stage_duration = stage_start.elapsed().as_secs_f64() * 1000.0;
401
402 match result {
403 Ok((output, answer, deltas)) => {
404 for (k, v) in deltas {
409 cur.wf_state.insert(k, v);
410 }
411 if let Some(goal) = &workflow.goal {
415 cur.wf_state
416 .insert("goal".to_string(), Value::String(goal.clone()));
417 }
418 cur.wf_state
419 .insert(format!("stage.{}.succeeded", stage.id), Value::Bool(true));
420 cur.wf_state
421 .insert(format!("stage.{}.answer", stage.id), Value::String(answer));
422
423 cur.stage_results.push(StageResult {
424 stage_id: stage.id.clone(),
425 stage_name: stage.name.clone(),
426 status: StageStatus::Succeeded,
427 output,
428 duration_ms: stage_duration,
429 error: None,
430 });
431 cur.completed_stage_ids.push(stage.id.clone());
432
433 info!(stage = %stage.id, duration_ms = stage_duration, "stage succeeded");
434 }
435 Err(e) => {
436 let error_msg = e.to_string();
437 cur.wf_state
438 .insert(format!("stage.{}.succeeded", stage.id), Value::Bool(false));
439 cur.wf_state.insert(
440 format!("stage.{}.error", stage.id),
441 Value::String(error_msg.clone()),
442 );
443
444 cur.stage_results.push(StageResult {
445 stage_id: stage.id.clone(),
446 stage_name: stage.name.clone(),
447 status: StageStatus::Failed,
448 output: StageOutput::Empty,
449 duration_ms: stage_duration,
450 error: Some(error_msg.clone()),
451 });
452
453 warn!(stage = %stage.id, error = %error_msg, "stage failed, running compensation");
454
455 let compensations = self.compensate(workflow, &cur.completed_stage_ids).await;
457
458 let all_compensated = compensations
459 .iter()
460 .all(|c| c.status == StageStatus::Succeeded);
461 let any_compensated = !compensations.is_empty();
462
463 let status = if any_compensated && all_compensated {
464 WorkflowStatus::Compensated
465 } else if any_compensated {
466 WorkflowStatus::PartiallyCompensated
467 } else {
468 WorkflowStatus::Failed
469 };
470
471 return Ok(WorkflowResult {
472 workflow_id: workflow.id.clone(),
473 workflow_name: workflow.name.clone(),
474 status,
475 stages: cur.stage_results,
476 compensations,
477 duration_ms: cur.prior_duration_ms + start.elapsed().as_secs_f64() * 1000.0,
478 timestamp: chrono::Utc::now(),
479 final_state: cur.wf_state,
480 paused: None,
481 });
482 }
483 }
484
485 match next_stage(workflow, &cur.current_id, &cur.wf_state) {
487 Some(next_id) => {
488 debug!(from = %cur.current_id, to = %next_id, "taking edge");
489 cur.current_id = next_id;
490 }
491 None => {
492 info!(
494 workflow = %workflow.name,
495 stages_executed = cur.stage_results.len(),
496 "workflow completed"
497 );
498 let duration = cur.prior_duration_ms + start.elapsed().as_secs_f64() * 1000.0;
499 return Ok(completed_result(
500 workflow,
501 cur.stage_results,
502 cur.wf_state,
503 duration,
504 ));
505 }
506 }
507 }
508 }
509
510 fn execute_step<'a>(
515 &'a self,
516 stage_id: &'a str,
517 step: &'a StageStep,
518 wf_state: &'a HashMap<String, Value>,
519 ) -> futures::future::BoxFuture<'a, Result<StepOutcome, WorkflowError>> {
520 Box::pin(async move {
521 match step {
522 StageStep::Pattern(ps) => {
523 let (out, answer) = self.execute_pattern(ps, wf_state).await?;
524 let mut deltas = HashMap::new();
527 if let StageOutput::Review {
528 passed,
529 blocker_count,
530 ..
531 } = &out
532 {
533 deltas.insert(
534 format!("stage.{stage_id}.review_passed"),
535 Value::Bool(*passed),
536 );
537 deltas.insert(
538 format!("stage.{stage_id}.review_blockers"),
539 Value::from(*blocker_count),
540 );
541 }
542 Ok((out, answer, deltas))
543 }
544 StageStep::Proposal(ps) => {
545 let (out, answer) = self.execute_proposal(ps).await?;
546 let mut deltas = HashMap::new();
549 if let StageOutput::Proposal { ref result } = out {
550 for ar in &result.results {
551 for (k, v) in &ar.state_changes {
552 deltas.insert(k.clone(), v.clone());
553 }
554 }
555 }
556 Ok((out, answer, deltas))
557 }
558 StageStep::SubWorkflow(sw) => {
559 let (out, answer) = self.execute_sub_workflow(sw).await?;
560 Ok((out, answer, HashMap::new()))
561 }
562 StageStep::LoopUntil(ls) => self.execute_loop_until(stage_id, ls, wf_state).await,
563 StageStep::ForEach(fe) => self.execute_for_each(stage_id, fe, wf_state).await,
564 StageStep::Approval(_) => Err(WorkflowError::StageFailed(
569 "approval".into(),
570 "an approval gate cannot be executed as a step, loop/foreach body, or compensation".into(),
571 )),
572 }
573 })
574 }
575
576 async fn execute_loop_until(
580 &self,
581 stage_id: &str,
582 ls: &LoopUntilStep,
583 wf_state: &HashMap<String, Value>,
584 ) -> Result<StepOutcome, WorkflowError> {
585 if ls.max_iterations == 0 {
586 return Err(WorkflowError::StageFailed(
587 stage_id.to_string(),
588 "loop_until requires max_iterations >= 1".into(),
589 ));
590 }
591
592 let body_id = format!("{stage_id}.body");
593 let mut working = wf_state.clone();
594 let mut accumulated: HashMap<String, Value> = HashMap::new();
595 let mut outputs: Vec<Box<StageOutput>> = Vec::new();
596 let mut last_answer = String::new();
597 let mut satisfied = false;
598 let mut ran: u32 = 0;
599
600 for _ in 0..ls.max_iterations {
601 let (out, answer, deltas) = self
602 .execute_step(&body_id, &ls.body, &working)
603 .await?;
604 ran += 1;
605
606 for (k, v) in deltas {
609 working.insert(k.clone(), v.clone());
610 accumulated.insert(k, v);
611 }
612 let answer_val = Value::String(answer.clone());
613 let iter_val = Value::from(ran);
614 working.insert(format!("stage.{stage_id}.answer"), answer_val.clone());
615 working.insert(format!("stage.{stage_id}.iteration"), iter_val.clone());
616 accumulated.insert(format!("stage.{stage_id}.answer"), answer_val);
617 accumulated.insert(format!("stage.{stage_id}.iteration"), iter_val);
618
619 last_answer = answer;
620 outputs.push(Box::new(out));
621
622 if !ls.until.is_empty() && check_conditions(&ls.until, &working) {
625 satisfied = true;
626 break;
627 }
628 }
629
630 Ok((
631 StageOutput::Loop {
632 iterations: ran,
633 satisfied,
634 iterations_output: outputs,
635 },
636 last_answer,
637 accumulated,
638 ))
639 }
640
641 async fn execute_for_each(
646 &self,
647 stage_id: &str,
648 fe: &ForEachStep,
649 wf_state: &HashMap<String, Value>,
650 ) -> Result<StepOutcome, WorkflowError> {
651 let items: Vec<Value> = wf_state
653 .get(&fe.items_from)
654 .and_then(|v| v.as_array().cloned())
655 .unwrap_or_default();
656 let item_strs: Vec<String> = items.iter().map(render_item).collect();
657
658 let mut templated: Vec<StageStep> = Vec::with_capacity(items.len());
661 for (i, item) in item_strs.iter().enumerate() {
662 templated.push(template_step(&fe.body, item, i)?);
663 }
664
665 let concurrency = fe.max_concurrent.max(1);
666
667 let mut futs = Vec::with_capacity(templated.len());
670 for (i, body) in templated.iter().enumerate() {
671 futs.push(self.run_foreach_body(stage_id, i, body, wf_state));
672 }
673
674 let results: Vec<Result<StepOutcome, WorkflowError>> = if concurrency <= 1 {
676 let mut acc = Vec::with_capacity(futs.len());
677 for f in futs {
678 acc.push(f.await);
679 }
680 acc
681 } else {
682 use futures::stream::StreamExt;
683 futures::stream::iter(futs)
684 .buffered(concurrency)
685 .collect()
686 .await
687 };
688
689 let mut outputs: Vec<Box<StageOutput>> = Vec::with_capacity(results.len());
690 let mut deltas: HashMap<String, Value> = HashMap::new();
691 for (i, r) in results.into_iter().enumerate() {
692 let (out, answer, body_deltas) = r?;
693 deltas.insert(
694 format!("foreach.{stage_id}.{i}.item"),
695 Value::String(item_strs[i].clone()),
696 );
697 deltas.insert(
698 format!("foreach.{stage_id}.{i}.answer"),
699 Value::String(answer),
700 );
701 for (k, v) in body_deltas {
704 deltas.insert(format!("foreach.{stage_id}.{i}.state.{k}"), v);
705 }
706 outputs.push(Box::new(out));
707 }
708 deltas.insert(
709 format!("foreach.{stage_id}.count"),
710 Value::from(item_strs.len()),
711 );
712
713 let answer = format!("{} item(s) processed", item_strs.len());
714 Ok((
715 StageOutput::ForEach {
716 items: item_strs,
717 outputs,
718 },
719 answer,
720 deltas,
721 ))
722 }
723
724 async fn execute_pattern(
726 &self,
727 step: &PatternStep,
728 wf_state: &HashMap<String, Value>,
729 ) -> Result<(StageOutput, String), WorkflowError> {
730 let anchored_task = match wf_state.get("goal").and_then(|g| g.as_str()) {
732 Some(goal) if !goal.trim().is_empty() => {
733 format!("Overall goal: {goal}\n\nCurrent step: {}", step.task)
734 }
735 _ => step.task.clone(),
736 };
737 let task = anchored_task.as_str();
738 let runner = &self.runner;
739 let infra = &self.infra;
740
741 match step.pattern {
742 PatternKind::SwarmParallel => {
743 let mut swarm = Swarm::new(step.agents.clone(), SwarmMode::Parallel);
744 if let Some(synth) = extract_synthesizer(&step.config, &step.agents) {
745 swarm = swarm.with_synthesizer(synth);
746 }
747 let r = swarm.run(task, runner, infra).await?;
748 Ok((
749 StageOutput::Pattern {
750 outputs: r.outputs,
751 final_answer: r.final_summary.clone(),
752 },
753 r.final_summary,
754 ))
755 }
756 PatternKind::SwarmSequential => {
757 let swarm = Swarm::new(step.agents.clone(), SwarmMode::Sequential);
758 let r = swarm.run(task, runner, infra).await?;
759 Ok((
760 StageOutput::Pattern {
761 outputs: r.outputs,
762 final_answer: r.final_summary.clone(),
763 },
764 r.final_summary,
765 ))
766 }
767 PatternKind::SwarmDebate => {
768 let swarm = Swarm::new(step.agents.clone(), SwarmMode::Debate);
769 let r = swarm.run(task, runner, infra).await?;
770 Ok((
771 StageOutput::Pattern {
772 outputs: r.outputs,
773 final_answer: r.final_summary.clone(),
774 },
775 r.final_summary,
776 ))
777 }
778 PatternKind::Pipeline => {
779 let pipeline = Pipeline::new(step.agents.clone());
780 let r = pipeline.run(task, runner, infra).await?;
781 Ok((
782 StageOutput::Pattern {
783 outputs: r.stages,
784 final_answer: r.final_answer.clone(),
785 },
786 r.final_answer,
787 ))
788 }
789 PatternKind::Supervisor => {
790 let max_rounds = step
791 .config
792 .get("max_rounds")
793 .and_then(|v| v.as_u64())
794 .unwrap_or(3) as u32;
795 let (supervisor, workers) = split_supervisor_workers(&step.agents, &step.config);
796 let r = Supervisor::new(workers, supervisor)
797 .with_max_rounds(max_rounds)
798 .run(task, runner, infra)
799 .await?;
800 let all_outputs: Vec<_> = r.rounds.into_iter().flatten().collect();
801 Ok((
802 StageOutput::Pattern {
803 outputs: all_outputs,
804 final_answer: r.final_answer.clone(),
805 },
806 r.final_answer,
807 ))
808 }
809 PatternKind::Delegator => {
810 let (main_agent, specialists) = split_delegator(&step.agents, &step.config);
811 let delegator = car_multi::Delegator::new(main_agent, specialists);
812 let r = delegator.run(task, runner, infra).await?;
813 Ok((
814 StageOutput::Pattern {
815 outputs: vec![car_multi::AgentOutput {
816 name: "delegator".into(),
817 answer: r.final_answer.clone(),
818 turns: 0,
819 tool_calls: r.delegations.len() as u32,
820 duration_ms: 0.0,
821 error: None,
822 outcome: None,
823 tokens: None,
824 }],
825 final_answer: r.final_answer.clone(),
826 },
827 r.final_answer,
828 ))
829 }
830 PatternKind::MapReduce => {
831 let max_concurrent = step
832 .config
833 .get("max_concurrent")
834 .and_then(|v| v.as_u64())
835 .unwrap_or(5) as usize;
836 let items: Vec<String> = step
837 .config
838 .get("items")
839 .and_then(|v| serde_json::from_value(v.clone()).ok())
840 .unwrap_or_default();
841
842 if step.agents.len() < 2 {
843 return Err(WorkflowError::StageFailed(
844 "map_reduce".into(),
845 "requires at least 2 agents (mapper + reducer)".into(),
846 ));
847 }
848 let mapper = step.agents[0].clone();
849 let reducer = step.agents[1].clone();
850
851 let mr = MapReduce::new(mapper, reducer).with_max_concurrent(max_concurrent);
852 let r = mr
853 .run(
854 task,
855 &items
856 .iter()
857 .map(|s| s.as_str())
858 .collect::<Vec<_>>()
859 .iter()
860 .map(|s| s.to_string())
861 .collect::<Vec<_>>(),
862 runner,
863 infra,
864 )
865 .await?;
866 Ok((
867 StageOutput::Pattern {
868 outputs: r.map_outputs,
869 final_answer: r.reduced_answer.clone(),
870 },
871 r.reduced_answer,
872 ))
873 }
874 PatternKind::Vote => {
875 let mut vote = Vote::new(step.agents.clone());
876 if let Some(synth) = extract_synthesizer(&step.config, &step.agents) {
877 vote = vote.with_synthesizer(synth);
878 }
879 let r = vote.run(task, runner, infra).await?;
880 Ok((
881 StageOutput::Pattern {
882 outputs: r.votes,
883 final_answer: r.winner.clone(),
884 },
885 r.winner,
886 ))
887 }
888 PatternKind::Fleet => {
889 let mut fleet = Fleet::new(step.agents.clone());
890 if let Some(timeout) = step.config.get("timeout_secs").and_then(|v| v.as_u64()) {
891 fleet = fleet.with_timeout(timeout);
892 }
893 let r = fleet.run(runner, infra).await?;
894 let summary = format!("{} succeeded, {} failed", r.succeeded, r.failed);
895 Ok((
896 StageOutput::Pattern {
897 outputs: r.outputs,
898 final_answer: summary.clone(),
899 },
900 summary,
901 ))
902 }
903 PatternKind::AdversarialReview => {
904 if step.agents.is_empty() {
905 return Err(WorkflowError::StageFailed(
906 "adversarial_review".into(),
907 "requires a reviewer agent (agents[0])".into(),
908 ));
909 }
910 let reviewer = step.agents[0].clone();
911 let criteria: Vec<String> = step
912 .config
913 .get("criteria")
914 .and_then(|v| serde_json::from_value(v.clone()).ok())
915 .unwrap_or_default();
916 let work = step
918 .config
919 .get("review_key")
920 .and_then(|v| v.as_str())
921 .and_then(|key| wf_state.get(key))
922 .map(|v| match v {
923 Value::String(s) => s.clone(),
924 other => other.to_string(),
925 })
926 .unwrap_or_default();
927
928 if work.trim().is_empty() {
932 let answer = "FAIL (no work to review — missing or empty review_key)".to_string();
933 return Ok((
934 StageOutput::Review {
935 passed: false,
936 blocker_count: 1,
937 findings: vec![],
938 reviewer: car_multi::AgentOutput {
939 name: "reviewer".into(),
940 answer: answer.clone(),
941 turns: 0,
942 tool_calls: 0,
943 duration_ms: 0.0,
944 error: None,
945 outcome: None,
946 tokens: None,
947 },
948 },
949 answer,
950 ));
951 }
952
953 let r = AdversarialReview::new(reviewer, criteria)
954 .run(&work, runner, infra)
955 .await?;
956 let answer = if r.passed {
959 format!("PASS ({} finding(s))", r.findings.len())
960 } else {
961 format!("FAIL ({} blocker(s))", r.blocker_count)
962 };
963 Ok((
964 StageOutput::Review {
965 passed: r.passed,
966 blocker_count: r.blocker_count,
967 findings: r.findings,
968 reviewer: r.reviewer_output,
969 },
970 answer,
971 ))
972 }
973 PatternKind::Tournament => {
974 if step.agents.len() < 3 {
975 return Err(WorkflowError::StageFailed(
976 "tournament".into(),
977 "requires at least 3 agents (>=2 competitors + 1 judge)".into(),
978 ));
979 }
980 let judge_idx = step
981 .config
982 .get("judge_index")
983 .and_then(|v| v.as_u64())
984 .unwrap_or(step.agents.len() as u64 - 1) as usize;
985 let judge = step
986 .agents
987 .get(judge_idx)
988 .cloned()
989 .unwrap_or_else(|| step.agents.last().unwrap().clone());
990 let competitors: Vec<_> = step
991 .agents
992 .iter()
993 .enumerate()
994 .filter(|(i, _)| *i != judge_idx)
995 .map(|(_, a)| a.clone())
996 .collect();
997
998 let r = Tournament::new(competitors, judge)
999 .run(task, runner, infra)
1000 .await?;
1001 Ok((
1002 StageOutput::Pattern {
1003 outputs: r.candidates,
1004 final_answer: r.winner_answer.clone(),
1005 },
1006 r.winner_answer,
1007 ))
1008 }
1009 }
1010 }
1011
1012 async fn run_foreach_body(
1018 &self,
1019 stage_id: &str,
1020 index: usize,
1021 body: &StageStep,
1022 wf_state: &HashMap<String, Value>,
1023 ) -> Result<StepOutcome, WorkflowError> {
1024 let child_id = format!("{stage_id}.{index}");
1025 self.execute_step(&child_id, body, wf_state).await
1026 }
1027
1028 async fn execute_proposal(
1030 &self,
1031 step: &ProposalStep,
1032 ) -> Result<(StageOutput, String), WorkflowError> {
1033 let runtime = self.infra.make_runtime();
1034 let result = runtime.execute(&step.proposal).await;
1035
1036 if result.all_succeeded() {
1037 let answer = result
1038 .results
1039 .last()
1040 .and_then(|r| r.output.as_ref())
1041 .map(|v| v.to_string())
1042 .unwrap_or_default();
1043 Ok((StageOutput::Proposal { result }, answer))
1044 } else {
1045 let errors: Vec<String> = result
1046 .results
1047 .iter()
1048 .filter_map(|r| r.error.as_ref())
1049 .cloned()
1050 .collect();
1051 Err(WorkflowError::StageFailed(
1052 "proposal".into(),
1053 errors.join("; "),
1054 ))
1055 }
1056 }
1057
1058 async fn execute_sub_workflow(
1060 &self,
1061 step: &SubWorkflowStep,
1062 ) -> Result<(StageOutput, String), WorkflowError> {
1063 let result = self.run(&step.workflow).await?;
1064
1065 if result.is_paused() {
1074 return Err(WorkflowError::ApprovalInSubWorkflow(
1075 step.workflow.id.clone(),
1076 ));
1077 }
1078
1079 let answer = result
1080 .stages
1081 .last()
1082 .and_then(|s| match &s.output {
1083 StageOutput::Pattern { final_answer, .. } => Some(final_answer.clone()),
1084 StageOutput::Proposal { result } => result
1085 .results
1086 .last()
1087 .and_then(|r| r.output.as_ref())
1088 .map(|v| v.to_string()),
1089 StageOutput::SubWorkflow { result } => Some(format!(
1090 "sub-workflow {} {}",
1091 result.workflow_name,
1092 if result.succeeded() {
1093 "completed"
1094 } else {
1095 "failed"
1096 }
1097 )),
1098 StageOutput::Approval { response } => Some(response.to_string()),
1099 StageOutput::Review {
1100 passed,
1101 blocker_count,
1102 ..
1103 } => Some(format!(
1104 "review {}",
1105 if *passed {
1106 "passed".to_string()
1107 } else {
1108 format!("failed ({blocker_count} blocker(s))")
1109 }
1110 )),
1111 StageOutput::Loop {
1112 iterations,
1113 satisfied,
1114 ..
1115 } => Some(format!(
1116 "loop ran {} iteration(s), until {}satisfied",
1117 iterations,
1118 if *satisfied { "" } else { "not " }
1119 )),
1120 StageOutput::ForEach { items, .. } => {
1121 Some(format!("foreach over {} item(s)", items.len()))
1122 }
1123 StageOutput::Empty => None,
1124 })
1125 .unwrap_or_default();
1126
1127 if result.succeeded() {
1128 Ok((
1129 StageOutput::SubWorkflow {
1130 result: Box::new(result),
1131 },
1132 answer,
1133 ))
1134 } else {
1135 Err(WorkflowError::StageFailed(
1136 "sub_workflow".into(),
1137 "sub-workflow failed".into(),
1138 ))
1139 }
1140 }
1141
1142 async fn compensate(
1144 &self,
1145 workflow: &Workflow,
1146 completed_stage_ids: &[String],
1147 ) -> Vec<CompensationResult> {
1148 let mut results = Vec::new();
1149
1150 for stage_id in completed_stage_ids.iter().rev() {
1151 let stage = match workflow.stage(stage_id) {
1152 Some(s) => s,
1153 None => continue,
1154 };
1155
1156 let handler = match &stage.compensation {
1157 Some(h) => h,
1158 None => continue,
1159 };
1160
1161 debug!(stage = %stage_id, "running compensation");
1162 let comp_start = Instant::now();
1163
1164 let comp_result = match handler {
1165 CompensationHandler::Proposal(ps) => self.execute_proposal(ps).await,
1166 CompensationHandler::StageRef { stage_id: ref_id } => {
1167 if let Some(ref_stage) = workflow.stage(ref_id) {
1168 self.execute_step(ref_id, &ref_stage.step, &HashMap::new())
1169 .await
1170 .map(|(out, answer, _deltas)| (out, answer))
1171 } else {
1172 Err(WorkflowError::StageNotFound(ref_id.clone()))
1173 }
1174 }
1175 };
1176
1177 let duration = comp_start.elapsed().as_secs_f64() * 1000.0;
1178
1179 match comp_result {
1180 Ok(_) => {
1181 results.push(CompensationResult {
1182 for_stage_id: stage_id.clone(),
1183 status: StageStatus::Succeeded,
1184 duration_ms: duration,
1185 error: None,
1186 });
1187 }
1188 Err(e) => {
1189 warn!(stage = %stage_id, error = %e, "compensation failed");
1190 results.push(CompensationResult {
1191 for_stage_id: stage_id.clone(),
1192 status: StageStatus::Failed,
1193 duration_ms: duration,
1194 error: Some(e.to_string()),
1195 });
1196 }
1197 }
1198 }
1199
1200 results
1201 }
1202}
1203
1204struct Cursor {
1207 wf_state: HashMap<String, Value>,
1208 stage_results: Vec<StageResult>,
1209 completed_stage_ids: Vec<String>,
1210 iterations: u32,
1211 prior_duration_ms: f64,
1215 current_id: String,
1216}
1217
1218fn new_run_id() -> String {
1221 uuid::Uuid::new_v4().simple().to_string()
1222}
1223
1224fn validate_approval_input(
1226 fields: &[ApprovalField],
1227 input: &HashMap<String, Value>,
1228) -> Result<(), WorkflowError> {
1229 for field in fields {
1230 match input.get(&field.name) {
1231 None | Some(Value::Null) => {
1232 if field.required {
1233 return Err(WorkflowError::InvalidApprovalInput(format!(
1234 "required field '{}' is missing",
1235 field.name
1236 )));
1237 }
1238 }
1239 Some(value) => {
1240 if field.field_type == "options" && !field.options.is_empty() {
1241 let ok = value
1242 .as_str()
1243 .map(|s| field.options.iter().any(|o| o == s))
1244 .unwrap_or(false);
1245 if !ok {
1246 return Err(WorkflowError::InvalidApprovalInput(format!(
1247 "field '{}' value {} is not one of {:?}",
1248 field.name, value, field.options
1249 )));
1250 }
1251 }
1252 }
1253 }
1254 }
1255 Ok(())
1256}
1257
1258fn next_stage(workflow: &Workflow, from: &str, state: &HashMap<String, Value>) -> Option<String> {
1261 workflow
1262 .outgoing_edges(from)
1263 .iter()
1264 .find(|e| check_conditions(&e.conditions, state))
1265 .map(|e| e.to.clone())
1266}
1267
1268fn completed_result(
1270 workflow: &Workflow,
1271 stage_results: Vec<StageResult>,
1272 final_state: HashMap<String, Value>,
1273 duration_ms: f64,
1274) -> WorkflowResult {
1275 WorkflowResult {
1276 workflow_id: workflow.id.clone(),
1277 workflow_name: workflow.name.clone(),
1278 status: WorkflowStatus::Completed,
1279 stages: stage_results,
1280 compensations: vec![],
1281 duration_ms,
1282 timestamp: chrono::Utc::now(),
1283 final_state,
1284 paused: None,
1285 }
1286}
1287
1288fn render_item(item: &Value) -> String {
1293 match item {
1294 Value::String(s) => s.clone(),
1295 other => other.to_string(),
1296 }
1297}
1298
1299fn template_step(body: &StageStep, item: &str, index: usize) -> Result<StageStep, WorkflowError> {
1303 let mut v = serde_json::to_value(body)
1304 .map_err(|e| WorkflowError::StageFailed("foreach".into(), format!("serialize body: {e}")))?;
1305 substitute_in_value(&mut v, item, index);
1306 serde_json::from_value(v).map_err(|e| {
1307 WorkflowError::StageFailed("foreach".into(), format!("rebuild templated body: {e}"))
1308 })
1309}
1310
1311fn substitute_in_value(v: &mut Value, item: &str, index: usize) {
1312 match v {
1313 Value::String(s) => {
1314 if s.contains("{{item}}") || s.contains("{{index}}") {
1315 *s = s
1316 .replace("{{item}}", item)
1317 .replace("{{index}}", &index.to_string());
1318 }
1319 }
1320 Value::Array(a) => a.iter_mut().for_each(|e| substitute_in_value(e, item, index)),
1321 Value::Object(m) => m
1322 .values_mut()
1323 .for_each(|e| substitute_in_value(e, item, index)),
1324 _ => {}
1325 }
1326}
1327
1328fn check_conditions(conditions: &[car_ir::Precondition], state: &HashMap<String, Value>) -> bool {
1332 conditions
1333 .iter()
1334 .all(|cond| evaluate_precondition(cond, state))
1335}
1336
1337fn evaluate_precondition(cond: &car_ir::Precondition, state: &HashMap<String, Value>) -> bool {
1339 let op = cond.operator.as_str();
1340
1341 match op {
1342 "exists" => state.contains_key(&cond.key),
1343 "not_exists" => !state.contains_key(&cond.key),
1344 _ => {
1345 let actual = match state.get(&cond.key) {
1346 Some(v) => v,
1347 None => return false, };
1349 match op {
1350 "eq" => actual == &cond.value,
1351 "neq" => actual != &cond.value,
1352 "gt" => compare_values(actual, &cond.value)
1353 .map_or(false, |o| o == std::cmp::Ordering::Greater),
1354 "gte" => compare_values(actual, &cond.value)
1355 .map_or(false, |o| o != std::cmp::Ordering::Less),
1356 "lt" => compare_values(actual, &cond.value)
1357 .map_or(false, |o| o == std::cmp::Ordering::Less),
1358 "lte" => compare_values(actual, &cond.value)
1359 .map_or(false, |o| o != std::cmp::Ordering::Greater),
1360 "contains" => {
1361 if let (Some(haystack), Some(needle)) = (actual.as_str(), cond.value.as_str()) {
1362 haystack.contains(needle)
1363 } else {
1364 false
1365 }
1366 }
1367 _ => false,
1368 }
1369 }
1370 }
1371}
1372
1373fn compare_values(a: &Value, b: &Value) -> Option<std::cmp::Ordering> {
1374 match (a.as_f64(), b.as_f64()) {
1375 (Some(a), Some(b)) => a.partial_cmp(&b),
1376 _ => match (a.as_str(), b.as_str()) {
1377 (Some(a), Some(b)) => Some(a.cmp(b)),
1378 _ => None,
1379 },
1380 }
1381}
1382
1383fn extract_synthesizer(
1387 config: &HashMap<String, Value>,
1388 agents: &[car_multi::AgentSpec],
1389) -> Option<car_multi::AgentSpec> {
1390 config
1391 .get("synthesizer_index")
1392 .and_then(|v| v.as_u64())
1393 .and_then(|i| agents.get(i as usize))
1394 .cloned()
1395}
1396
1397fn split_supervisor_workers(
1399 agents: &[car_multi::AgentSpec],
1400 config: &HashMap<String, Value>,
1401) -> (car_multi::AgentSpec, Vec<car_multi::AgentSpec>) {
1402 let idx = config
1403 .get("supervisor_index")
1404 .and_then(|v| v.as_u64())
1405 .unwrap_or(agents.len().saturating_sub(1) as u64) as usize;
1406
1407 let supervisor = agents
1408 .get(idx)
1409 .cloned()
1410 .unwrap_or_else(|| agents.last().unwrap().clone());
1411 let workers: Vec<_> = agents
1412 .iter()
1413 .enumerate()
1414 .filter(|(i, _)| *i != idx)
1415 .map(|(_, a)| a.clone())
1416 .collect();
1417 (supervisor, workers)
1418}
1419
1420fn split_delegator(
1422 agents: &[car_multi::AgentSpec],
1423 _config: &HashMap<String, Value>,
1424) -> (car_multi::AgentSpec, HashMap<String, car_multi::AgentSpec>) {
1425 let main = agents
1426 .first()
1427 .cloned()
1428 .unwrap_or_else(|| car_multi::AgentSpec::new("main", ""));
1429 let specialists: HashMap<String, car_multi::AgentSpec> = agents
1430 .iter()
1431 .skip(1)
1432 .map(|a| (a.name.clone(), a.clone()))
1433 .collect();
1434 (main, specialists)
1435}
1436
1437#[cfg(test)]
1438mod tests {
1439 use super::*;
1440
1441 #[test]
1442 fn precondition_eq() {
1443 let mut state = HashMap::new();
1444 state.insert("x".into(), Value::Bool(true));
1445
1446 let cond = car_ir::Precondition {
1447 key: "x".into(),
1448 operator: "eq".into(),
1449 value: Value::Bool(true),
1450 description: String::new(),
1451 };
1452 assert!(evaluate_precondition(&cond, &state));
1453
1454 let cond_false = car_ir::Precondition {
1455 key: "x".into(),
1456 operator: "eq".into(),
1457 value: Value::Bool(false),
1458 description: String::new(),
1459 };
1460 assert!(!evaluate_precondition(&cond_false, &state));
1461 }
1462
1463 #[test]
1464 fn precondition_exists() {
1465 let mut state = HashMap::new();
1466 state.insert("x".into(), Value::Null);
1467
1468 let exists = car_ir::Precondition {
1469 key: "x".into(),
1470 operator: "exists".into(),
1471 value: Value::Null,
1472 description: String::new(),
1473 };
1474 assert!(evaluate_precondition(&exists, &state));
1475
1476 let not_exists = car_ir::Precondition {
1477 key: "y".into(),
1478 operator: "exists".into(),
1479 value: Value::Null,
1480 description: String::new(),
1481 };
1482 assert!(!evaluate_precondition(¬_exists, &state));
1483 }
1484
1485 #[test]
1486 fn precondition_numeric_comparison() {
1487 let mut state = HashMap::new();
1488 state.insert("count".into(), serde_json::json!(5));
1489
1490 let gt = car_ir::Precondition {
1491 key: "count".into(),
1492 operator: "gt".into(),
1493 value: serde_json::json!(3),
1494 description: String::new(),
1495 };
1496 assert!(evaluate_precondition(>, &state));
1497
1498 let lt = car_ir::Precondition {
1499 key: "count".into(),
1500 operator: "lt".into(),
1501 value: serde_json::json!(3),
1502 description: String::new(),
1503 };
1504 assert!(!evaluate_precondition(<, &state));
1505 }
1506
1507 #[test]
1508 fn empty_conditions_always_pass() {
1509 let state = HashMap::new();
1510 assert!(check_conditions(&[], &state));
1511 }
1512
1513 use car_ir::ActionProposal;
1516
1517 struct NoopRunner;
1520
1521 #[async_trait::async_trait]
1522 impl car_multi::AgentRunner for NoopRunner {
1523 async fn run(
1524 &self,
1525 _spec: &car_multi::AgentSpec,
1526 _task: &str,
1527 _runtime: &car_engine::Runtime,
1528 _mailbox: &car_multi::Mailbox,
1529 ) -> Result<car_multi::AgentOutput, car_multi::MultiError> {
1530 Err(car_multi::MultiError::NoOutput)
1531 }
1532 }
1533
1534 fn test_engine() -> WorkflowEngine {
1535 WorkflowEngine::new(Arc::new(NoopRunner), car_multi::SharedInfra::new())
1536 }
1537
1538 fn approval_stage(id: &str, output_key: &str) -> Stage {
1539 Stage {
1540 id: id.into(),
1541 name: id.into(),
1542 step: StageStep::Approval(ApprovalStep {
1543 prompt: "approve?".into(),
1544 fields: vec![],
1545 output_key: output_key.into(),
1546 }),
1547 compensation: None,
1548 timeout_ms: None,
1549 metadata: HashMap::new(),
1550 }
1551 }
1552
1553 fn empty_proposal_stage(id: &str) -> Stage {
1554 Stage {
1555 id: id.into(),
1556 name: id.into(),
1557 step: StageStep::Proposal(ProposalStep {
1558 proposal: ActionProposal {
1559 id: format!("p-{id}"),
1560 source: "test".into(),
1561 actions: vec![],
1562 timestamp: chrono::Utc::now(),
1563 context: HashMap::new(),
1564 },
1565 }),
1566 compensation: None,
1567 timeout_ms: None,
1568 metadata: HashMap::new(),
1569 }
1570 }
1571
1572 fn edge(from: &str, to: &str, conditions: Vec<car_ir::Precondition>) -> Edge {
1573 Edge {
1574 from: from.into(),
1575 to: to.into(),
1576 conditions,
1577 label: String::new(),
1578 }
1579 }
1580
1581 #[tokio::test]
1582 async fn pauses_at_approval_gate_without_executing_it() {
1583 let wf = Workflow {
1584 id: "wf".into(),
1585 name: "WF".into(),
1586 start: "gate".into(),
1587 goal: None,
1588 stages: vec![approval_stage("gate", "approval"), empty_proposal_stage("done")],
1589 edges: vec![edge("gate", "done", vec![])],
1590 max_iterations: 100,
1591 metadata: HashMap::new(),
1592 };
1593
1594 let res = test_engine().run(&wf).await.unwrap();
1595
1596 assert_eq!(res.status, WorkflowStatus::Paused);
1597 assert!(res.is_paused());
1598 assert!(res.stages.is_empty(), "gate body must not run before resume");
1599
1600 let paused = res.paused.expect("checkpoint present when paused");
1601 assert_eq!(paused.paused_stage_id, "gate");
1602 assert_eq!(paused.output_key, "approval");
1603 assert_eq!(paused.prompt, "approve?");
1604 assert!(!paused.run_id.is_empty());
1605 }
1606
1607 #[tokio::test]
1608 async fn resume_records_response_and_completes() {
1609 let wf = Workflow {
1610 id: "wf".into(),
1611 name: "WF".into(),
1612 start: "gate".into(),
1613 goal: None,
1614 stages: vec![approval_stage("gate", "approval"), empty_proposal_stage("done")],
1615 edges: vec![edge("gate", "done", vec![])],
1616 max_iterations: 100,
1617 metadata: HashMap::new(),
1618 };
1619 let eng = test_engine();
1620 let paused = eng.run(&wf).await.unwrap().paused.unwrap();
1621
1622 let mut input = HashMap::new();
1623 input.insert("decision".to_string(), Value::String("approve".into()));
1624 let res = eng.resume(paused, input).await.unwrap();
1625
1626 assert_eq!(res.status, WorkflowStatus::Completed);
1627 assert!(res.succeeded());
1628 let ran: Vec<&str> = res.stages.iter().map(|s| s.stage_id.as_str()).collect();
1630 assert_eq!(ran, vec!["gate", "done"]);
1631 assert!(matches!(res.stages[0].output, StageOutput::Approval { .. }));
1632 assert_eq!(
1634 res.final_state.get("approval.decision"),
1635 Some(&Value::String("approve".into()))
1636 );
1637 assert!(res.final_state.contains_key("stage.gate.answer"));
1638 assert!(res.final_state.contains_key("approval"));
1639 }
1640
1641 #[tokio::test]
1642 async fn resume_branches_on_answer_after_checkpoint_roundtrip() {
1643 let wf = Workflow {
1646 id: "wf".into(),
1647 name: "WF".into(),
1648 start: "gate".into(),
1649 goal: None,
1650 stages: vec![
1651 approval_stage("gate", "approval"),
1652 empty_proposal_stage("approved"),
1653 empty_proposal_stage("revise"),
1654 ],
1655 edges: vec![
1656 edge(
1657 "gate",
1658 "approved",
1659 vec![car_ir::Precondition {
1660 key: "approval.decision".into(),
1661 operator: "eq".into(),
1662 value: Value::String("approve".into()),
1663 description: String::new(),
1664 }],
1665 ),
1666 edge("gate", "revise", vec![]),
1667 ],
1668 max_iterations: 100,
1669 metadata: HashMap::new(),
1670 };
1671 let eng = test_engine();
1672 let paused = eng.run(&wf).await.unwrap().paused.unwrap();
1673
1674 let dir = std::env::temp_dir().join(format!(
1676 "car-wf-resume-{}",
1677 uuid::Uuid::new_v4().simple()
1678 ));
1679 let store = crate::CheckpointStore::open(&dir).unwrap();
1680 store.save(&paused).unwrap();
1681 let run_id = paused.run_id.clone();
1682 let reloaded = store.load(&run_id).unwrap().expect("checkpoint reloads");
1683
1684 let mut input = HashMap::new();
1685 input.insert("decision".to_string(), Value::String("approve".into()));
1686 let res = eng.resume(reloaded, input).await.unwrap();
1687
1688 assert_eq!(res.status, WorkflowStatus::Completed);
1689 let ran: Vec<&str> = res.stages.iter().map(|s| s.stage_id.as_str()).collect();
1690 assert!(ran.contains(&"approved"), "approve answer routes to approved");
1691 assert!(!ran.contains(&"revise"), "revise branch must be skipped");
1692
1693 store.remove(&run_id).unwrap();
1694 let _ = std::fs::remove_dir_all(&dir);
1695 }
1696
1697 #[tokio::test]
1698 async fn iteration_guard_counts_each_stage_once_across_pause() {
1699 let wf = Workflow {
1703 id: "wf".into(),
1704 name: "WF".into(),
1705 start: "a".into(),
1706 goal: None,
1707 stages: vec![
1708 empty_proposal_stage("a"),
1709 approval_stage("gate", "approval"),
1710 empty_proposal_stage("done"),
1711 ],
1712 edges: vec![edge("a", "gate", vec![]), edge("gate", "done", vec![])],
1713 max_iterations: 100,
1714 metadata: HashMap::new(),
1715 };
1716 let eng = test_engine();
1717 let paused = eng.run(&wf).await.unwrap().paused.unwrap();
1718 assert_eq!(paused.iterations, 2);
1720
1721 let mut input = HashMap::new();
1722 input.insert("decision".to_string(), Value::String("ok".into()));
1723 let res = eng.resume(paused, input).await.unwrap();
1724 assert_eq!(res.status, WorkflowStatus::Completed);
1725 let ran: Vec<&str> = res.stages.iter().map(|s| s.stage_id.as_str()).collect();
1727 assert_eq!(ran, vec!["a", "gate", "done"]);
1728 }
1729
1730 #[tokio::test]
1731 async fn resume_rejects_missing_required_field() {
1732 let mut gate = approval_stage("gate", "approval");
1733 if let StageStep::Approval(ap) = &mut gate.step {
1734 ap.fields = vec![ApprovalField {
1735 name: "decision".into(),
1736 label: "Decision".into(),
1737 field_type: "text".into(),
1738 options: vec![],
1739 required: true,
1740 }];
1741 }
1742 let wf = Workflow {
1743 id: "wf".into(),
1744 name: "WF".into(),
1745 start: "gate".into(),
1746 goal: None,
1747 stages: vec![gate, empty_proposal_stage("done")],
1748 edges: vec![edge("gate", "done", vec![])],
1749 max_iterations: 100,
1750 metadata: HashMap::new(),
1751 };
1752 let eng = test_engine();
1753 let paused = eng.run(&wf).await.unwrap().paused.unwrap();
1754 let err = eng.resume(paused, HashMap::new()).await.unwrap_err();
1756 assert!(matches!(err, WorkflowError::InvalidApprovalInput(_)));
1757 }
1758
1759 #[tokio::test]
1760 async fn resume_rejects_checkpoint_pointing_at_non_approval_stage() {
1761 let wf = Workflow {
1763 id: "wf".into(),
1764 name: "WF".into(),
1765 start: "work".into(),
1766 goal: None,
1767 stages: vec![empty_proposal_stage("work")],
1768 edges: vec![],
1769 max_iterations: 100,
1770 metadata: HashMap::new(),
1771 };
1772 let forged = PausedWorkflow {
1773 run_id: "forged".into(),
1774 workflow: wf,
1775 paused_stage_id: "work".into(),
1776 prompt: String::new(),
1777 fields: vec![],
1778 output_key: "x".into(),
1779 wf_state: HashMap::new(),
1780 stage_results: vec![],
1781 completed_stage_ids: vec![],
1782 iterations: 1,
1783 prior_duration_ms: 0.0,
1784 created_at: chrono::Utc::now(),
1785 };
1786 let eng = test_engine();
1787 let err = eng.resume(forged, HashMap::new()).await.unwrap_err();
1788 assert!(matches!(err, WorkflowError::InvalidResume(_)));
1789 }
1790
1791 fn state_write_action(key: &str, value: Value) -> car_ir::Action {
1794 car_ir::Action {
1795 id: format!("sw-{key}"),
1796 action_type: car_ir::ActionType::StateWrite,
1797 tool: None,
1798 parameters: [
1799 ("key".to_string(), Value::from(key)),
1800 ("value".to_string(), value),
1801 ]
1802 .into(),
1803 preconditions: vec![],
1804 expected_effects: HashMap::new(),
1805 state_dependencies: vec![],
1806 idempotent: false,
1807 max_retries: 0,
1808 failure_behavior: car_ir::FailureBehavior::Abort,
1809 timeout_ms: None,
1810 metadata: HashMap::new(),
1811 }
1812 }
1813
1814 fn proposal_step_writing(key: &str, value: Value) -> StageStep {
1815 StageStep::Proposal(ProposalStep {
1816 proposal: ActionProposal {
1817 id: format!("p-{key}"),
1818 source: "test".into(),
1819 actions: vec![state_write_action(key, value)],
1820 timestamp: chrono::Utc::now(),
1821 context: HashMap::new(),
1822 },
1823 })
1824 }
1825
1826 fn single_stage_wf(id: &str, step: StageStep) -> Workflow {
1827 Workflow {
1828 id: "wf".into(),
1829 name: "WF".into(),
1830 start: id.into(),
1831 goal: None,
1832 stages: vec![Stage {
1833 id: id.into(),
1834 name: id.into(),
1835 step,
1836 compensation: None,
1837 timeout_ms: None,
1838 metadata: HashMap::new(),
1839 }],
1840 edges: vec![],
1841 max_iterations: 100,
1842 metadata: HashMap::new(),
1843 }
1844 }
1845
1846 #[tokio::test]
1847 async fn loop_until_empty_predicate_runs_to_cap() {
1848 let step = StageStep::LoopUntil(LoopUntilStep {
1850 body: Box::new(empty_proposal_stage("ignored").step),
1851 until: vec![],
1852 max_iterations: 3,
1853 });
1854 let wf = single_stage_wf("loop", step);
1855 let res = test_engine().run(&wf).await.unwrap();
1856
1857 assert_eq!(res.status, WorkflowStatus::Completed);
1858 match &res.stages[0].output {
1859 StageOutput::Loop {
1860 iterations,
1861 satisfied,
1862 iterations_output,
1863 } => {
1864 assert_eq!(*iterations, 3);
1865 assert!(!*satisfied);
1866 assert_eq!(iterations_output.len(), 3);
1867 }
1868 other => panic!("expected Loop output, got {other:?}"),
1869 }
1870 assert_eq!(
1871 res.final_state.get("stage.loop.iteration"),
1872 Some(&Value::from(3u32))
1873 );
1874 }
1875
1876 #[tokio::test]
1877 async fn loop_until_stops_when_predicate_satisfied() {
1878 let step = StageStep::LoopUntil(LoopUntilStep {
1880 body: Box::new(proposal_step_writing("done", Value::Bool(true))),
1881 until: vec![car_ir::Precondition {
1882 key: "done".into(),
1883 operator: "eq".into(),
1884 value: Value::Bool(true),
1885 description: String::new(),
1886 }],
1887 max_iterations: 10,
1888 });
1889 let wf = single_stage_wf("loop", step);
1890 let res = test_engine().run(&wf).await.unwrap();
1891
1892 match &res.stages[0].output {
1893 StageOutput::Loop {
1894 iterations,
1895 satisfied,
1896 ..
1897 } => {
1898 assert_eq!(*iterations, 1, "predicate true after first body run");
1899 assert!(*satisfied);
1900 }
1901 other => panic!("expected Loop output, got {other:?}"),
1902 }
1903 assert_eq!(res.final_state.get("done"), Some(&Value::Bool(true)));
1905 }
1906
1907 #[tokio::test]
1908 async fn for_each_runs_body_per_runtime_item() {
1909 let seed = Stage {
1911 id: "seed".into(),
1912 name: "seed".into(),
1913 step: proposal_step_writing(
1914 "files",
1915 serde_json::json!(["a.rs", "b.rs", "c.rs"]),
1916 ),
1917 compensation: None,
1918 timeout_ms: None,
1919 metadata: HashMap::new(),
1920 };
1921 let fan = Stage {
1922 id: "fan".into(),
1923 name: "fan".into(),
1924 step: StageStep::ForEach(ForEachStep {
1925 items_from: "files".into(),
1926 body: Box::new(proposal_step_writing(
1928 "seen_{{index}}",
1929 Value::String("{{item}}".into()),
1930 )),
1931 max_concurrent: 2,
1932 }),
1933 compensation: None,
1934 timeout_ms: None,
1935 metadata: HashMap::new(),
1936 };
1937 let wf = Workflow {
1938 id: "wf".into(),
1939 name: "WF".into(),
1940 start: "seed".into(),
1941 goal: None,
1942 stages: vec![seed, fan],
1943 edges: vec![edge("seed", "fan", vec![])],
1944 max_iterations: 100,
1945 metadata: HashMap::new(),
1946 };
1947 let res = test_engine().run(&wf).await.unwrap();
1948
1949 assert_eq!(res.status, WorkflowStatus::Completed);
1950 let fan_out = &res.stages[1].output;
1951 match fan_out {
1952 StageOutput::ForEach { items, outputs } => {
1953 assert_eq!(items, &vec!["a.rs", "b.rs", "c.rs"]);
1954 assert_eq!(outputs.len(), 3);
1955 }
1956 other => panic!("expected ForEach output, got {other:?}"),
1957 }
1958 assert_eq!(
1959 res.final_state.get("foreach.fan.count"),
1960 Some(&Value::from(3usize))
1961 );
1962 assert_eq!(
1964 res.final_state.get("foreach.fan.0.item"),
1965 Some(&Value::String("a.rs".into()))
1966 );
1967 assert_eq!(
1969 res.final_state.get("foreach.fan.0.state.seen_0"),
1970 Some(&Value::String("a.rs".into()))
1971 );
1972 assert_eq!(
1973 res.final_state.get("foreach.fan.2.state.seen_2"),
1974 Some(&Value::String("c.rs".into()))
1975 );
1976 }
1977
1978 #[tokio::test]
1979 async fn for_each_missing_key_is_noop() {
1980 let step = StageStep::ForEach(ForEachStep {
1981 items_from: "nonexistent".into(),
1982 body: Box::new(empty_proposal_stage("b").step),
1983 max_concurrent: 0,
1984 });
1985 let wf = single_stage_wf("fan", step);
1986 let res = test_engine().run(&wf).await.unwrap();
1987 assert_eq!(res.status, WorkflowStatus::Completed);
1988 match &res.stages[0].output {
1989 StageOutput::ForEach { items, outputs } => {
1990 assert!(items.is_empty());
1991 assert!(outputs.is_empty());
1992 }
1993 other => panic!("expected ForEach output, got {other:?}"),
1994 }
1995 }
1996
1997 struct CapturingRunner {
2002 last_task: std::sync::Arc<std::sync::Mutex<String>>,
2003 }
2004
2005 #[async_trait::async_trait]
2006 impl car_multi::AgentRunner for CapturingRunner {
2007 async fn run(
2008 &self,
2009 spec: &car_multi::AgentSpec,
2010 task: &str,
2011 _runtime: &car_engine::Runtime,
2012 _mailbox: &car_multi::Mailbox,
2013 ) -> Result<car_multi::AgentOutput, car_multi::MultiError> {
2014 *self.last_task.lock().unwrap() = task.to_string();
2015 Ok(car_multi::AgentOutput {
2016 name: spec.name.clone(),
2017 answer: "ok".into(),
2018 turns: 1,
2019 tool_calls: 0,
2020 duration_ms: 1.0,
2021 error: None,
2022 outcome: None,
2023 tokens: None,
2024 })
2025 }
2026 }
2027
2028 #[tokio::test]
2029 async fn goal_is_pinned_and_anchored_into_agent_task() {
2030 let last_task = std::sync::Arc::new(std::sync::Mutex::new(String::new()));
2031 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(CapturingRunner {
2032 last_task: last_task.clone(),
2033 });
2034 let eng = WorkflowEngine::new(runner, car_multi::SharedInfra::new());
2035
2036 let wf = Workflow {
2037 id: "wf".into(),
2038 name: "WF".into(),
2039 start: "s".into(),
2040 goal: Some("ship the release safely".into()),
2041 stages: vec![Stage {
2042 id: "s".into(),
2043 name: "s".into(),
2044 step: StageStep::Pattern(PatternStep {
2045 pattern: PatternKind::SwarmParallel,
2046 task: "draft the notes".into(),
2047 agents: vec![car_multi::AgentSpec::new("a", "")],
2048 config: HashMap::new(),
2049 }),
2050 compensation: None,
2051 timeout_ms: None,
2052 metadata: HashMap::new(),
2053 }],
2054 edges: vec![],
2055 max_iterations: 100,
2056 metadata: HashMap::new(),
2057 };
2058
2059 let res = eng.run(&wf).await.unwrap();
2060 assert_eq!(res.status, WorkflowStatus::Completed);
2061 assert_eq!(
2063 res.final_state.get("goal"),
2064 Some(&Value::String("ship the release safely".into()))
2065 );
2066 let seen = last_task.lock().unwrap().clone();
2068 assert!(seen.contains("Overall goal: ship the release safely"), "got: {seen}");
2069 assert!(seen.contains("Current step: draft the notes"), "got: {seen}");
2070 }
2071
2072 struct ReviewRunner;
2074
2075 #[async_trait::async_trait]
2076 impl car_multi::AgentRunner for ReviewRunner {
2077 async fn run(
2078 &self,
2079 spec: &car_multi::AgentSpec,
2080 _task: &str,
2081 _runtime: &car_engine::Runtime,
2082 _mailbox: &car_multi::Mailbox,
2083 ) -> Result<car_multi::AgentOutput, car_multi::MultiError> {
2084 let answer = if spec.name.contains("review") {
2085 r#"{"passed": true, "findings": [{"criterion":"complete","passed":true,"evidence":"all there","severity":"info"}]}"#.to_string()
2086 } else {
2087 spec.name.clone()
2088 };
2089 Ok(car_multi::AgentOutput {
2090 name: spec.name.clone(),
2091 answer,
2092 turns: 1,
2093 tool_calls: 0,
2094 duration_ms: 1.0,
2095 error: None,
2096 outcome: None,
2097 tokens: None,
2098 })
2099 }
2100 }
2101
2102 #[tokio::test]
2103 async fn adversarial_review_stage_gates_prior_work() {
2104 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(ReviewRunner);
2105 let eng = WorkflowEngine::new(runner, car_multi::SharedInfra::new());
2106
2107 let seed = Stage {
2109 id: "produce".into(),
2110 name: "produce".into(),
2111 step: proposal_step_writing("draft", Value::String("the finished work".into())),
2112 compensation: None,
2113 timeout_ms: None,
2114 metadata: HashMap::new(),
2115 };
2116 let review = Stage {
2117 id: "review".into(),
2118 name: "review".into(),
2119 step: StageStep::Pattern(PatternStep {
2120 pattern: PatternKind::AdversarialReview,
2121 task: "verify".into(),
2122 agents: vec![car_multi::AgentSpec::new("reviewer", "be strict")],
2123 config: [
2124 (
2125 "criteria".to_string(),
2126 serde_json::json!(["work is complete"]),
2127 ),
2128 ("review_key".to_string(), Value::String("draft".into())),
2129 ]
2130 .into(),
2131 }),
2132 compensation: None,
2133 timeout_ms: None,
2134 metadata: HashMap::new(),
2135 };
2136 let wf = Workflow {
2137 id: "wf".into(),
2138 name: "WF".into(),
2139 start: "produce".into(),
2140 goal: None,
2141 stages: vec![seed, review],
2142 edges: vec![edge("produce", "review", vec![])],
2143 max_iterations: 100,
2144 metadata: HashMap::new(),
2145 };
2146
2147 let res = eng.run(&wf).await.unwrap();
2148 assert_eq!(res.status, WorkflowStatus::Completed);
2149 assert_eq!(
2151 res.final_state.get("stage.review.review_passed"),
2152 Some(&Value::Bool(true))
2153 );
2154 assert!(matches!(
2155 res.stages[1].output,
2156 StageOutput::Review { passed: true, .. }
2157 ));
2158 }
2159
2160 #[tokio::test]
2161 async fn adversarial_review_fails_closed_on_missing_work() {
2162 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(ReviewRunner);
2163 let eng = WorkflowEngine::new(runner, car_multi::SharedInfra::new());
2164 let wf = single_stage_wf(
2166 "review",
2167 StageStep::Pattern(PatternStep {
2168 pattern: PatternKind::AdversarialReview,
2169 task: "verify".into(),
2170 agents: vec![car_multi::AgentSpec::new("reviewer", "")],
2171 config: [("review_key".to_string(), Value::String("nothing".into()))].into(),
2172 }),
2173 );
2174 let res = eng.run(&wf).await.unwrap();
2175 assert_eq!(res.status, WorkflowStatus::Completed);
2176 assert_eq!(
2177 res.final_state.get("stage.review.review_passed"),
2178 Some(&Value::Bool(false)),
2179 "empty work must fail closed, not vacuously pass"
2180 );
2181 }
2182
2183 #[tokio::test]
2184 async fn stage_delta_cannot_clobber_pinned_goal() {
2185 let last_task = std::sync::Arc::new(std::sync::Mutex::new(String::new()));
2188 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(CapturingRunner {
2189 last_task: last_task.clone(),
2190 });
2191 let eng = WorkflowEngine::new(runner, car_multi::SharedInfra::new());
2192
2193 let hijack = Stage {
2194 id: "hijack".into(),
2195 name: "hijack".into(),
2196 step: proposal_step_writing("goal", Value::String("DO SOMETHING ELSE".into())),
2197 compensation: None,
2198 timeout_ms: None,
2199 metadata: HashMap::new(),
2200 };
2201 let work = Stage {
2202 id: "work".into(),
2203 name: "work".into(),
2204 step: StageStep::Pattern(PatternStep {
2205 pattern: PatternKind::SwarmParallel,
2206 task: "do the thing".into(),
2207 agents: vec![car_multi::AgentSpec::new("a", "")],
2208 config: HashMap::new(),
2209 }),
2210 compensation: None,
2211 timeout_ms: None,
2212 metadata: HashMap::new(),
2213 };
2214 let wf = Workflow {
2215 id: "wf".into(),
2216 name: "WF".into(),
2217 start: "hijack".into(),
2218 goal: Some("the real goal".into()),
2219 stages: vec![hijack, work],
2220 edges: vec![edge("hijack", "work", vec![])],
2221 max_iterations: 100,
2222 metadata: HashMap::new(),
2223 };
2224
2225 let res = eng.run(&wf).await.unwrap();
2226 assert_eq!(res.status, WorkflowStatus::Completed);
2227 assert_eq!(
2229 res.final_state.get("goal"),
2230 Some(&Value::String("the real goal".into()))
2231 );
2232 let seen = last_task.lock().unwrap().clone();
2233 assert!(seen.contains("Overall goal: the real goal"), "got: {seen}");
2234 }
2235
2236 #[tokio::test]
2239 async fn run_cached_replays_prefix_and_runs_rest_live() {
2240 let last_task = std::sync::Arc::new(std::sync::Mutex::new(String::new()));
2241 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(CapturingRunner {
2242 last_task: last_task.clone(),
2243 });
2244 let eng = WorkflowEngine::new(runner, car_multi::SharedInfra::new());
2245
2246 let a = Stage {
2248 id: "a".into(),
2249 name: "a".into(),
2250 step: proposal_step_writing("k", Value::String("v".into())),
2251 compensation: None,
2252 timeout_ms: None,
2253 metadata: HashMap::new(),
2254 };
2255 let b = Stage {
2256 id: "b".into(),
2257 name: "b".into(),
2258 step: StageStep::Pattern(PatternStep {
2259 pattern: PatternKind::SwarmParallel,
2260 task: "do b".into(),
2261 agents: vec![car_multi::AgentSpec::new("agent", "")],
2262 config: HashMap::new(),
2263 }),
2264 compensation: None,
2265 timeout_ms: None,
2266 metadata: HashMap::new(),
2267 };
2268 let wf = Workflow {
2269 id: "wf".into(),
2270 name: "WF".into(),
2271 start: "a".into(),
2272 goal: None,
2273 stages: vec![a, b],
2274 edges: vec![edge("a", "b", vec![])],
2275 max_iterations: 100,
2276 metadata: HashMap::new(),
2277 };
2278
2279 let mut final_state = HashMap::new();
2281 final_state.insert("stage.a.succeeded".to_string(), Value::Bool(true));
2282 final_state.insert("k".to_string(), Value::String("v".into()));
2283 let prior = WorkflowResult {
2284 workflow_id: "wf".into(),
2285 workflow_name: "WF".into(),
2286 status: WorkflowStatus::Failed,
2287 stages: vec![StageResult {
2288 stage_id: "a".into(),
2289 stage_name: "a".into(),
2290 status: StageStatus::Succeeded,
2291 output: StageOutput::Empty,
2292 duration_ms: 1.0,
2293 error: None,
2294 }],
2295 compensations: vec![],
2296 duration_ms: 1.0,
2297 timestamp: chrono::Utc::now(),
2298 final_state,
2299 paused: None,
2300 };
2301
2302 let res = eng.run_cached(&wf, &prior).await.unwrap();
2303 assert_eq!(res.status, WorkflowStatus::Completed);
2304 let ran: Vec<&str> = res.stages.iter().map(|s| s.stage_id.as_str()).collect();
2306 assert_eq!(ran, vec!["a", "b"]);
2307 assert!(
2308 !last_task.lock().unwrap().is_empty(),
2309 "stage B should have executed live"
2310 );
2311 assert!(matches!(res.stages[0].output, StageOutput::Empty));
2313 }
2314
2315 #[tokio::test]
2316 async fn run_cached_strips_stale_uncached_stage_bookkeeping() {
2317 let eng = test_engine();
2320 let wf = Workflow {
2321 id: "wf".into(),
2322 name: "WF".into(),
2323 start: "a".into(),
2324 goal: None,
2325 stages: vec![empty_proposal_stage("a"), empty_proposal_stage("ghost")],
2327 edges: vec![],
2328 max_iterations: 100,
2329 metadata: HashMap::new(),
2330 };
2331 let mut final_state = HashMap::new();
2332 final_state.insert("stage.a.succeeded".to_string(), Value::Bool(true));
2333 final_state.insert("stage.ghost.error".to_string(), Value::String("stale".into()));
2334 final_state.insert("real_data".to_string(), Value::from(42));
2335 let prior = WorkflowResult {
2336 workflow_id: "wf".into(),
2337 workflow_name: "WF".into(),
2338 status: WorkflowStatus::Failed,
2339 stages: vec![StageResult {
2340 stage_id: "a".into(),
2341 stage_name: "a".into(),
2342 status: StageStatus::Succeeded,
2343 output: StageOutput::Empty,
2344 duration_ms: 1.0,
2345 error: None,
2346 }],
2347 compensations: vec![],
2348 duration_ms: 1.0,
2349 timestamp: chrono::Utc::now(),
2350 final_state,
2351 paused: None,
2352 };
2353
2354 let res = eng.run_cached(&wf, &prior).await.unwrap();
2355 assert_eq!(res.status, WorkflowStatus::Completed);
2356 assert!(!res.final_state.contains_key("stage.ghost.error"));
2358 assert_eq!(res.final_state.get("real_data"), Some(&Value::from(42)));
2360 assert_eq!(
2361 res.final_state.get("stage.a.succeeded"),
2362 Some(&Value::Bool(true))
2363 );
2364 }
2365
2366 #[tokio::test]
2367 async fn run_cached_with_full_cache_executes_nothing() {
2368 let last_task = std::sync::Arc::new(std::sync::Mutex::new(String::new()));
2369 let runner: Arc<dyn car_multi::AgentRunner> = Arc::new(CapturingRunner {
2370 last_task: last_task.clone(),
2371 });
2372 let eng = WorkflowEngine::new(runner, car_multi::SharedInfra::new());
2373
2374 let wf = single_stage_wf(
2375 "only",
2376 StageStep::Pattern(PatternStep {
2377 pattern: PatternKind::SwarmParallel,
2378 task: "x".into(),
2379 agents: vec![car_multi::AgentSpec::new("agent", "")],
2380 config: HashMap::new(),
2381 }),
2382 );
2383 let prior = WorkflowResult {
2384 workflow_id: "wf".into(),
2385 workflow_name: "WF".into(),
2386 status: WorkflowStatus::Completed,
2387 stages: vec![StageResult {
2388 stage_id: "only".into(),
2389 stage_name: "only".into(),
2390 status: StageStatus::Succeeded,
2391 output: StageOutput::Empty,
2392 duration_ms: 1.0,
2393 error: None,
2394 }],
2395 compensations: vec![],
2396 duration_ms: 1.0,
2397 timestamp: chrono::Utc::now(),
2398 final_state: HashMap::new(),
2399 paused: None,
2400 };
2401
2402 let res = eng.run_cached(&wf, &prior).await.unwrap();
2403 assert_eq!(res.status, WorkflowStatus::Completed);
2404 assert!(
2405 last_task.lock().unwrap().is_empty(),
2406 "fully-cached run must not execute any agent"
2407 );
2408 }
2409}