Skip to main content

a3s_code_core/orchestration/
combinators.rs

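//! Orchestration combinators built on the [`AgentExecutor`] seam.
//!
//! [`execute_steps_parallel`](super::execute_steps_parallel) (in `executor`)
//! is the barrier (`parallel`) primitive. This module builds on it with:
//!
//! - `pipeline` ([`execute_pipeline`]): the one genuinely new scheduling
//!   shape, where each item flows through a chain of stages independently —
//!   no barrier between stages;
//! - a resumable `parallel` ([`execute_steps_parallel_resumable`]) that
//!   journals progress to a [`SessionStore`] so an interrupted run resumes;
//! - `loop` ([`execute_loop`]): repeated barrier rounds, hard-capped by
//!   `max_iterations`.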
7
8use super::checkpoint::WorkflowCheckpoint;
9use super::executor::{execute_steps_parallel, AgentExecutor, AgentStepSpec, StepOutcome};
10use crate::agent::AgentEvent;
11use crate::ordered_parallel::run_ordered_parallel_with_limit;
12use crate::store::SessionStore;
13use std::collections::HashMap;
14use std::sync::Arc;
15use tokio::sync::broadcast;
16
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// If the system clock reads earlier than 1970 (so `duration_since` errors),
/// this returns `0` rather than panicking; here the value is only used to
/// timestamp workflow checkpoints.
fn now_epoch_ms() -> u64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
23
24/// A pipeline stage: given the previous stage's outcome (`None` before the
25/// first stage) and the original item, produce the next step to run — or
26/// `None` to stop this item's chain early.
27///
28/// Stages are pure spec-builders; the executor runs them. A stage can branch
29/// on the prior result (e.g. "verify the finding the review stage produced").
30pub type PipelineStage<I> =
31    Arc<dyn Fn(Option<&StepOutcome>, &I) -> Option<AgentStepSpec> + Send + Sync>;
32
33/// Run each item through `stages` as an independent chain.
34///
35/// All chains run concurrently, bounded by the executor's
36/// [`concurrency_hint`](AgentExecutor::concurrency_hint) — there is **no
37/// barrier between stages**, so item A can be in stage 3 while item B is still
38/// in stage 1. Wall-clock is the slowest single chain, not the
39/// sum-of-slowest-per-stage that a barrier `parallel` per stage would incur.
40///
41/// A chain stops early when a stage returns `None` or when a step fails
42/// (later stages would only build on a failed result). Returns each item's
43/// last outcome (`None` if its first stage produced no spec), preserving input
44/// order. A stage closure that panics isolates to that one chain (its result
45/// becomes `None`) without dropping the others.
46pub async fn execute_pipeline<I>(
47    executor: Arc<dyn AgentExecutor>,
48    items: Vec<I>,
49    stages: Vec<PipelineStage<I>>,
50    event_tx: Option<broadcast::Sender<AgentEvent>>,
51) -> Vec<Option<StepOutcome>>
52where
53    I: Send + 'static,
54{
55    let limit = executor.concurrency_hint();
56    let stages = Arc::new(stages);
57
58    let results = run_ordered_parallel_with_limit(items, limit, move |_idx, item| {
59        let executor = Arc::clone(&executor);
60        let stages = Arc::clone(&stages);
61        let event_tx = event_tx.clone();
62        async move {
63            let mut prev: Option<StepOutcome> = None;
64            for stage in stages.iter() {
65                let Some(spec) = stage(prev.as_ref(), &item) else {
66                    break;
67                };
68                let outcome = executor.execute_step(spec, event_tx.clone()).await;
69                let succeeded = outcome.success;
70                prev = Some(outcome);
71                if !succeeded {
72                    break;
73                }
74            }
75            prev
76        }
77    })
78    .await;
79
80    // A panicked chain (Err) yields `None`; a normal chain yields its last
81    // outcome. Order is preserved by `run_ordered_parallel_with_limit`.
82    results
83        .into_iter()
84        .map(|result| result.output.unwrap_or(None))
85        .collect()
86}
87
88/// Like [`execute_steps_parallel`](super::execute_steps_parallel), but
89/// **resumable**: progress is journaled to `store` under `workflow_id`, so an
90/// interrupted run picks up from the last completed step.
91///
92/// On entry, any steps already recorded in a prior checkpoint are skipped and
93/// their cached outcomes reused; only the rest are dispatched. As each step
94/// completes, the checkpoint is rewritten (the step boundary), so a crash
95/// mid-run loses at most the in-flight steps. Because the checkpoint is
96/// serializable and the executor is a parameter, a host can resume an
97/// interrupted workflow on a *different* node by passing that node's executor.
98///
99/// Results are returned in the original `specs` order. On full success the
100/// checkpoint is deleted (the workflow is terminal); only a crash leaves one
101/// behind for resume.
102pub async fn execute_steps_parallel_resumable(
103    executor: Arc<dyn AgentExecutor>,
104    specs: Vec<AgentStepSpec>,
105    workflow_id: &str,
106    store: Arc<dyn SessionStore>,
107    event_tx: Option<broadcast::Sender<AgentEvent>>,
108) -> Vec<StepOutcome> {
109    // Prior progress. An unreadable checkpoint — e.g. one written by a newer,
110    // incompatible schema version, which the store rejects via
111    // `ensure_loadable` — is treated as *no* prior progress: the workflow
112    // re-runs from scratch rather than resuming from state it can't interpret.
113    // That's a fail-safe (do the work), but surface it rather than swallow it.
114    let done: HashMap<String, StepOutcome> = match store.load_workflow_checkpoint(workflow_id).await
115    {
116        Ok(Some(cp)) => cp.completed(),
117        Ok(None) => HashMap::new(),
118        Err(e) => {
119            tracing::warn!(
120                workflow_id = %workflow_id,
121                error = %e,
122                "workflow checkpoint unreadable; re-running the workflow from scratch"
123            );
124            HashMap::new()
125        }
126    };
127
128    let pending: Vec<AgentStepSpec> = specs
129        .iter()
130        .filter(|s| !done.contains_key(&s.task_id))
131        .cloned()
132        .collect();
133    let labels: Vec<(String, String)> = pending
134        .iter()
135        .map(|s| (s.task_id.clone(), s.agent.clone()))
136        .collect();
137
138    // Accumulator seeded with prior progress; persisted at every step boundary.
139    let acc = Arc::new(tokio::sync::Mutex::new(done.clone()));
140    let limit = executor.concurrency_hint();
141    let workflow_id_owned = workflow_id.to_string();
142    let store_steps = Arc::clone(&store);
143
144    let results = run_ordered_parallel_with_limit(pending, limit, move |_idx, spec| {
145        let executor = Arc::clone(&executor);
146        let event_tx = event_tx.clone();
147        let acc = Arc::clone(&acc);
148        let store = Arc::clone(&store_steps);
149        let workflow_id = workflow_id_owned.clone();
150        async move {
151            let outcome = executor.execute_step(spec, event_tx).await;
152            // Step boundary: record only *successful* steps, so a failed step
153            // is retried on resume (its effect didn't complete) while a
154            // succeeded step's work is never redone.
155            if outcome.success {
156                let mut guard = acc.lock().await;
157                guard.insert(outcome.task_id.clone(), outcome.clone());
158                let checkpoint =
159                    WorkflowCheckpoint::from_completed(&workflow_id, &guard, now_epoch_ms());
160                if let Err(e) = store
161                    .save_workflow_checkpoint(&workflow_id, &checkpoint)
162                    .await
163                {
164                    // Losing a checkpoint must not fail the live run.
165                    tracing::warn!(
166                        workflow_id = %workflow_id,
167                        error = %e,
168                        "workflow checkpoint save failed; run continues"
169                    );
170                }
171            }
172            outcome
173        }
174    })
175    .await;
176
177    let mut fresh: HashMap<String, StepOutcome> = HashMap::new();
178    for result in results {
179        match result.output {
180            Ok(outcome) => {
181                fresh.insert(outcome.task_id.clone(), outcome);
182            }
183            Err(error) => {
184                if let Some((task_id, agent)) = labels.get(result.index).cloned() {
185                    fresh.insert(
186                        task_id.clone(),
187                        StepOutcome::failed(task_id, agent, error.to_string()),
188                    );
189                }
190            }
191        }
192    }
193
194    // Merge cached + freshly-run, in the original spec order.
195    let merged: Vec<StepOutcome> = specs
196        .iter()
197        .map(|s| {
198            done.get(&s.task_id)
199                .cloned()
200                .or_else(|| fresh.remove(&s.task_id))
201                .unwrap_or_else(|| {
202                    StepOutcome::failed(
203                        s.task_id.clone(),
204                        s.agent.clone(),
205                        "step produced no outcome",
206                    )
207                })
208        })
209        .collect();
210
211    if merged.iter().all(|o| o.success) {
212        let _ = store.delete_workflow_checkpoint(workflow_id).await;
213    }
214    merged
215}
216
217/// What an [`execute_loop`] predicate decides after seeing a round's outcomes.
218pub enum LoopDecision {
219    /// Run another round with these specs.
220    Continue(Vec<AgentStepSpec>),
221    /// Stop now; the loop returns the round that just completed.
222    Stop,
223}
224
225/// Run rounds until the predicate says [`Stop`](LoopDecision::Stop), a round is
226/// asked to run no specs, or `max_iterations` is reached — whichever comes
227/// first. Each round is a barrier ([`execute_steps_parallel`]); `next` receives
228/// the just-completed round's outcomes and decides whether (and with what) to
229/// continue. Returns the last round's outcomes (empty if `initial` was empty).
230///
231/// `max_iterations` is **mandatory and a hard cap**: it is clamped to at least
232/// 1, and once reached the loop stops even if the predicate returns
233/// [`Continue`](LoopDecision::Continue). This is the guard that makes an
234/// LLM-driven, unknown-length loop (e.g. loop-until-dry) safe — the predicate
235/// must never be the *only* termination condition.
236///
237/// This is the "loop" shape from the orchestration grammar; like the other
238/// combinators it is written purely against the [`AgentExecutor`] seam and adds
239/// no scheduling of its own.
240pub async fn execute_loop<F>(
241    executor: Arc<dyn AgentExecutor>,
242    initial: Vec<AgentStepSpec>,
243    max_iterations: usize,
244    event_tx: Option<broadcast::Sender<AgentEvent>>,
245    mut next: F,
246) -> Vec<StepOutcome>
247where
248    F: FnMut(&[StepOutcome]) -> LoopDecision + Send,
249{
250    let cap = max_iterations.max(1);
251    let mut specs = initial;
252    let mut last = Vec::new();
253    let mut iterations = 0;
254
255    while !specs.is_empty() {
256        let round = execute_steps_parallel(
257            Arc::clone(&executor),
258            std::mem::take(&mut specs),
259            event_tx.clone(),
260        )
261        .await;
262        iterations += 1;
263        let decision = next(&round);
264        last = round;
265        match decision {
266            LoopDecision::Continue(more) if iterations < cap => specs = more,
267            _ => break,
268        }
269    }
270
271    last
272}
273
// Unit tests for the combinators in this file, driven by in-process fake
// executors (and `MemorySessionStore` for the resumable path); no real agents
// run.
274#[cfg(test)]
275mod tests {
276    use super::*;
277    use async_trait::async_trait;
278    use std::sync::atomic::{AtomicUsize, Ordering};
279    use std::time::Duration;
280
281    /// Echoes the prompt into the output; fails for agent `"fail"`; panics for
282    /// agent `"boom"`. Records peak concurrency.
    ///
    /// Each step sleeps 15ms so concurrently scheduled steps overlap and
    /// `max_active` can observe them.
283    struct EchoExecutor {
284        active: Arc<AtomicUsize>,
285        max_active: Arc<AtomicUsize>,
286    }
287
288    impl EchoExecutor {
289        fn new() -> Self {
290            Self {
291                active: Arc::new(AtomicUsize::new(0)),
292                max_active: Arc::new(AtomicUsize::new(0)),
293            }
294        }
295    }
296
297    #[async_trait]
298    impl AgentExecutor for EchoExecutor {
299        async fn execute_step(
300            &self,
301            spec: AgentStepSpec,
302            _event_tx: Option<broadcast::Sender<AgentEvent>>,
303        ) -> StepOutcome {
304            let now = self.active.fetch_add(1, Ordering::SeqCst) + 1;
305            self.max_active.fetch_max(now, Ordering::SeqCst);
306            tokio::time::sleep(Duration::from_millis(15)).await;
307            self.active.fetch_sub(1, Ordering::SeqCst);
            // Decrement happens before the "boom" panic so a panicking step
            // does not leave `active` permanently elevated.
308            assert!(spec.agent != "boom", "boom");
309            StepOutcome {
310                task_id: spec.task_id.clone(),
311                session_id: format!("task-run-{}", spec.task_id),
312                agent: spec.agent.clone(),
313                output: spec.prompt.clone(),
314                success: spec.agent != "fail",
315                structured: None,
316            }
317        }
318        fn concurrency_hint(&self) -> usize {
319            4
320        }
321    }
322
    /// Boxes a closure as a [`PipelineStage`] so tests can write stages inline.
323    fn stage<I, F>(f: F) -> PipelineStage<I>
324    where
325        F: Fn(Option<&StepOutcome>, &I) -> Option<AgentStepSpec> + Send + Sync + 'static,
326    {
327        Arc::new(f)
328    }
329
330    #[tokio::test]
331    async fn each_item_chains_through_stages_and_later_stages_see_prior_output() {
332        let exec: Arc<dyn AgentExecutor> = Arc::new(EchoExecutor::new());
333        // Stage 1: run agent "explore" with the item as the prompt.
334        // Stage 2: run agent "review" with a prompt derived from stage 1's output.
335        let stages = vec![
336            stage(|_prev: Option<&StepOutcome>, item: &&str| {
337                Some(AgentStepSpec::new("s1", "explore", "d", *item))
338            }),
339            stage(|prev: Option<&StepOutcome>, _item: &&str| {
340                let prior = prev.map(|o| o.output.clone()).unwrap_or_default();
341                Some(AgentStepSpec::new(
342                    "s2",
343                    "review",
344                    "d",
345                    format!("review of: {prior}"),
346                ))
347            }),
348        ];
349        let out = execute_pipeline(exec, vec!["alpha", "beta"], stages, None).await;
350
351        assert_eq!(out.len(), 2, "one result per item, order preserved");
352        // Each item's final outcome is stage 2, whose prompt was derived from
353        // stage 1's output (the item text).
354        assert_eq!(out[0].as_ref().unwrap().output, "review of: alpha");
355        assert_eq!(out[1].as_ref().unwrap().output, "review of: beta");
356        assert!(out.iter().all(|o| o.as_ref().unwrap().success));
357    }
358
359    #[tokio::test]
360    async fn chain_stops_on_failure_and_on_none_stage() {
361        let exec: Arc<dyn AgentExecutor> = Arc::new(EchoExecutor::new());
362        // First item: stage 1 fails (agent "fail") → stage 2 must not run.
363        // Second item: stage 1 ok, stage 2 returns None → chain stops at stage 1.
364        let stages = vec![
365            stage(|_p: Option<&StepOutcome>, item: &&str| {
366                let agent = if *item == "x" { "fail" } else { "explore" };
367                Some(AgentStepSpec::new("s1", agent, "d", *item))
368            }),
369            stage(|_p: Option<&StepOutcome>, item: &&str| {
370                if *item == "y" {
371                    None // stop the second item's chain at stage 1
372                } else {
373                    Some(AgentStepSpec::new("s2", "review", "d", "second"))
374                }
375            }),
376        ];
377        let out = execute_pipeline(exec, vec!["x", "y"], stages, None).await;
378
379        let first = out[0].as_ref().unwrap();
380        assert!(!first.success, "failed stage 1 surfaces");
381        assert_eq!(
382            first.output, "x",
383            "stage 2 did not run after stage 1 failed"
384        );
385
386        let second = out[1].as_ref().unwrap();
387        assert!(second.success);
388        assert_eq!(
389            second.output, "y",
390            "stage 2 returned None → chain stopped at stage 1"
391        );
392    }
393
    // NOTE(review): despite its name, this test only checks the upper bound
    // (peak concurrency <= hint); it does not assert the absence of a barrier
    // between stages (e.g. that stage-2 steps overlap stage-1 steps). TODO
    // consider asserting interleaving if that property matters.
394    #[tokio::test]
395    async fn no_barrier_between_stages_bounded_by_hint() {
396        let echo = EchoExecutor::new();
397        let max_active = Arc::clone(&echo.max_active);
398        let exec: Arc<dyn AgentExecutor> = Arc::new(echo);
399        let stages = vec![
400            stage(|_p: Option<&StepOutcome>, item: &usize| {
401                Some(AgentStepSpec::new(
402                    format!("s1-{item}"),
403                    "explore",
404                    "d",
405                    "p",
406                ))
407            }),
408            stage(|_p: Option<&StepOutcome>, item: &usize| {
409                Some(AgentStepSpec::new(format!("s2-{item}"), "review", "d", "p"))
410            }),
411        ];
412        let items: Vec<usize> = (0..8).collect();
413        let out = execute_pipeline(exec, items, stages, None).await;
414        assert_eq!(out.len(), 8);
415        assert!(out.iter().all(|o| o.is_some()));
416        // concurrency_hint is 4: chains run concurrently but never exceed it.
417        assert!(
418            max_active.load(Ordering::SeqCst) <= 4,
419            "concurrency never exceeds the executor's hint"
420        );
421    }
422
    // NOTE(review): the panic here is raised by the executor (agent "boom"
    // in `EchoExecutor`), not by the stage closure itself; a panic inside a
    // stage closure is not exercised directly by this test.
423    #[tokio::test]
424    async fn panicking_stage_isolates_to_its_chain() {
425        let exec: Arc<dyn AgentExecutor> = Arc::new(EchoExecutor::new());
426        let stages = vec![stage(|_p: Option<&StepOutcome>, item: &&str| {
427            // The middle item routes to the panicking agent.
428            Some(AgentStepSpec::new("s1", *item, "d", "p"))
429        })];
430        let out = execute_pipeline(exec, vec!["explore", "boom", "review"], stages, None).await;
431        assert_eq!(out.len(), 3);
432        assert!(out[0].as_ref().unwrap().success);
433        assert!(out[1].is_none(), "panicked chain becomes None, not a drop");
434        assert!(out[2].as_ref().unwrap().success, "later chains unaffected");
435    }
436
437    /// Records which task ids it actually ran; always succeeds.
438    struct RecordingExecutor {
439        ran: Arc<tokio::sync::Mutex<Vec<String>>>,
440    }
441
442    #[async_trait]
443    impl AgentExecutor for RecordingExecutor {
444        async fn execute_step(
445            &self,
446            spec: AgentStepSpec,
447            _event_tx: Option<broadcast::Sender<AgentEvent>>,
448        ) -> StepOutcome {
449            self.ran.lock().await.push(spec.task_id.clone());
450            StepOutcome {
451                task_id: spec.task_id.clone(),
452                session_id: format!("task-run-{}", spec.task_id),
453                agent: spec.agent.clone(),
454                output: format!("ran:{}", spec.task_id),
455                success: true,
456                structured: None,
457            }
458        }
459        fn concurrency_hint(&self) -> usize {
460            4
461        }
462    }
463
464    #[tokio::test]
465    async fn resumable_skips_completed_then_clears_on_success() {
466        use crate::store::MemorySessionStore;
467        let store: Arc<dyn SessionStore> = Arc::new(MemorySessionStore::new());
468
469        // Pre-seed: step "a" already completed on a prior run (possibly on
470        // another node — this exercises the migration path too).
471        let mut done = std::collections::HashMap::new();
472        done.insert(
473            "a".to_string(),
474            StepOutcome {
475                task_id: "a".into(),
476                session_id: "task-run-a".into(),
477                agent: "explore".into(),
478                output: "cached-a".into(),
479                success: true,
480                structured: None,
481            },
482        );
483        store
484            .save_workflow_checkpoint(
485                "wf-1",
486                &WorkflowCheckpoint::from_completed("wf-1", &done, 1),
487            )
488            .await
489            .unwrap();
490
491        // A FRESH executor resumes (the node that runs the rest is not the one
492        // that completed "a").
493        let ran = Arc::new(tokio::sync::Mutex::new(Vec::new()));
494        let exec: Arc<dyn AgentExecutor> = Arc::new(RecordingExecutor {
495            ran: Arc::clone(&ran),
496        });
497        let specs = vec![
498            AgentStepSpec::new("a", "explore", "d", "pa"),
499            AgentStepSpec::new("b", "review", "d", "pb"),
500        ];
501
502        let out =
503            execute_steps_parallel_resumable(exec, specs, "wf-1", Arc::clone(&store), None).await;
504
505        assert_eq!(
506            *ran.lock().await,
507            vec!["b".to_string()],
508            "only the not-yet-completed step runs"
509        );
510        assert_eq!(out.len(), 2);
511        assert_eq!(out[0].task_id, "a");
512        assert_eq!(
513            out[0].output, "cached-a",
514            "completed step returns its cached outcome, unchanged"
515        );
516        assert_eq!(out[1].task_id, "b");
517        assert!(out.iter().all(|o| o.success));
518        assert!(
519            store
520                .load_workflow_checkpoint("wf-1")
521                .await
522                .unwrap()
523                .is_none(),
524            "a fully-succeeded workflow clears its checkpoint"
525        );
526    }
527
528    #[tokio::test]
529    async fn resumable_retains_checkpoint_recording_only_successes_on_partial_failure() {
530        use crate::store::MemorySessionStore;
531        let store: Arc<dyn SessionStore> = Arc::new(MemorySessionStore::new());
532        // EchoExecutor fails the agent named "fail".
533        let exec: Arc<dyn AgentExecutor> = Arc::new(EchoExecutor::new());
534        let specs = vec![
535            AgentStepSpec::new("ok", "explore", "d", "p"),
536            AgentStepSpec::new("bad", "fail", "d", "p"),
537        ];
538
539        let out =
540            execute_steps_parallel_resumable(exec, specs, "wf-2", Arc::clone(&store), None).await;
541        assert!(out[0].success);
542        assert!(!out[1].success);
543
544        // Not all succeeded → checkpoint retained, recording only the success
545        // so the failed step retries on resume.
546        let cp = store
547            .load_workflow_checkpoint("wf-2")
548            .await
549            .unwrap()
550            .expect("checkpoint retained on partial failure");
551        let completed = cp.completed();
552        assert!(completed.contains_key("ok"), "succeeded step is recorded");
553        assert!(
554            !completed.contains_key("bad"),
555            "failed step is NOT recorded → it retries on resume"
556        );
557    }
558
    /// Always succeeds and reports a `concurrency_hint` of 0, to check that a
    /// zero hint does not stall the scheduler.
559    struct ZeroHintExecutor;
560    #[async_trait]
561    impl AgentExecutor for ZeroHintExecutor {
562        async fn execute_step(
563            &self,
564            spec: AgentStepSpec,
565            _event_tx: Option<broadcast::Sender<AgentEvent>>,
566        ) -> StepOutcome {
567            StepOutcome {
568                task_id: spec.task_id.clone(),
569                session_id: format!("task-run-{}", spec.task_id),
570                agent: spec.agent.clone(),
571                output: "ok".to_string(),
572                success: true,
573                structured: None,
574            }
575        }
576        fn concurrency_hint(&self) -> usize {
577            0
578        }
579    }
580
581    #[tokio::test]
582    async fn empty_inputs_return_empty() {
583        let exec: Arc<dyn AgentExecutor> = Arc::new(EchoExecutor::new());
584        assert!(
585            crate::orchestration::execute_steps_parallel(Arc::clone(&exec), vec![], None)
586                .await
587                .is_empty()
588        );
589        let stages: Vec<PipelineStage<&str>> =
590            vec![stage(|_p: Option<&StepOutcome>, item: &&str| {
591                Some(AgentStepSpec::new("s", "explore", "d", *item))
592            })];
593        assert!(execute_pipeline(exec, Vec::<&str>::new(), stages, None)
594            .await
595            .is_empty());
596    }
597
598    #[tokio::test]
599    async fn zero_concurrency_hint_still_makes_progress() {
600        // The .max(1) clamp in run_ordered_parallel_with_limit keeps a 0-hint
601        // executor serialized-but-live instead of deadlocking on 0 permits.
        // NOTE(review): that clamp lives outside this file — if it moves or is
        // removed, this test is what should catch it.
602        let exec: Arc<dyn AgentExecutor> = Arc::new(ZeroHintExecutor);
603        let specs = vec![
604            AgentStepSpec::new("a", "explore", "d", "p"),
605            AgentStepSpec::new("b", "explore", "d", "p"),
606            AgentStepSpec::new("c", "explore", "d", "p"),
607        ];
608        let out = crate::orchestration::execute_steps_parallel(exec, specs, None).await;
609        assert_eq!(
610            out.iter().map(|o| o.task_id.as_str()).collect::<Vec<_>>(),
611            vec!["a", "b", "c"]
612        );
613        assert!(out.iter().all(|o| o.success));
614    }
615
616    #[tokio::test]
617    async fn pipeline_first_stage_none_yields_none_outcome() {
618        let exec: Arc<dyn AgentExecutor> = Arc::new(EchoExecutor::new());
619        let stages: Vec<PipelineStage<&str>> =
620            vec![stage(|_p: Option<&StepOutcome>, item: &&str| {
621                if *item == "skip" {
622                    None
623                } else {
624                    Some(AgentStepSpec::new("s", "explore", "d", *item))
625                }
626            })];
627        let out = execute_pipeline(exec, vec!["skip", "run"], stages, None).await;
628        assert!(
629            out[0].is_none(),
630            "a first-stage None yields a None outcome (chain never started)"
631        );
632        assert!(out[1].as_ref().unwrap().success);
633    }
634
    /// Builds a successful `StepOutcome` fixture, shaped like the fake
    /// executors' outputs, for pre-seeding checkpoints.
635    fn cached(task_id: &str, agent: &str, output: &str) -> StepOutcome {
636        StepOutcome {
637            task_id: task_id.to_string(),
638            session_id: format!("task-run-{task_id}"),
639            agent: agent.to_string(),
640            output: output.to_string(),
641            success: true,
642            structured: None,
643        }
644    }
645
646    #[tokio::test]
647    async fn resumable_reruns_all_when_checkpoint_load_errors() {
648        use crate::store::MemorySessionStore;
649        let store: Arc<dyn SessionStore> = Arc::new(MemorySessionStore::new());
650
651        // A checkpoint written by a *newer*, incompatible schema version: the
652        // store rejects it on load. The resumable combinator must treat that as
653        // no prior progress and re-run everything (fail-safe), not panic or
654        // silently resume from state it can't read.
655        let mut done = std::collections::HashMap::new();
656        done.insert("a".to_string(), cached("a", "explore", "old"));
657        let mut cp = WorkflowCheckpoint::from_completed("wf-err", &done, 1);
658        cp.schema_version = crate::orchestration::WORKFLOW_CHECKPOINT_SCHEMA_VERSION + 1;
659        store.save_workflow_checkpoint("wf-err", &cp).await.unwrap();
660
661        let ran = Arc::new(tokio::sync::Mutex::new(Vec::new()));
662        let exec: Arc<dyn AgentExecutor> = Arc::new(RecordingExecutor {
663            ran: Arc::clone(&ran),
664        });
665        let specs = vec![
666            AgentStepSpec::new("a", "explore", "d", "pa"),
667            AgentStepSpec::new("b", "review", "d", "pb"),
668        ];
669        let out =
670            execute_steps_parallel_resumable(exec, specs, "wf-err", Arc::clone(&store), None).await;
671
        // Steps run concurrently, so sort before comparing the recorded order.
672        let mut ran_ids = ran.lock().await.clone();
673        ran_ids.sort();
674        assert_eq!(
675            ran_ids,
676            vec!["a".to_string(), "b".to_string()],
677            "an unreadable (future-version) checkpoint is ignored → all steps re-run"
678        );
679        assert_eq!(out.len(), 2);
680        assert!(out.iter().all(|o| o.success));
681    }
682
683    #[tokio::test]
684    async fn resumable_ignores_checkpointed_steps_absent_from_new_specs() {
685        use crate::store::MemorySessionStore;
686        let store: Arc<dyn SessionStore> = Arc::new(MemorySessionStore::new());
687
688        // Prior checkpoint completed {a, b}; the new run drops "a", reorders,
689        // and adds "c". Output follows the NEW specs; "b" is reused; the stale
690        // "a" simply doesn't appear; only "c" actually runs.
691        let mut done = std::collections::HashMap::new();
692        done.insert("a".to_string(), cached("a", "explore", "cached-a"));
693        done.insert("b".to_string(), cached("b", "review", "cached-b"));
694        store
695            .save_workflow_checkpoint(
696                "wf-x",
697                &WorkflowCheckpoint::from_completed("wf-x", &done, 1),
698            )
699            .await
700            .unwrap();
701
702        let ran = Arc::new(tokio::sync::Mutex::new(Vec::new()));
703        let exec: Arc<dyn AgentExecutor> = Arc::new(RecordingExecutor {
704            ran: Arc::clone(&ran),
705        });
706        let specs = vec![
707            AgentStepSpec::new("b", "review", "d", "pb"),
708            AgentStepSpec::new("c", "plan", "d", "pc"),
709        ];
710        let out =
711            execute_steps_parallel_resumable(exec, specs, "wf-x", Arc::clone(&store), None).await;
712
713        assert_eq!(
714            *ran.lock().await,
715            vec!["c".to_string()],
716            "cached b reused, stale a dropped, only new c runs"
717        );
718        assert_eq!(out.len(), 2);
719        assert_eq!(out[0].task_id, "b");
720        assert_eq!(out[0].output, "cached-b");
721        assert_eq!(out[1].task_id, "c");
722        assert!(out.iter().all(|o| o.success));
723    }
724
725    #[tokio::test]
726    async fn loop_stops_when_predicate_says_stop() {
727        let exec: Arc<dyn AgentExecutor> = Arc::new(EchoExecutor::new());
728        let mut rounds = 0;
729        let out = crate::orchestration::execute_loop(
730            exec,
731            vec![AgentStepSpec::new("r0", "explore", "d", "p")],
732            10,
733            None,
734            |outcomes| {
735                rounds += 1;
736                // Continue twice, then stop on the third round.
737                if rounds < 3 {
738                    LoopDecision::Continue(vec![AgentStepSpec::new(
739                        format!("r{rounds}"),
740                        "explore",
741                        "d",
742                        outcomes[0].output.clone(),
743                    )])
744                } else {
745                    LoopDecision::Stop
746                }
747            },
748        )
749        .await;
750        assert_eq!(rounds, 3, "predicate saw exactly three rounds");
751        assert_eq!(out.len(), 1, "returns the last round's outcomes");
752        assert!(out[0].success);
753    }
754
755    #[tokio::test]
756    async fn loop_is_hard_capped_by_max_iterations() {
757        let exec: Arc<dyn AgentExecutor> = Arc::new(EchoExecutor::new());
758        let mut rounds = 0;
759        // A predicate that NEVER stops — only max_iterations terminates it.
760        let _ = crate::orchestration::execute_loop(
761            exec,
762            vec![AgentStepSpec::new("r", "explore", "d", "p")],
763            3,
764            None,
765            |_outcomes| {
766                rounds += 1;
767                LoopDecision::Continue(vec![AgentStepSpec::new("r", "explore", "d", "p")])
768            },
769        )
770        .await;
        // The predicate is also consulted on the capped final round, hence 3.
771        assert_eq!(
772            rounds, 3,
773            "max_iterations is a hard cap on a never-stopping predicate"
774        );
775    }
776
777    #[tokio::test]
778    async fn loop_with_empty_initial_runs_nothing() {
779        let exec: Arc<dyn AgentExecutor> = Arc::new(EchoExecutor::new());
780        let mut called = false;
781        let out = crate::orchestration::execute_loop(exec, vec![], 5, None, |_| {
782            called = true;
783            LoopDecision::Stop
784        })
785        .await;
786        assert!(out.is_empty());
787        assert!(!called, "predicate is not invoked when there is no work");
788    }
789
790    #[tokio::test]
791    async fn loop_stops_when_predicate_requests_no_further_specs() {
792        // Continue with an empty spec set ends the loop (no work left = dry).
793        let exec: Arc<dyn AgentExecutor> = Arc::new(EchoExecutor::new());
794        let mut rounds = 0;
795        let out = crate::orchestration::execute_loop(
796            exec,
797            vec![AgentStepSpec::new("r0", "explore", "d", "p")],
798            10,
799            None,
800            |_| {
801                rounds += 1;
802                LoopDecision::Continue(vec![]) // nothing more to do → loop ends
803            },
804        )
805        .await;
806        assert_eq!(rounds, 1);
807        assert_eq!(out.len(), 1, "the completed round is still returned");
808    }
809}