Skip to main content

zig_core/
run.rs

1use std::collections::HashMap;
2use std::io::{BufRead, BufReader, Write};
3use std::path::{Path, PathBuf};
4use std::process::Command;
5use std::sync::{Arc, Mutex};
6use std::thread;
7use std::time::Duration;
8
9use std::time::Instant;
10
11use crate::config::ZigConfig;
12use crate::error::ZigError;
13use crate::memory::{MemoryCollector, render_memory_block};
14use crate::paths::expand_path;
15use crate::resources::{ResourceCollector, render_system_block};
16use crate::session::{OutputStream, SessionCoordinator, SessionStatus, SessionWriter};
17use crate::storage::{FilesystemBackend, StorageManager};
18use crate::workflow::model::{FailurePolicy, MemoryMode, Role, Step, StepCommand, Workflow};
19use crate::workflow::{parser, validate};
20
/// Upper bound on loop iterations driven by `next` fields, so a workflow
/// that keeps jumping back cannot loop forever.
const MAX_LOOP_ITERATIONS: usize = 100;
23
24/// Execute a workflow file (`.zwf` or `.zwfz`).
25///
26/// Parses the workflow, validates it, resolves the step DAG, and executes
27/// each step by delegating to `zag`. The optional `user_prompt` is injected
28/// as additional context into every step's prompt.
29///
30/// Resource advertisement (the `<resources>` block prepended to each step's
31/// system prompt) is enabled by default; pass `disable_resources = true` to
32/// opt out, e.g. via `zig run --no-resources`.
33///
34/// Memory injection (the `<memory>` block) is similarly enabled by default;
35/// pass `disable_memory = true` to opt out via `zig run --no-memory`.
36pub fn run_workflow(
37    workflow_path: &str,
38    user_prompt: Option<&str>,
39    disable_resources: bool,
40    disable_memory: bool,
41) -> Result<(), ZigError> {
42    check_zag()?;
43
44    let path = resolve_workflow_path(workflow_path)?;
45    let (workflow, source) = parser::parse_workflow(&path)?;
46
47    if let Err(errors) = validate::validate(&workflow) {
48        let msgs: Vec<String> = errors.iter().map(|e| e.to_string()).collect();
49        return Err(ZigError::Validation(msgs.join("; ")));
50    }
51
52    execute(
53        &workflow,
54        &path,
55        user_prompt,
56        source.dir(),
57        disable_resources,
58        disable_memory,
59    )
60}
61
62/// Verify that `zag` is installed and available on PATH.
63pub(crate) fn check_zag() -> Result<(), ZigError> {
64    let zag_available = Command::new("zag")
65        .arg("--version")
66        .output()
67        .is_ok_and(|o| o.status.success());
68
69    if !zag_available {
70        return Err(ZigError::Zag(
71            "zag is not installed or not in PATH. Install it from https://github.com/niclaslindstedt/zag".into(),
72        ));
73    }
74    Ok(())
75}
76
77/// Resolve a workflow argument to an actual file path.
78///
79/// Tries in order:
80/// 1. Literal path as given
81/// 2. With `.zwf` extension appended
82/// 3. With `.zwfz` extension appended
83/// 4. Under local project `.zig/workflows/` directory
84/// 5. Under local project `.zig/workflows/` with `.zwf` appended
85/// 6. Under local project `.zig/workflows/` with `.zwfz` appended
86/// 7. Under global `~/.zig/workflows/` directory
87/// 8. Under global `~/.zig/workflows/` with `.zwf` appended
88/// 9. Under global `~/.zig/workflows/` with `.zwfz` appended
89pub fn resolve_workflow_path(workflow: &str) -> Result<PathBuf, ZigError> {
90    let mut candidates = vec![
91        PathBuf::from(workflow),
92        PathBuf::from(format!("{workflow}.zwf")),
93        PathBuf::from(format!("{workflow}.zwfz")),
94    ];
95
96    if let Some(local_dir) = crate::paths::cwd_workflows_dir() {
97        candidates.push(local_dir.join(workflow));
98        candidates.push(local_dir.join(format!("{workflow}.zwf")));
99        candidates.push(local_dir.join(format!("{workflow}.zwfz")));
100    }
101
102    if let Some(global_dir) = crate::paths::global_workflows_dir() {
103        candidates.push(global_dir.join(workflow));
104        candidates.push(global_dir.join(format!("{workflow}.zwf")));
105        candidates.push(global_dir.join(format!("{workflow}.zwfz")));
106    }
107
108    for candidate in &candidates {
109        if candidate.exists() {
110            return Ok(candidate.clone());
111        }
112    }
113
114    Err(ZigError::Io(format!(
115        "workflow not found: '{workflow}' (tried: {})",
116        candidates
117            .iter()
118            .map(|p| p.display().to_string())
119            .collect::<Vec<_>>()
120            .join(", ")
121    )))
122}
123
124/// Compute a topological ordering of steps grouped into tiers.
125///
126/// Each tier contains steps whose dependencies are all in earlier tiers,
127/// meaning steps within a tier can (in principle) run in parallel.
128/// Uses Kahn's algorithm.
129fn topological_sort(steps: &[Step]) -> Result<Vec<Vec<&Step>>, ZigError> {
130    let step_index: HashMap<&str, usize> = steps
131        .iter()
132        .enumerate()
133        .map(|(i, s)| (s.name.as_str(), i))
134        .collect();
135
136    let mut in_degree = vec![0usize; steps.len()];
137    for (i, step) in steps.iter().enumerate() {
138        for dep in &step.depends_on {
139            if step_index.contains_key(dep.as_str()) {
140                in_degree[i] += 1;
141            }
142        }
143    }
144
145    let mut tiers = Vec::new();
146    let mut remaining = in_degree.clone();
147    let mut completed: Vec<bool> = vec![false; steps.len()];
148
149    loop {
150        let tier: Vec<usize> = (0..steps.len())
151            .filter(|&i| !completed[i] && remaining[i] == 0)
152            .collect();
153
154        if tier.is_empty() {
155            break;
156        }
157
158        for &i in &tier {
159            completed[i] = true;
160        }
161
162        // Decrement in-degrees for dependents of this tier
163        for &i in &tier {
164            for (j, step) in steps.iter().enumerate() {
165                if !completed[j] && step.depends_on.contains(&steps[i].name) {
166                    remaining[j] -= 1;
167                }
168            }
169        }
170
171        tiers.push(tier.iter().map(|&i| &steps[i]).collect());
172    }
173
174    let completed_count: usize = completed.iter().filter(|&&c| c).count();
175    if completed_count != steps.len() {
176        return Err(ZigError::Execution(
177            "could not resolve all steps — possible undetected cycle".into(),
178        ));
179    }
180
181    Ok(tiers)
182}
183
184/// Replace `${var_name}` references in a template with values from the variable map.
185///
186/// Supports dotted paths like `${result.score}` — the root variable name is
187/// looked up, and if its value is valid JSON, the nested path is traversed.
188/// Unknown variables are left as-is.
189fn substitute_vars(template: &str, vars: &HashMap<String, String>) -> String {
190    let mut result = String::with_capacity(template.len());
191    let mut rest = template;
192
193    while let Some(start) = rest.find("${") {
194        result.push_str(&rest[..start]);
195        let after_start = &rest[start + 2..];
196
197        if let Some(end) = after_start.find('}') {
198            let var_expr = &after_start[..end];
199            let mut parts = var_expr.splitn(2, '.');
200            let root = parts.next().unwrap_or(var_expr);
201
202            if let Some(value) = vars.get(root) {
203                if let Some(path) = parts.next() {
204                    // Try to navigate a JSON path
205                    if let Ok(json) = serde_json::from_str::<serde_json::Value>(value) {
206                        let resolved = json_path_lookup(&json, path);
207                        result.push_str(&resolved);
208                    } else {
209                        result.push_str(value);
210                    }
211                } else {
212                    result.push_str(value);
213                }
214            } else {
215                // Unknown variable — leave as-is
216                result.push_str(&rest[start..start + 2 + end + 1]);
217            }
218
219            rest = &after_start[end + 1..];
220        } else {
221            result.push_str(&rest[start..]);
222            rest = "";
223        }
224    }
225
226    result.push_str(rest);
227    result
228}
229
230/// Look up a dotted path in a JSON value (e.g., "nested.field").
231fn json_path_lookup(value: &serde_json::Value, path: &str) -> String {
232    let mut current = value;
233    for key in path.split('.') {
234        match current.get(key) {
235            Some(v) => current = v,
236            None => return format!("${{?.{path}}}"),
237        }
238    }
239    match current {
240        serde_json::Value::String(s) => s.clone(),
241        other => other.to_string(),
242    }
243}
244
245/// Resolve the effective system prompt for a step, with any advertised
246/// resources prepended as a `<resources>` block.
247///
248/// Resolution order:
249/// 1. If `step.system_prompt` is set, use it (with variable substitution).
250/// 2. If `step.role` is set, resolve the role name (may contain `${var}`),
251///    look it up in the roles table, and use the role's system prompt
252///    (loaded from file if `system_prompt_file` is set).
253/// 3. Otherwise the base prompt is empty.
254///
255/// Resources from all configured tiers (global shared, global per-workflow,
256/// project cwd, inline workflow, inline step) are then collected by the
257/// supplied [`ResourceCollector`] and rendered into a `<resources>` block
258/// that is prepended to the base prompt. If both the resources set and the
259/// base prompt are empty, returns `None` — keeping the current behavior when
260/// nothing is configured.
261#[allow(clippy::too_many_arguments)]
262fn resolve_role_system_prompt(
263    step: &Step,
264    roles: &HashMap<String, Role>,
265    resources: &ResourceCollector,
266    memory: &MemoryCollector,
267    storage: &StorageManager,
268    vars: &HashMap<String, String>,
269    workflow_dir: &Path,
270    workflow_name: &str,
271) -> Result<Option<String>, ZigError> {
272    // Resolve the base system prompt (may be empty if neither is set).
273    let base_prompt: Option<String> = if let Some(ref sp) = step.system_prompt {
274        Some(substitute_vars(sp, vars))
275    } else if let Some(ref role_ref) = step.role {
276        let resolved_name = substitute_vars(role_ref, vars);
277        let role = roles.get(&resolved_name).ok_or_else(|| {
278            ZigError::Execution(format!(
279                "step '{}' references role '{}' which does not exist",
280                step.name, resolved_name
281            ))
282        })?;
283
284        let raw_prompt = if let Some(ref file_path) = role.system_prompt_file {
285            let full_path = workflow_dir.join(expand_path(file_path));
286            Some(std::fs::read_to_string(&full_path).map_err(|e| {
287                ZigError::Execution(format!(
288                    "failed to read system_prompt_file '{}' for role '{}': {e}",
289                    full_path.display(),
290                    resolved_name
291                ))
292            })?)
293        } else {
294            role.system_prompt.clone()
295        };
296
297        raw_prompt.map(|p| substitute_vars(&p, vars))
298    } else {
299        None
300    };
301
302    // Collect and render resources.
303    let set = resources.collect_for_step(&step.resources)?;
304    let resource_block = render_system_block(&set);
305
306    // Collect and render memory.
307    let memory_entries = memory.collect_for_step(step.memory.as_deref())?;
308    let memory_block = render_memory_block(&memory_entries, workflow_name, Some(&step.name));
309
310    // Render storage block for this step's scope.
311    let storage_block = match storage.render_block(step.storage.as_deref())? {
312        Some(mut s) => {
313            s.push('\n');
314            s
315        }
316        None => String::new(),
317    };
318
319    let prefix = format!("{resource_block}{memory_block}{storage_block}");
320
321    match (prefix.is_empty(), base_prompt) {
322        (true, None) => Ok(None),
323        (true, Some(p)) => Ok(Some(p)),
324        (false, None) => Ok(Some(prefix.trim_end().to_string())),
325        (false, Some(p)) => Ok(Some(format!("{prefix}{p}"))),
326    }
327}
328
329/// Load file-backed default values for variables.
330///
331/// For each variable with `default_file` set (and no `default`), reads the
332/// file contents relative to `workflow_dir` and inserts them into the vars map.
333fn load_file_defaults(
334    vars: &mut HashMap<String, String>,
335    declarations: &HashMap<String, crate::workflow::model::Variable>,
336    workflow_dir: &Path,
337) -> Result<(), ZigError> {
338    for (name, decl) in declarations {
339        if decl.default.is_none() {
340            if let Some(ref file_path) = decl.default_file {
341                let full_path = workflow_dir.join(expand_path(file_path));
342                let content = std::fs::read_to_string(&full_path).map_err(|e| {
343                    ZigError::Execution(format!(
344                        "failed to read default_file '{}' for variable '{name}': {e}",
345                        full_path.display()
346                    ))
347                })?;
348                vars.insert(name.clone(), content);
349            }
350        }
351    }
352    Ok(())
353}
354
355/// Evaluate a simple condition expression against the current variable state.
356///
357/// Supports:
358/// - Numeric comparisons: `score < 8`, `retries <= max_retries`
359/// - String equality: `status == "done"`, `status != "pending"`
360/// - Truthy checks: `approved` (true if value is "true" or non-empty and non-zero)
361fn evaluate_condition(condition: &str, vars: &HashMap<String, String>) -> Result<bool, ZigError> {
362    let condition = condition.trim();
363
364    // Try comparison operators (ordered by length to match `<=` before `<`)
365    let operators = ["<=", ">=", "!=", "==", "<", ">"];
366    for op in &operators {
367        if let Some(pos) = condition.find(op) {
368            let lhs = resolve_operand(condition[..pos].trim(), vars);
369            let rhs = resolve_operand(condition[pos + op.len()..].trim(), vars);
370            return Ok(compare(&lhs, &rhs, op));
371        }
372    }
373
374    // Truthy check: single variable name
375    let value = vars.get(condition).map(|s| s.as_str()).unwrap_or("");
376    Ok(is_truthy(value))
377}
378
/// Resolve a condition operand to its string value.
/// - String literals (`"done"` or `'done'`) → `done`
/// - Variable names → looked up in vars
/// - Anything else (numeric literals, unknown names) → returned as-is
///
/// A lone `"` or `'` is not treated as a literal. It is looked up or
/// returned unchanged rather than sliced, which would panic.
fn resolve_operand(token: &str, vars: &HashMap<String, String>) -> String {
    // Strip exactly one matching pair of surrounding quotes. Removing the
    // suffix after the prefix guarantees the token has at least two chars.
    let literal = ['"', '\'']
        .iter()
        .find_map(|&quote| token.strip_prefix(quote)?.strip_suffix(quote));
    if let Some(inner) = literal {
        return inner.to_string();
    }

    vars.get(token)
        .cloned()
        .unwrap_or_else(|| token.to_string())
}
397
/// Compare two string operands with the given operator.
///
/// When both operands parse as finite numbers, they are compared
/// numerically (equality within `f64::EPSILON`). Otherwise they are
/// compared lexicographically. Non-finite spellings such as `inf`, `NaN`
/// or `infinity` are compared as strings. Numerically, `inf - inf` and any
/// NaN comparison would make both `==` and `!=` false. An unknown operator
/// yields `false`.
fn compare(lhs: &str, rhs: &str, op: &str) -> bool {
    let numeric = match (lhs.parse::<f64>(), rhs.parse::<f64>()) {
        (Ok(l), Ok(r)) if l.is_finite() && r.is_finite() => Some((l, r)),
        _ => None,
    };

    if let Some((l, r)) = numeric {
        return match op {
            "==" => (l - r).abs() < f64::EPSILON,
            "!=" => (l - r).abs() >= f64::EPSILON,
            "<" => l < r,
            ">" => l > r,
            "<=" => l <= r,
            ">=" => l >= r,
            _ => false,
        };
    }

    match op {
        "==" => lhs == rhs,
        "!=" => lhs != rhs,
        "<" => lhs < rhs,
        ">" => lhs > rhs,
        "<=" => lhs <= rhs,
        ">=" => lhs >= rhs,
        _ => false,
    }
}
422
/// Decide whether a variable value counts as "true" in a condition.
///
/// Every value is truthy except the empty string, `"false"` and `"0"`
/// (case-sensitive, exact matches).
fn is_truthy(value: &str) -> bool {
    !matches!(value, "" | "false" | "0")
}
427
428/// Build the final prompt for a step, incorporating variable substitution,
429/// dependency outputs, and the user's context prompt.
430fn render_step_prompt(
431    step: &Step,
432    vars: &HashMap<String, String>,
433    user_prompt: Option<&str>,
434    dependency_outputs: &HashMap<String, String>,
435) -> String {
436    let mut prompt = String::new();
437
438    // Prepend user context if provided
439    if let Some(ctx) = user_prompt {
440        prompt.push_str(&format!("User context: {ctx}\n\n"));
441    }
442
443    // Inject dependency outputs if requested
444    if step.inject_context {
445        for dep in &step.depends_on {
446            if let Some(output) = dependency_outputs.get(dep) {
447                prompt.push_str(&format!("--- Output from '{dep}' ---\n{output}\n\n"));
448            }
449        }
450    }
451
452    // Append the step's prompt with variable substitution
453    prompt.push_str(&substitute_vars(&step.prompt, vars));
454
455    prompt
456}
457
458/// Build the argument list for a zag invocation.
459///
460/// Separated from `execute_step` to allow unit testing of flag logic
461/// without a real `zag` binary. The optional `model_override` is used
462/// during retries when `retry_model` escalates to a different model.
463///
464/// The subcommand is determined by `step.command`:
465/// - `None` → `zag run <prompt>` (default)
466/// - `Review` → `zag review [prompt] [--uncommitted] [--base] [--commit] [--title]`
467/// - `Plan` → `zag plan <prompt> [-o path] [--instructions]`
468/// - `Pipe` → `zag pipe <session-ids...> -- <prompt>`
469/// - `Collect` → `zag collect <session-ids...>`
470/// - `Summary` → `zag summary <session-ids...>`
471#[allow(clippy::too_many_arguments)]
472fn build_zag_args(
473    step: &Step,
474    prompt: &str,
475    workflow_name: &str,
476    model_override: Option<&str>,
477    rendered_system_prompt: Option<&str>,
478    workflow_provider: Option<&str>,
479    workflow_model: Option<&str>,
480    extra_add_dirs: &[std::path::PathBuf],
481) -> Vec<String> {
482    let session_name = |dep: &str| format!("zig-{workflow_name}-{dep}");
483
484    // Build command-specific prefix and determine if agent args apply
485    let (mut args, accepts_agent_args) = match &step.command {
486        None => (vec!["run".to_string(), prompt.to_string()], true),
487        Some(StepCommand::Review) => {
488            let mut a = vec!["review".to_string()];
489            if !prompt.is_empty() {
490                a.push(prompt.to_string());
491            }
492            if step.uncommitted {
493                a.push("--uncommitted".into());
494            }
495            if let Some(base) = &step.base {
496                a.extend(["--base".into(), base.clone()]);
497            }
498            if let Some(commit) = &step.commit {
499                a.extend(["--commit".into(), commit.clone()]);
500            }
501            if let Some(title) = &step.title {
502                a.extend(["--title".into(), title.clone()]);
503            }
504            (a, true)
505        }
506        Some(StepCommand::Plan) => {
507            let mut a = vec!["plan".to_string(), prompt.to_string()];
508            if let Some(output) = &step.plan_output {
509                a.extend(["-o".into(), expand_path(output)]);
510            }
511            if let Some(instructions) = &step.instructions {
512                a.extend(["--instructions".into(), instructions.clone()]);
513            }
514            (a, true)
515        }
516        Some(StepCommand::Pipe) => {
517            let mut a = vec!["pipe".to_string()];
518            for dep in &step.depends_on {
519                a.push(session_name(dep));
520            }
521            a.push("--".into());
522            a.push(prompt.to_string());
523            (a, true)
524        }
525        Some(StepCommand::Collect) => {
526            let mut a = vec!["collect".to_string()];
527            for dep in &step.depends_on {
528                a.push(session_name(dep));
529            }
530            (a, false)
531        }
532        Some(StepCommand::Summary) => {
533            let mut a = vec!["summary".to_string()];
534            for dep in &step.depends_on {
535                a.push(session_name(dep));
536            }
537            (a, false)
538        }
539    };
540
541    // Agent args (provider, model, prompts, output, etc.) only apply to
542    // commands that launch an agent: run, review, plan, pipe.
543    if accepts_agent_args {
544        let effective_provider = step.provider.as_deref().or(workflow_provider);
545        if let Some(provider) = effective_provider {
546            args.extend(["--provider".into(), provider.to_string()]);
547        }
548
549        let effective_model = model_override.or(step.model.as_deref()).or(workflow_model);
550        if let Some(model) = effective_model {
551            args.extend(["--model".into(), model.to_string()]);
552        }
553
554        if let Some(sp) = rendered_system_prompt {
555            args.extend(["--system-prompt".into(), sp.to_string()]);
556        }
557        if let Some(max_turns) = step.max_turns {
558            args.extend(["--max-turns".into(), max_turns.to_string()]);
559        }
560
561        // Output format: explicit format overrides the json bool
562        if let Some(output) = &step.output {
563            args.extend(["-o".into(), output.clone()]);
564        } else if step.json {
565            args.push("--json".into());
566        }
567        if let Some(schema) = &step.json_schema {
568            args.extend(["--json-schema".into(), schema.clone()]);
569        }
570
571        if let Some(mcp_config) = &step.mcp_config {
572            args.extend(["--mcp-config".into(), expand_path(mcp_config)]);
573        }
574
575        // Execution environment
576        if step.auto_approve {
577            args.push("--auto-approve".into());
578        }
579        if let Some(root) = &step.root {
580            args.extend(["--root".into(), expand_path(root)]);
581        }
582        for dir in &step.add_dirs {
583            args.extend(["--add-dir".into(), expand_path(dir)]);
584        }
585        for dir in extra_add_dirs {
586            args.extend(["--add-dir".into(), dir.display().to_string()]);
587        }
588        for (key, value) in &step.env {
589            args.extend(["--env".into(), format!("{key}={value}")]);
590        }
591        for file in &step.files {
592            args.extend(["--file".into(), expand_path(file)]);
593        }
594
595        // Context injection
596        for ctx in &step.context {
597            args.extend(["--context".into(), ctx.clone()]);
598        }
599        if let Some(plan) = &step.plan {
600            args.extend(["--plan".into(), expand_path(plan)]);
601        }
602
603        // Isolation
604        if step.worktree {
605            args.push("--worktree".into());
606        }
607        if let Some(sandbox) = &step.sandbox {
608            args.extend(["--sandbox".into(), sandbox.clone()]);
609        }
610    }
611
612    // Session metadata applies to all commands
613    let name = session_name(&step.name);
614    args.extend(["--name".into(), name]);
615
616    if !step.description.is_empty() {
617        args.extend(["--description".into(), step.description.clone()]);
618    }
619
620    args.extend(["--tag".into(), "zig-workflow".into()]);
621    for tag in &step.tags {
622        args.extend(["--tag".into(), tag.clone()]);
623    }
624
625    if let Some(timeout) = &step.timeout {
626        args.extend(["--timeout".into(), timeout.clone()]);
627    }
628
629    args
630}
631
632/// Spawn `zag` and stream its stdout/stderr live to our stderr while
633/// also accumulating stdout into a buffer for `saves` extraction.
634///
635/// If `prefix` is `Some`, every emitted line is prefixed with `[prefix] `
636/// — used to disambiguate output from steps running in parallel.
637fn run_zag_streaming(
638    args: &[String],
639    step_name: &str,
640    prefix: Option<&str>,
641    session: Option<&Arc<SessionWriter>>,
642) -> Result<(std::process::ExitStatus, String), ZigError> {
643    let mut cmd = Command::new("zag");
644    cmd.args(args)
645        .stdout(std::process::Stdio::piped())
646        .stderr(std::process::Stdio::piped());
647
648    let mut child = cmd
649        .spawn()
650        .map_err(|e| ZigError::Zag(format!("failed to launch zag for step '{step_name}': {e}")))?;
651
652    let stdout = child.stdout.take().expect("stdout was piped");
653    let stderr = child.stderr.take().expect("stderr was piped");
654
655    let buffer = Arc::new(Mutex::new(String::new()));
656    let buffer_clone = Arc::clone(&buffer);
657    let prefix_stdout = prefix.map(String::from);
658    let prefix_stderr = prefix.map(String::from);
659    let session_stdout = session.cloned();
660    let session_stderr = session.cloned();
661    let step_name_stdout = step_name.to_string();
662    let step_name_stderr = step_name.to_string();
663
664    let stdout_thread = thread::spawn(move || {
665        let reader = BufReader::new(stdout);
666        let stderr_handle = std::io::stderr();
667        for line in reader.lines().map_while(Result::ok) {
668            if let Ok(mut buf) = buffer_clone.lock() {
669                buf.push_str(&line);
670                buf.push('\n');
671            }
672            if let Some(w) = &session_stdout {
673                let _ = w.step_output(&step_name_stdout, OutputStream::Stdout, &line);
674            }
675            let mut h = stderr_handle.lock();
676            let _ = match &prefix_stdout {
677                Some(p) => writeln!(h, "[{p}] {line}"),
678                None => writeln!(h, "{line}"),
679            };
680        }
681    });
682
683    let stderr_thread = thread::spawn(move || {
684        let reader = BufReader::new(stderr);
685        let stderr_handle = std::io::stderr();
686        for line in reader.lines().map_while(Result::ok) {
687            if let Some(w) = &session_stderr {
688                let _ = w.step_output(&step_name_stderr, OutputStream::Stderr, &line);
689            }
690            let mut h = stderr_handle.lock();
691            let _ = match &prefix_stderr {
692                Some(p) => writeln!(h, "[{p}] {line}"),
693                None => writeln!(h, "{line}"),
694            };
695        }
696    });
697
698    let status = child
699        .wait()
700        .map_err(|e| ZigError::Execution(format!("failed to wait for child: {e}")))?;
701
702    let _ = stdout_thread.join();
703    let _ = stderr_thread.join();
704
705    let captured = Arc::try_unwrap(buffer)
706        .map_err(|_| ZigError::Execution("buffer still shared after threads joined".into()))?
707        .into_inner()
708        .map_err(|_| ZigError::Execution("output buffer poisoned".into()))?;
709
710    Ok((status, captured))
711}
712
713/// Execute a single step by invoking `zag`, streaming its output live.
714///
715/// Returns the captured stdout from zag. The optional `model_override`
716/// is used during retries to escalate to a different model. The optional
717/// `prefix` tags streamed lines with the step name (used for parallel runs).
718#[allow(clippy::too_many_arguments)]
719fn execute_step(
720    step: &Step,
721    prompt: &str,
722    workflow_name: &str,
723    model_override: Option<&str>,
724    prefix: Option<&str>,
725    session: Option<&Arc<SessionWriter>>,
726    rendered_system_prompt: Option<&str>,
727    workflow_provider: Option<&str>,
728    workflow_model: Option<&str>,
729    extra_add_dirs: &[std::path::PathBuf],
730) -> Result<String, ZigError> {
731    let args = build_zag_args(
732        step,
733        prompt,
734        workflow_name,
735        model_override,
736        rendered_system_prompt,
737        workflow_provider,
738        workflow_model,
739        extra_add_dirs,
740    );
741    let (status, stdout) = run_zag_streaming(&args, &step.name, prefix, session)?;
742
743    if !status.success() {
744        return Err(ZigError::Execution(format!(
745            "step '{}' failed (exit {})",
746            step.name, status,
747        )));
748    }
749
750    Ok(stdout)
751}
752
753/// Run a step with retry logic, returning its captured stdout on success.
754///
755/// Extracted so both sequential and parallel execution paths share the
756/// same retry / model-escalation behavior.
757#[allow(clippy::too_many_arguments)]
758fn run_step_attempts(
759    step: &Step,
760    prompt: &str,
761    workflow_name: &str,
762    prefix: Option<&str>,
763    session: Option<&Arc<SessionWriter>>,
764    rendered_system_prompt: Option<&str>,
765    workflow_provider: Option<&str>,
766    workflow_model: Option<&str>,
767    extra_add_dirs: &[std::path::PathBuf],
768) -> Result<String, ZigError> {
769    let mut attempts = 0;
770    let max_attempts = if step.on_failure.as_ref() == Some(&FailurePolicy::Retry) {
771        step.max_retries.unwrap_or(1) + 1
772    } else {
773        1
774    };
775
776    loop {
777        attempts += 1;
778        let model_override = if attempts > 1 {
779            step.retry_model.as_deref()
780        } else {
781            None
782        };
783        match execute_step(
784            step,
785            prompt,
786            workflow_name,
787            model_override,
788            prefix,
789            session,
790            rendered_system_prompt,
791            workflow_provider,
792            workflow_model,
793            extra_add_dirs,
794        ) {
795            Ok(output) => return Ok(output),
796            Err(e) => {
797                if let Some(w) = session {
798                    let _ = w.step_failed(&step.name, None, attempts, &e.to_string());
799                }
800                if attempts < max_attempts {
801                    eprintln!(
802                        "    retry {}/{} for step '{}'",
803                        attempts,
804                        max_attempts - 1,
805                        step.name
806                    );
807                    continue;
808                }
809                return Err(e);
810            }
811        }
812    }
813}
814
815/// Extract variable values from step output using `saves` selectors.
816///
817/// Selectors:
818/// - `"$"` — the entire output
819/// - `"$.field"` — a top-level JSON field
820/// - `"$.nested.field"` — a nested JSON field
821fn extract_saves(
822    output: &str,
823    saves: &HashMap<String, String>,
824) -> Result<HashMap<String, String>, ZigError> {
825    let mut extracted = HashMap::new();
826
827    for (var_name, selector) in saves {
828        let value = if selector == "$" {
829            output.trim().to_string()
830        } else if let Some(path) = selector.strip_prefix("$.") {
831            let json: serde_json::Value = serde_json::from_str(output.trim()).map_err(|e| {
832                ZigError::Execution(format!(
833                    "saves selector '{selector}' requires JSON output, but got parse error: {e}"
834                ))
835            })?;
836            json_path_lookup(&json, path)
837        } else {
838            output.trim().to_string()
839        };
840
841        extracted.insert(var_name.clone(), value);
842    }
843
844    Ok(extracted)
845}
846
847/// Partition a tier of steps into sequential steps and race groups.
848///
849/// Steps without a `race_group` are returned as sequential. Steps sharing
850/// the same `race_group` value are grouped together for parallel execution.
851fn partition_tier<'a>(tier: &[&'a Step]) -> (Vec<&'a Step>, HashMap<String, Vec<&'a Step>>) {
852    let mut sequential = Vec::new();
853    let mut race_groups: HashMap<String, Vec<&'a Step>> = HashMap::new();
854
855    for step in tier {
856        if let Some(group) = &step.race_group {
857            race_groups.entry(group.clone()).or_default().push(step);
858        } else {
859            sequential.push(*step);
860        }
861    }
862
863    (sequential, race_groups)
864}
865
866/// Spawn a step as a child process without waiting for it to finish.
867fn spawn_step(
868    step: &Step,
869    prompt: &str,
870    workflow_name: &str,
871    rendered_system_prompt: Option<&str>,
872    workflow_provider: Option<&str>,
873    workflow_model: Option<&str>,
874    extra_add_dirs: &[std::path::PathBuf],
875) -> Result<std::process::Child, ZigError> {
876    let args = build_zag_args(
877        step,
878        prompt,
879        workflow_name,
880        None,
881        rendered_system_prompt,
882        workflow_provider,
883        workflow_model,
884        extra_add_dirs,
885    );
886    let mut cmd = Command::new("zag");
887    cmd.args(&args)
888        .stdout(std::process::Stdio::piped())
889        .stderr(std::process::Stdio::piped());
890
891    cmd.spawn()
892        .map_err(|e| ZigError::Zag(format!("failed to spawn zag for step '{}': {e}", step.name)))
893}
894
895/// Execute a race group: run all steps in parallel, return the first winner.
896///
897/// When one step finishes successfully, all remaining steps are killed.
898/// Returns the winning step's name and its stdout output.
899#[allow(clippy::too_many_arguments)]
900fn execute_race_group(
901    steps: &[&Step],
902    prompts: &HashMap<String, String>,
903    system_prompts: &HashMap<String, String>,
904    workflow_name: &str,
905    tier_index: usize,
906    session: Option<&Arc<SessionWriter>>,
907    workflow_provider: Option<&str>,
908    workflow_model: Option<&str>,
909    storage_dirs: &HashMap<String, Vec<std::path::PathBuf>>,
910) -> Result<(String, String), ZigError> {
911    if let Some(w) = session {
912        for step in steps {
913            let zag_session_id = format!("zig-{workflow_name}-{}", step.name);
914            let preview = prompts
915                .get(&step.name)
916                .map(|p| prompt_preview(p))
917                .unwrap_or_default();
918            let _ = w.step_started(
919                &step.name,
920                tier_index,
921                &zag_session_id,
922                zag_command_name(&step.command),
923                step.model.as_deref(),
924                &preview,
925            );
926        }
927    }
928    let race_started = Instant::now();
929    let mut children: Vec<(String, std::process::Child)> = Vec::new();
930
931    for step in steps {
932        let prompt = prompts.get(&step.name).ok_or_else(|| {
933            ZigError::Execution(format!("missing prompt for step '{}'", step.name))
934        })?;
935        eprintln!("  racing step '{}'...", step.name);
936        let rendered_sp = system_prompts.get(&step.name).map(|s| s.as_str());
937        let empty: Vec<std::path::PathBuf> = Vec::new();
938        let extra_dirs = storage_dirs.get(&step.name).unwrap_or(&empty);
939        let child = spawn_step(
940            step,
941            prompt,
942            workflow_name,
943            rendered_sp,
944            workflow_provider,
945            workflow_model,
946            extra_dirs,
947        )?;
948        children.push((step.name.clone(), child));
949    }
950
951    // Poll until one finishes
952    loop {
953        for i in 0..children.len() {
954            let status = children[i]
955                .1
956                .try_wait()
957                .map_err(|e| ZigError::Execution(format!("failed to poll child: {e}")))?;
958
959            if let Some(exit_status) = status {
960                let (winner_name, winner_child) = children.remove(i);
961
962                // Kill remaining children
963                for (name, mut child) in children {
964                    eprintln!("  cancelling step '{name}' (race lost)");
965                    let _ = child.kill();
966                    let _ = child.wait();
967                }
968
969                let elapsed = race_started.elapsed().as_millis() as u64;
970                if !exit_status.success() {
971                    let stderr = winner_child
972                        .stderr
973                        .map(|mut s| {
974                            let mut buf = String::new();
975                            std::io::Read::read_to_string(&mut s, &mut buf).ok();
976                            buf
977                        })
978                        .unwrap_or_default();
979                    let err_msg = format!(
980                        "race winner '{}' failed (exit {}): {}",
981                        winner_name,
982                        exit_status,
983                        stderr.trim()
984                    );
985                    if let Some(w) = session {
986                        let _ = w.step_failed(&winner_name, exit_status.code(), 1, &err_msg);
987                    }
988                    return Err(ZigError::Execution(err_msg));
989                }
990
991                let stdout = winner_child
992                    .stdout
993                    .map(|mut s| {
994                        let mut buf = String::new();
995                        std::io::Read::read_to_string(&mut s, &mut buf).ok();
996                        buf
997                    })
998                    .unwrap_or_default();
999
1000                eprintln!("  race won by '{winner_name}'");
1001                if let Some(w) = session {
1002                    let _ = w.step_completed(&winner_name, 0, elapsed, Vec::new());
1003                }
1004                return Ok((winner_name, stdout));
1005            }
1006        }
1007
1008        std::thread::sleep(Duration::from_millis(100));
1009    }
1010}
1011
1012/// Execute a single sequential step with retry logic, saves, and next-jump handling.
1013#[allow(clippy::too_many_arguments)]
1014fn execute_sequential_step(
1015    step: &Step,
1016    vars: &mut HashMap<String, String>,
1017    user_prompt: Option<&str>,
1018    step_outputs: &mut HashMap<String, String>,
1019    workflow_name: &str,
1020    pending_next: &mut Option<String>,
1021    tier_index: usize,
1022    session: Option<&Arc<SessionWriter>>,
1023    roles: &HashMap<String, Role>,
1024    resources: &ResourceCollector,
1025    memory: &MemoryCollector,
1026    storage: &StorageManager,
1027    workflow_dir: &Path,
1028    workflow_provider: Option<&str>,
1029    workflow_model: Option<&str>,
1030) -> Result<(), ZigError> {
1031    if let Some(condition) = &step.condition {
1032        if !evaluate_condition(condition, vars)? {
1033            eprintln!(
1034                "  skipping '{}' (condition not met: {condition})",
1035                step.name
1036            );
1037            if let Some(w) = session {
1038                let _ = w.step_skipped(&step.name, &format!("condition not met: {condition}"));
1039            }
1040            return Ok(());
1041        }
1042    }
1043
1044    eprintln!("  running step '{}'...", step.name);
1045
1046    let prompt = render_step_prompt(step, vars, user_prompt, step_outputs);
1047    let rendered_sp = resolve_role_system_prompt(
1048        step,
1049        roles,
1050        resources,
1051        memory,
1052        storage,
1053        vars,
1054        workflow_dir,
1055        workflow_name,
1056    )?;
1057    let storage_dirs = storage.add_dirs_for_step(step.storage.as_deref());
1058    if let Some(w) = session {
1059        let zag_session_id = format!("zig-{workflow_name}-{}", step.name);
1060        let _ = w.step_started(
1061            &step.name,
1062            tier_index,
1063            &zag_session_id,
1064            zag_command_name(&step.command),
1065            step.model.as_deref(),
1066            &prompt_preview(&prompt),
1067        );
1068    }
1069    let started = Instant::now();
1070    let result = run_step_attempts(
1071        step,
1072        &prompt,
1073        workflow_name,
1074        None,
1075        session,
1076        rendered_sp.as_deref(),
1077        workflow_provider,
1078        workflow_model,
1079        &storage_dirs,
1080    );
1081
1082    match result {
1083        Ok(output) => {
1084            let mut saved_keys: Vec<String> = Vec::new();
1085            if !step.saves.is_empty() {
1086                let saved = extract_saves(&output, &step.saves)?;
1087                for (k, v) in &saved {
1088                    eprintln!("    saved {k} = {v}");
1089                    saved_keys.push(k.clone());
1090                }
1091                vars.extend(saved);
1092            }
1093
1094            step_outputs.insert(step.name.clone(), output);
1095            eprintln!("  completed '{}'", step.name);
1096            if let Some(w) = session {
1097                let _ = w.step_completed(
1098                    &step.name,
1099                    0,
1100                    started.elapsed().as_millis() as u64,
1101                    saved_keys,
1102                );
1103            }
1104
1105            if step.next.is_some() {
1106                *pending_next = step.next.clone();
1107            }
1108        }
1109        Err(e) => match step.on_failure.as_ref().unwrap_or(&FailurePolicy::Fail) {
1110            FailurePolicy::Fail => return Err(e),
1111            FailurePolicy::Continue => {
1112                eprintln!("  step '{}' failed (continuing): {e}", step.name);
1113            }
1114            FailurePolicy::Retry => {
1115                return Err(e);
1116            }
1117        },
1118    }
1119
1120    Ok(())
1121}
1122
1123/// Run multiple independent steps in a tier concurrently.
1124///
1125/// All non-skipped steps are spawned in their own threads and we wait for
1126/// every one to finish (unlike race groups, which kill losers). Output
1127/// lines from each step are streamed live to stderr, prefixed with the
1128/// step name to disambiguate. After completion, results are processed in
1129/// tier-declaration order so `saves`, `next`, and `on_failure` semantics
1130/// remain deterministic.
1131#[allow(clippy::too_many_arguments)]
1132fn execute_parallel_tier(
1133    steps: &[&Step],
1134    vars: &mut HashMap<String, String>,
1135    user_prompt: Option<&str>,
1136    step_outputs: &mut HashMap<String, String>,
1137    workflow_name: &str,
1138    pending_next: &mut Option<String>,
1139    tier_index: usize,
1140    session: Option<&Arc<SessionWriter>>,
1141    roles: &HashMap<String, Role>,
1142    resources: &ResourceCollector,
1143    memory: &MemoryCollector,
1144    storage: &StorageManager,
1145    workflow_dir: &Path,
1146    workflow_provider: Option<&str>,
1147    workflow_model: Option<&str>,
1148) -> Result<(), ZigError> {
1149    // Evaluate conditions and render prompts up front, so threads receive
1150    // the same variable snapshot they would have under sequential execution.
1151    let mut active: Vec<&Step> = Vec::new();
1152    let mut prompts: HashMap<String, String> = HashMap::new();
1153    let mut rendered_sps: HashMap<String, String> = HashMap::new();
1154    let mut storage_dirs_map: HashMap<String, Vec<std::path::PathBuf>> = HashMap::new();
1155    for step in steps {
1156        if let Some(condition) = &step.condition {
1157            if !evaluate_condition(condition, vars)? {
1158                eprintln!(
1159                    "  skipping '{}' (condition not met: {condition})",
1160                    step.name
1161                );
1162                if let Some(w) = session {
1163                    let _ = w.step_skipped(&step.name, &format!("condition not met: {condition}"));
1164                }
1165                continue;
1166            }
1167        }
1168        let prompt = render_step_prompt(step, vars, user_prompt, step_outputs);
1169        prompts.insert(step.name.clone(), prompt);
1170        if let Some(sp) = resolve_role_system_prompt(
1171            step,
1172            roles,
1173            resources,
1174            memory,
1175            storage,
1176            vars,
1177            workflow_dir,
1178            workflow_name,
1179        )? {
1180            rendered_sps.insert(step.name.clone(), sp);
1181        }
1182        storage_dirs_map.insert(
1183            step.name.clone(),
1184            storage.add_dirs_for_step(step.storage.as_deref()),
1185        );
1186        active.push(*step);
1187    }
1188
1189    if active.is_empty() {
1190        return Ok(());
1191    }
1192
1193    eprintln!("  running {} steps in parallel...", active.len());
1194
1195    let mut start_times: HashMap<String, Instant> = HashMap::new();
1196    let mut handles: Vec<thread::JoinHandle<(String, Result<String, ZigError>)>> = Vec::new();
1197    for step in &active {
1198        let step_clone: Step = (*step).clone();
1199        let prompt = prompts.remove(&step.name).unwrap_or_default();
1200        let rendered_sp = rendered_sps.remove(&step.name);
1201        let workflow_name_owned = workflow_name.to_string();
1202        let name = step.name.clone();
1203        eprintln!("  starting '{name}'...");
1204        if let Some(w) = session {
1205            let zag_session_id = format!("zig-{workflow_name}-{name}");
1206            let _ = w.step_started(
1207                &name,
1208                tier_index,
1209                &zag_session_id,
1210                zag_command_name(&step.command),
1211                step.model.as_deref(),
1212                &prompt_preview(&prompt),
1213            );
1214        }
1215        start_times.insert(name.clone(), Instant::now());
1216        let session_clone = session.cloned();
1217        let wf_provider = workflow_provider.map(String::from);
1218        let wf_model = workflow_model.map(String::from);
1219        let storage_dirs = storage_dirs_map.remove(&step.name).unwrap_or_default();
1220        let handle = thread::spawn(move || {
1221            let res = run_step_attempts(
1222                &step_clone,
1223                &prompt,
1224                &workflow_name_owned,
1225                Some(&name),
1226                session_clone.as_ref(),
1227                rendered_sp.as_deref(),
1228                wf_provider.as_deref(),
1229                wf_model.as_deref(),
1230                &storage_dirs,
1231            );
1232            (name, res)
1233        });
1234        handles.push(handle);
1235    }
1236
1237    let mut results: HashMap<String, Result<String, ZigError>> = HashMap::new();
1238    for handle in handles {
1239        match handle.join() {
1240            Ok((name, res)) => {
1241                results.insert(name, res);
1242            }
1243            Err(_) => {
1244                return Err(ZigError::Execution("parallel step thread panicked".into()));
1245            }
1246        }
1247    }
1248
1249    // Process results in tier-declaration order so `next` is deterministic.
1250    let mut errors: Vec<String> = Vec::new();
1251    for step in &active {
1252        let Some(res) = results.remove(&step.name) else {
1253            continue;
1254        };
1255        let elapsed = start_times
1256            .remove(&step.name)
1257            .map(|t| t.elapsed().as_millis() as u64)
1258            .unwrap_or(0);
1259        match res {
1260            Ok(output) => {
1261                let mut saved_keys: Vec<String> = Vec::new();
1262                if !step.saves.is_empty() {
1263                    let saved = extract_saves(&output, &step.saves)?;
1264                    for (k, v) in &saved {
1265                        eprintln!("    saved {k} = {v}");
1266                        saved_keys.push(k.clone());
1267                    }
1268                    vars.extend(saved);
1269                }
1270                step_outputs.insert(step.name.clone(), output);
1271                eprintln!("  completed '{}'", step.name);
1272                if let Some(w) = session {
1273                    let _ = w.step_completed(&step.name, 0, elapsed, saved_keys);
1274                }
1275                if step.next.is_some() && pending_next.is_none() {
1276                    *pending_next = step.next.clone();
1277                }
1278            }
1279            Err(e) => match step.on_failure.as_ref().unwrap_or(&FailurePolicy::Fail) {
1280                FailurePolicy::Continue => {
1281                    eprintln!("  step '{}' failed (continuing): {e}", step.name);
1282                }
1283                FailurePolicy::Fail | FailurePolicy::Retry => {
1284                    errors.push(format!("'{}': {e}", step.name));
1285                }
1286            },
1287        }
1288    }
1289
1290    if !errors.is_empty() {
1291        return Err(ZigError::Execution(format!(
1292            "parallel step(s) failed: {}",
1293            errors.join("; ")
1294        )));
1295    }
1296
1297    Ok(())
1298}
1299
1300/// Initialize the variable map from workflow variable definitions.
1301/// Variables with defaults are set to their default value; others are empty.
1302fn init_vars(workflow: &Workflow) -> HashMap<String, String> {
1303    let mut vars = HashMap::new();
1304    for (name, var) in &workflow.vars {
1305        let value = match &var.default {
1306            Some(toml::Value::String(s)) => s.clone(),
1307            Some(toml::Value::Integer(n)) => n.to_string(),
1308            Some(toml::Value::Float(f)) => f.to_string(),
1309            Some(toml::Value::Boolean(b)) => b.to_string(),
1310            Some(other) => other.to_string(),
1311            None => String::new(),
1312        };
1313        vars.insert(name.clone(), value);
1314    }
1315    vars
1316}
1317
1318/// Main execution loop for a validated workflow.
1319fn execute(
1320    workflow: &Workflow,
1321    workflow_path: &std::path::Path,
1322    user_prompt: Option<&str>,
1323    workflow_dir: &Path,
1324    disable_resources: bool,
1325    disable_memory: bool,
1326) -> Result<(), ZigError> {
1327    let mut vars = init_vars(workflow);
1328
1329    let resource_collector = ResourceCollector::from_env(
1330        &workflow.workflow.name,
1331        &workflow.workflow.resources,
1332        workflow_dir,
1333        disable_resources,
1334    );
1335
1336    let config = ZigConfig::load();
1337    let workflow_memory_mode = MemoryMode::from_str_opt(workflow.workflow.memory.as_deref());
1338    let memory_collector = MemoryCollector::from_env(
1339        &workflow.workflow.name,
1340        workflow_memory_mode,
1341        &config,
1342        disable_memory,
1343    );
1344
1345    // Build storage manager for this run. Paths resolve against <cwd>/.zig/;
1346    // absolute paths pass through. `ensure` is called on every declared item
1347    // so step agents can trust the paths exist before they run.
1348    let storage_manager = if workflow.storage.is_empty() {
1349        StorageManager::empty()
1350    } else {
1351        let backend = FilesystemBackend::from_cwd()?;
1352        StorageManager::build(&workflow.storage, backend)?
1353    };
1354
1355    // Load file-backed variable defaults before prompt binding.
1356    load_file_defaults(&mut vars, &workflow.vars, workflow_dir)?;
1357
1358    // Bind user prompt to the variable with `from = "prompt"`, if any.
1359    let prompt_var = workflow
1360        .vars
1361        .iter()
1362        .find(|(_, v)| v.from.as_deref() == Some("prompt"))
1363        .map(|(name, _)| name.clone());
1364
1365    if let Some(ref var_name) = prompt_var {
1366        if let Some(prompt) = user_prompt {
1367            vars.insert(var_name.clone(), prompt.to_string());
1368        }
1369    }
1370
1371    // Validate variable values against constraints before executing.
1372    if let Err(errors) = validate::validate_var_values(&vars, &workflow.vars) {
1373        let msgs: Vec<String> = errors.iter().map(|e| e.to_string()).collect();
1374        return Err(ZigError::Validation(msgs.join("; ")));
1375    }
1376
1377    // When prompt is bound to a variable, don't also prepend "User context:".
1378    let effective_user_prompt = if prompt_var.is_some() {
1379        None
1380    } else {
1381        user_prompt
1382    };
1383
1384    let mut step_outputs: HashMap<String, String> = HashMap::new();
1385
1386    let wf_provider = workflow.workflow.provider.as_deref();
1387    let wf_model = workflow.workflow.model.as_deref();
1388
1389    let tiers = topological_sort(&workflow.steps)?;
1390
1391    eprintln!(
1392        "running workflow '{}' ({} steps in {} tiers)",
1393        workflow.workflow.name,
1394        workflow.steps.len(),
1395        tiers.len()
1396    );
1397
1398    // Open a zig session log for this run. Failure to open the log is
1399    // non-fatal — `zig run` should still execute even if `~/.zig` is
1400    // unwritable. The session writer is `Option<Arc<...>>` everywhere
1401    // downstream so the writer-less path stays intact for tests.
1402    let coordinator = match SessionWriter::create(
1403        &workflow.workflow.name,
1404        &workflow_path.to_string_lossy(),
1405        user_prompt,
1406        tiers.len(),
1407    ) {
1408        Ok(writer) => {
1409            eprintln!("zig session: {}", writer.session_id());
1410            Some(SessionCoordinator::start(writer))
1411        }
1412        Err(e) => {
1413            eprintln!("warning: failed to open zig session log: {e}");
1414            None
1415        }
1416    };
1417    let session_writer: Option<Arc<SessionWriter>> = coordinator.as_ref().map(|c| c.writer());
1418    let session_ref = session_writer.as_ref();
1419
1420    let mut iteration = 0;
1421    let mut pending_next: Option<String> = None;
1422
1423    loop {
1424        let tiers_to_run = if let Some(ref next_step) = pending_next {
1425            // Re-run from the target step onward
1426            let remaining: Vec<Vec<&Step>> = tiers
1427                .iter()
1428                .map(|tier| {
1429                    tier.iter()
1430                        .filter(|s| s.name == *next_step)
1431                        .copied()
1432                        .collect::<Vec<_>>()
1433                })
1434                .filter(|tier| !tier.is_empty())
1435                .collect();
1436            pending_next = None;
1437            remaining
1438        } else if iteration == 0 {
1439            tiers.clone()
1440        } else {
1441            break;
1442        };
1443
1444        for (tier_index, tier) in tiers_to_run.iter().enumerate() {
1445            let (non_race, race_groups) = partition_tier(tier);
1446
1447            if let Some(w) = session_ref {
1448                let names: Vec<String> = tier.iter().map(|s| s.name.clone()).collect();
1449                let _ = w.tier_started(tier_index, names);
1450            }
1451
1452            // Independent steps in the same tier run concurrently. A single
1453            // step takes the sequential path so its output streams without
1454            // a name prefix; multiple steps go through the parallel path.
1455            if non_race.len() <= 1 {
1456                for step in &non_race {
1457                    execute_sequential_step(
1458                        step,
1459                        &mut vars,
1460                        effective_user_prompt,
1461                        &mut step_outputs,
1462                        &workflow.workflow.name,
1463                        &mut pending_next,
1464                        tier_index,
1465                        session_ref,
1466                        &workflow.roles,
1467                        &resource_collector,
1468                        &memory_collector,
1469                        &storage_manager,
1470                        workflow_dir,
1471                        wf_provider,
1472                        wf_model,
1473                    )?;
1474                }
1475            } else {
1476                execute_parallel_tier(
1477                    &non_race,
1478                    &mut vars,
1479                    effective_user_prompt,
1480                    &mut step_outputs,
1481                    &workflow.workflow.name,
1482                    &mut pending_next,
1483                    tier_index,
1484                    session_ref,
1485                    &workflow.roles,
1486                    &resource_collector,
1487                    &memory_collector,
1488                    &storage_manager,
1489                    workflow_dir,
1490                    wf_provider,
1491                    wf_model,
1492                )?;
1493            }
1494
1495            // Run race groups in parallel
1496            for (group_name, race_steps) in &race_groups {
1497                eprintln!("  starting race group '{group_name}'...");
1498
1499                // Build prompts for all racers (conditions evaluated here)
1500                let mut prompts = HashMap::new();
1501                let mut race_sps: HashMap<String, String> = HashMap::new();
1502                let mut race_storage_dirs: HashMap<String, Vec<std::path::PathBuf>> =
1503                    HashMap::new();
1504                let mut active_steps: Vec<&Step> = Vec::new();
1505                for step in race_steps {
1506                    if let Some(condition) = &step.condition {
1507                        if !evaluate_condition(condition, &vars)? {
1508                            eprintln!(
1509                                "  skipping '{}' (condition not met: {condition})",
1510                                step.name
1511                            );
1512                            continue;
1513                        }
1514                    }
1515                    let prompt =
1516                        render_step_prompt(step, &vars, effective_user_prompt, &step_outputs);
1517                    prompts.insert(step.name.clone(), prompt);
1518                    if let Some(sp) = resolve_role_system_prompt(
1519                        step,
1520                        &workflow.roles,
1521                        &resource_collector,
1522                        &memory_collector,
1523                        &storage_manager,
1524                        &vars,
1525                        workflow_dir,
1526                        &workflow.workflow.name,
1527                    )? {
1528                        race_sps.insert(step.name.clone(), sp);
1529                    }
1530                    race_storage_dirs.insert(
1531                        step.name.clone(),
1532                        storage_manager.add_dirs_for_step(step.storage.as_deref()),
1533                    );
1534                    active_steps.push(step);
1535                }
1536
1537                if active_steps.is_empty() {
1538                    continue;
1539                }
1540
1541                match execute_race_group(
1542                    &active_steps,
1543                    &prompts,
1544                    &race_sps,
1545                    &workflow.workflow.name,
1546                    tier_index,
1547                    session_ref,
1548                    wf_provider,
1549                    wf_model,
1550                    &race_storage_dirs,
1551                ) {
1552                    Ok((winner_name, output)) => {
1553                        // Find the winning step to process saves/next
1554                        if let Some(winner) = active_steps.iter().find(|s| s.name == winner_name) {
1555                            if !winner.saves.is_empty() {
1556                                let saved = extract_saves(&output, &winner.saves)?;
1557                                for (k, v) in &saved {
1558                                    eprintln!("    saved {k} = {v}");
1559                                }
1560                                vars.extend(saved);
1561                            }
1562                            if winner.next.is_some() {
1563                                pending_next = winner.next.clone();
1564                            }
1565                        }
1566                        step_outputs.insert(winner_name.clone(), output);
1567                        eprintln!(
1568                            "  completed race group '{group_name}' (winner: '{winner_name}')"
1569                        );
1570                    }
1571                    Err(e) => return Err(e),
1572                }
1573            }
1574        }
1575
1576        iteration += 1;
1577        if pending_next.is_none() || iteration >= MAX_LOOP_ITERATIONS {
1578            if iteration >= MAX_LOOP_ITERATIONS {
1579                eprintln!("warning: reached maximum loop iterations ({MAX_LOOP_ITERATIONS})");
1580            }
1581            break;
1582        }
1583    }
1584
1585    eprintln!("workflow '{}' completed", workflow.workflow.name);
1586    if let Some(c) = coordinator {
1587        let _ = c.finish(SessionStatus::Success);
1588    }
1589    Ok(())
1590}
1591
1592/// Short label for the zag subcommand a step will invoke. Used in
1593/// `step_started` events so listeners can distinguish run/review/plan/etc.
1594fn zag_command_name(cmd: &Option<StepCommand>) -> &'static str {
1595    match cmd {
1596        None => "run",
1597        Some(StepCommand::Review) => "review",
1598        Some(StepCommand::Plan) => "plan",
1599        Some(StepCommand::Pipe) => "pipe",
1600        Some(StepCommand::Collect) => "collect",
1601        Some(StepCommand::Summary) => "summary",
1602    }
1603}
1604
/// Truncated single-line preview of a rendered prompt for the session log.
///
/// Line breaks (`\n` and `\r`) become spaces, so CRLF prompts are also
/// flattened. At most 200 characters (Unicode scalar values, not bytes) are
/// kept; when anything is cut off, a trailing `…` is appended. Only the first
/// 201 characters are ever examined, so large prompts are not copied in full.
fn prompt_preview(prompt: &str) -> String {
    const MAX: usize = 200;
    let mut flattened = prompt
        .chars()
        .map(|c| if c == '\n' || c == '\r' { ' ' } else { c });
    let mut preview: String = flattened.by_ref().take(MAX).collect();
    // Anything left after MAX characters means the preview was truncated.
    if flattened.next().is_some() {
        preview.push('…');
    }
    preview
}
1619
1620#[cfg(test)]
1621#[path = "run_tests.rs"]
1622mod tests;