Skip to main content

zig_core/
run.rs

1use std::collections::HashMap;
2use std::io::{BufRead, BufReader, Write};
3use std::path::{Path, PathBuf};
4use std::process::Command;
5use std::sync::{Arc, Mutex};
6use std::thread;
7use std::time::Duration;
8
9use std::time::Instant;
10
11use crate::config::ZigConfig;
12use crate::error::ZigError;
13use crate::memory::{MemoryCollector, render_memory_block};
14use crate::paths::expand_path;
15use crate::resources::{ResourceCollector, render_system_block};
16use crate::session::{OutputStream, SessionCoordinator, SessionStatus, SessionWriter};
17use crate::storage::{FilesystemBackend, StorageManager};
18use crate::workflow::model::{FailurePolicy, MemoryMode, Role, Step, StepCommand, Workflow};
19use crate::workflow::{parser, validate};
20
/// Hard ceiling on loop iterations driven by a step's `next` field.
///
/// Acts as a safety net so a workflow whose `next` chain never terminates
/// cannot spin forever.
const MAX_LOOP_ITERATIONS: usize = 100;
23
24/// Execute a workflow file (`.zwf` or `.zwfz`).
25///
26/// Parses the workflow, validates it, resolves the step DAG, and executes
27/// each step by delegating to `zag`. The optional `user_prompt` is injected
28/// as additional context into every step's prompt.
29///
30/// Resource advertisement (the `<resources>` block prepended to each step's
31/// system prompt) is enabled by default; pass `disable_resources = true` to
32/// opt out, e.g. via `zig run --no-resources`.
33///
34/// Memory injection (the `<memory>` block) is similarly enabled by default;
35/// pass `disable_memory = true` to opt out via `zig run --no-memory`.
36///
37/// Storage injection (the `<storage>` block) is similarly enabled by default;
38/// pass `disable_storage = true` to opt out via `zig run --no-storage`.
39pub fn run_workflow(
40    workflow_path: &str,
41    user_prompt: Option<&str>,
42    disable_resources: bool,
43    disable_memory: bool,
44    disable_storage: bool,
45) -> Result<(), ZigError> {
46    check_zag()?;
47
48    let path = resolve_workflow_path(workflow_path)?;
49    let (workflow, source) = parser::parse_workflow(&path)?;
50
51    if let Err(errors) = validate::validate(&workflow) {
52        let msgs: Vec<String> = errors.iter().map(|e| e.to_string()).collect();
53        return Err(ZigError::Validation(msgs.join("; ")));
54    }
55
56    execute(
57        &workflow,
58        &path,
59        user_prompt,
60        source.dir(),
61        disable_resources,
62        disable_memory,
63        disable_storage,
64    )
65}
66
67/// Verify that `zag` is installed and available on PATH.
68pub(crate) fn check_zag() -> Result<(), ZigError> {
69    let zag_available = Command::new("zag")
70        .arg("--version")
71        .output()
72        .is_ok_and(|o| o.status.success());
73
74    if !zag_available {
75        return Err(ZigError::Zag(
76            "zag is not installed or not in PATH. Install it from https://github.com/niclaslindstedt/zag".into(),
77        ));
78    }
79    Ok(())
80}
81
82/// Resolve a workflow argument to an actual file path.
83///
84/// Tries in order:
85/// 1. Literal path as given
86/// 2. With `.zwf` extension appended
87/// 3. With `.zwfz` extension appended
88/// 4. Under local project `.zig/workflows/` directory
89/// 5. Under local project `.zig/workflows/` with `.zwf` appended
90/// 6. Under local project `.zig/workflows/` with `.zwfz` appended
91/// 7. Under global `~/.zig/workflows/` directory
92/// 8. Under global `~/.zig/workflows/` with `.zwf` appended
93/// 9. Under global `~/.zig/workflows/` with `.zwfz` appended
94pub fn resolve_workflow_path(workflow: &str) -> Result<PathBuf, ZigError> {
95    let mut candidates = vec![
96        PathBuf::from(workflow),
97        PathBuf::from(format!("{workflow}.zwf")),
98        PathBuf::from(format!("{workflow}.zwfz")),
99    ];
100
101    if let Some(local_dir) = crate::paths::cwd_workflows_dir() {
102        candidates.push(local_dir.join(workflow));
103        candidates.push(local_dir.join(format!("{workflow}.zwf")));
104        candidates.push(local_dir.join(format!("{workflow}.zwfz")));
105    }
106
107    if let Some(global_dir) = crate::paths::global_workflows_dir() {
108        candidates.push(global_dir.join(workflow));
109        candidates.push(global_dir.join(format!("{workflow}.zwf")));
110        candidates.push(global_dir.join(format!("{workflow}.zwfz")));
111    }
112
113    for candidate in &candidates {
114        if candidate.exists() {
115            return Ok(candidate.clone());
116        }
117    }
118
119    Err(ZigError::Io(format!(
120        "workflow not found: '{workflow}' (tried: {})",
121        candidates
122            .iter()
123            .map(|p| p.display().to_string())
124            .collect::<Vec<_>>()
125            .join(", ")
126    )))
127}
128
129/// Compute a topological ordering of steps grouped into tiers.
130///
131/// Each tier contains steps whose dependencies are all in earlier tiers,
132/// meaning steps within a tier can (in principle) run in parallel.
133/// Uses Kahn's algorithm.
134fn topological_sort(steps: &[Step]) -> Result<Vec<Vec<&Step>>, ZigError> {
135    let step_index: HashMap<&str, usize> = steps
136        .iter()
137        .enumerate()
138        .map(|(i, s)| (s.name.as_str(), i))
139        .collect();
140
141    let mut in_degree = vec![0usize; steps.len()];
142    for (i, step) in steps.iter().enumerate() {
143        for dep in &step.depends_on {
144            if step_index.contains_key(dep.as_str()) {
145                in_degree[i] += 1;
146            }
147        }
148    }
149
150    let mut tiers = Vec::new();
151    let mut remaining = in_degree.clone();
152    let mut completed: Vec<bool> = vec![false; steps.len()];
153
154    loop {
155        let tier: Vec<usize> = (0..steps.len())
156            .filter(|&i| !completed[i] && remaining[i] == 0)
157            .collect();
158
159        if tier.is_empty() {
160            break;
161        }
162
163        for &i in &tier {
164            completed[i] = true;
165        }
166
167        // Decrement in-degrees for dependents of this tier
168        for &i in &tier {
169            for (j, step) in steps.iter().enumerate() {
170                if !completed[j] && step.depends_on.contains(&steps[i].name) {
171                    remaining[j] -= 1;
172                }
173            }
174        }
175
176        tiers.push(tier.iter().map(|&i| &steps[i]).collect());
177    }
178
179    let completed_count: usize = completed.iter().filter(|&&c| c).count();
180    if completed_count != steps.len() {
181        return Err(ZigError::Execution(
182            "could not resolve all steps — possible undetected cycle".into(),
183        ));
184    }
185
186    Ok(tiers)
187}
188
189/// Replace `${var_name}` references in a template with values from the variable map.
190///
191/// Supports dotted paths like `${result.score}` — the root variable name is
192/// looked up, and if its value is valid JSON, the nested path is traversed.
193/// Unknown variables are left as-is.
194fn substitute_vars(template: &str, vars: &HashMap<String, String>) -> String {
195    let mut result = String::with_capacity(template.len());
196    let mut rest = template;
197
198    while let Some(start) = rest.find("${") {
199        result.push_str(&rest[..start]);
200        let after_start = &rest[start + 2..];
201
202        if let Some(end) = after_start.find('}') {
203            let var_expr = &after_start[..end];
204            let mut parts = var_expr.splitn(2, '.');
205            let root = parts.next().unwrap_or(var_expr);
206
207            if let Some(value) = vars.get(root) {
208                if let Some(path) = parts.next() {
209                    // Try to navigate a JSON path
210                    if let Ok(json) = serde_json::from_str::<serde_json::Value>(value) {
211                        let resolved = json_path_lookup(&json, path);
212                        result.push_str(&resolved);
213                    } else {
214                        result.push_str(value);
215                    }
216                } else {
217                    result.push_str(value);
218                }
219            } else {
220                // Unknown variable — leave as-is
221                result.push_str(&rest[start..start + 2 + end + 1]);
222            }
223
224            rest = &after_start[end + 1..];
225        } else {
226            result.push_str(&rest[start..]);
227            rest = "";
228        }
229    }
230
231    result.push_str(rest);
232    result
233}
234
235/// Look up a dotted path in a JSON value (e.g., "nested.field").
236fn json_path_lookup(value: &serde_json::Value, path: &str) -> String {
237    let mut current = value;
238    for key in path.split('.') {
239        match current.get(key) {
240            Some(v) => current = v,
241            None => return format!("${{?.{path}}}"),
242        }
243    }
244    match current {
245        serde_json::Value::String(s) => s.clone(),
246        other => other.to_string(),
247    }
248}
249
250/// Resolve the effective system prompt for a step, with any advertised
251/// resources prepended as a `<resources>` block.
252///
253/// Resolution order:
254/// 1. If `step.system_prompt` is set, use it (with variable substitution).
255/// 2. If `step.role` is set, resolve the role name (may contain `${var}`),
256///    look it up in the roles table, and use the role's system prompt
257///    (loaded from file if `system_prompt_file` is set).
258/// 3. Otherwise the base prompt is empty.
259///
260/// Resources from all configured tiers (global shared, global per-workflow,
261/// project cwd, inline workflow, inline step) are then collected by the
262/// supplied [`ResourceCollector`] and rendered into a `<resources>` block
263/// that is prepended to the base prompt. If both the resources set and the
264/// base prompt are empty, returns `None` — keeping the current behavior when
265/// nothing is configured.
266#[allow(clippy::too_many_arguments)]
267fn resolve_role_system_prompt(
268    step: &Step,
269    roles: &HashMap<String, Role>,
270    resources: &ResourceCollector,
271    memory: &MemoryCollector,
272    storage: &StorageManager,
273    vars: &HashMap<String, String>,
274    workflow_dir: &Path,
275    workflow_name: &str,
276) -> Result<Option<String>, ZigError> {
277    // Resolve the base system prompt (may be empty if neither is set).
278    let base_prompt: Option<String> = if let Some(ref sp) = step.system_prompt {
279        Some(substitute_vars(sp, vars))
280    } else if let Some(ref role_ref) = step.role {
281        let resolved_name = substitute_vars(role_ref, vars);
282        let role = roles.get(&resolved_name).ok_or_else(|| {
283            ZigError::Execution(format!(
284                "step '{}' references role '{}' which does not exist",
285                step.name, resolved_name
286            ))
287        })?;
288
289        let raw_prompt = if let Some(ref file_path) = role.system_prompt_file {
290            let full_path = workflow_dir.join(expand_path(file_path));
291            Some(std::fs::read_to_string(&full_path).map_err(|e| {
292                ZigError::Execution(format!(
293                    "failed to read system_prompt_file '{}' for role '{}': {e}",
294                    full_path.display(),
295                    resolved_name
296                ))
297            })?)
298        } else {
299            role.system_prompt.clone()
300        };
301
302        raw_prompt.map(|p| substitute_vars(&p, vars))
303    } else {
304        None
305    };
306
307    // Collect and render resources.
308    let set = resources.collect_for_step(&step.resources)?;
309    let resource_block = render_system_block(&set);
310
311    // Collect and render memory.
312    let memory_entries = memory.collect_for_step(step.memory.as_deref())?;
313    let memory_block = render_memory_block(&memory_entries, workflow_name, Some(&step.name));
314
315    // Render storage block for this step's scope.
316    let storage_block = match storage.render_block(step.storage.as_deref())? {
317        Some(mut s) => {
318            s.push('\n');
319            s
320        }
321        None => String::new(),
322    };
323
324    let prefix = format!("{resource_block}{memory_block}{storage_block}");
325
326    match (prefix.is_empty(), base_prompt) {
327        (true, None) => Ok(None),
328        (true, Some(p)) => Ok(Some(p)),
329        (false, None) => Ok(Some(prefix.trim_end().to_string())),
330        (false, Some(p)) => Ok(Some(format!("{prefix}{p}"))),
331    }
332}
333
334/// Load file-backed default values for variables.
335///
336/// For each variable with `default_file` set (and no `default`), reads the
337/// file contents relative to `workflow_dir` and inserts them into the vars map.
338fn load_file_defaults(
339    vars: &mut HashMap<String, String>,
340    declarations: &HashMap<String, crate::workflow::model::Variable>,
341    workflow_dir: &Path,
342) -> Result<(), ZigError> {
343    for (name, decl) in declarations {
344        if decl.default.is_none() {
345            if let Some(ref file_path) = decl.default_file {
346                let full_path = workflow_dir.join(expand_path(file_path));
347                let content = std::fs::read_to_string(&full_path).map_err(|e| {
348                    ZigError::Execution(format!(
349                        "failed to read default_file '{}' for variable '{name}': {e}",
350                        full_path.display()
351                    ))
352                })?;
353                vars.insert(name.clone(), content);
354            }
355        }
356    }
357    Ok(())
358}
359
360/// Evaluate a simple condition expression against the current variable state.
361///
362/// Supports:
363/// - Numeric comparisons: `score < 8`, `retries <= max_retries`
364/// - String equality: `status == "done"`, `status != "pending"`
365/// - Truthy checks: `approved` (true if value is "true" or non-empty and non-zero)
366fn evaluate_condition(condition: &str, vars: &HashMap<String, String>) -> Result<bool, ZigError> {
367    let condition = condition.trim();
368
369    // Try comparison operators (ordered by length to match `<=` before `<`)
370    let operators = ["<=", ">=", "!=", "==", "<", ">"];
371    for op in &operators {
372        if let Some(pos) = condition.find(op) {
373            let lhs = resolve_operand(condition[..pos].trim(), vars);
374            let rhs = resolve_operand(condition[pos + op.len()..].trim(), vars);
375            return Ok(compare(&lhs, &rhs, op));
376        }
377    }
378
379    // Truthy check: single variable name
380    let value = vars.get(condition).map(|s| s.as_str()).unwrap_or("");
381    Ok(is_truthy(value))
382}
383
/// Resolve a condition operand to its string value.
///
/// - A token wrapped in matching double or single quotes (`"done"`,
///   `'done'`) yields its contents. The token needs at least an opening and
///   a closing quote, so a lone `"` or `'` is not a literal.
/// - Otherwise, a token that names a variable yields that variable's value.
/// - Anything else, such as a numeric literal or an unknown name, is
///   returned unchanged.
fn resolve_operand(token: &str, vars: &HashMap<String, String>) -> String {
    let quoted = ['"', '\'']
        .iter()
        .find_map(|&quote| token.strip_prefix(quote)?.strip_suffix(quote));
    if let Some(inner) = quoted {
        return inner.to_string();
    }

    vars.get(token)
        .cloned()
        .unwrap_or_else(|| token.to_string())
}
402
/// Compare two string operands with the given operator.
///
/// How the operands are compared:
/// - If both parse as *finite* numbers, they are compared numerically
///   (`"3" < "10"`, `"5" == "5.0"`). Equality is exact, which is appropriate
///   because both sides come straight from parsing decimal text.
/// - Otherwise they are compared lexicographically as strings.
///
/// Non-finite parses such as `"nan"` or `"inf"` deliberately use the string
/// path, so `"nan" == "nan"` holds. An unrecognized operator yields `false`.
fn compare(lhs: &str, rhs: &str, op: &str) -> bool {
    let as_number = |s: &str| s.parse::<f64>().ok().filter(|v| v.is_finite());

    let ordering = match (as_number(lhs), as_number(rhs)) {
        // Finite floats are always comparable, so this is never `None` in practice.
        (Some(l), Some(r)) => l.partial_cmp(&r),
        _ => Some(lhs.cmp(rhs)),
    };
    let Some(ordering) = ordering else {
        return false;
    };

    match op {
        "==" => ordering.is_eq(),
        "!=" => ordering.is_ne(),
        "<" => ordering.is_lt(),
        ">" => ordering.is_gt(),
        "<=" => ordering.is_le(),
        ">=" => ordering.is_ge(),
        _ => false,
    }
}
427
/// Check if a string value is truthy.
///
/// Exactly three values are falsy: the empty string, `"false"` and `"0"`.
/// Everything else is truthy. The check is case-sensitive, so `"False"` is
/// truthy.
fn is_truthy(value: &str) -> bool {
    !matches!(value, "" | "false" | "0")
}
432
433/// Build the final prompt for a step, incorporating variable substitution,
434/// dependency outputs, and the user's context prompt.
435fn render_step_prompt(
436    step: &Step,
437    vars: &HashMap<String, String>,
438    user_prompt: Option<&str>,
439    dependency_outputs: &HashMap<String, String>,
440) -> String {
441    let mut prompt = String::new();
442
443    // Prepend user context if provided
444    if let Some(ctx) = user_prompt {
445        prompt.push_str(&format!("User context: {ctx}\n\n"));
446    }
447
448    // Inject dependency outputs if requested
449    if step.inject_context {
450        for dep in &step.depends_on {
451            if let Some(output) = dependency_outputs.get(dep) {
452                prompt.push_str(&format!("--- Output from '{dep}' ---\n{output}\n\n"));
453            }
454        }
455    }
456
457    // Append the step's prompt with variable substitution
458    prompt.push_str(&substitute_vars(&step.prompt, vars));
459
460    prompt
461}
462
463/// Build the argument list for a zag invocation.
464///
465/// Separated from `execute_step` to allow unit testing of flag logic
466/// without a real `zag` binary. The optional `model_override` is used
467/// during retries when `retry_model` escalates to a different model.
468///
469/// The subcommand is determined by `step.command`:
470/// - `None` → `zag run <prompt>` (default)
471/// - `Review` → `zag review [prompt] [--uncommitted] [--base] [--commit] [--title]`
472/// - `Plan` → `zag plan <prompt> [-o path] [--instructions]`
473/// - `Pipe` → `zag pipe <session-ids...> -- <prompt>`
474/// - `Collect` → `zag collect <session-ids...>`
475/// - `Summary` → `zag summary <session-ids...>`
476#[allow(clippy::too_many_arguments)]
477fn build_zag_args(
478    step: &Step,
479    prompt: &str,
480    workflow_name: &str,
481    model_override: Option<&str>,
482    rendered_system_prompt: Option<&str>,
483    workflow_provider: Option<&str>,
484    workflow_model: Option<&str>,
485    extra_add_dirs: &[std::path::PathBuf],
486) -> Vec<String> {
487    let session_name = |dep: &str| format!("zig-{workflow_name}-{dep}");
488
489    // Build command-specific prefix and determine if agent args apply
490    let (mut args, accepts_agent_args) = match &step.command {
491        None => (vec!["run".to_string(), prompt.to_string()], true),
492        Some(StepCommand::Review) => {
493            let mut a = vec!["review".to_string()];
494            if !prompt.is_empty() {
495                a.push(prompt.to_string());
496            }
497            if step.uncommitted {
498                a.push("--uncommitted".into());
499            }
500            if let Some(base) = &step.base {
501                a.extend(["--base".into(), base.clone()]);
502            }
503            if let Some(commit) = &step.commit {
504                a.extend(["--commit".into(), commit.clone()]);
505            }
506            if let Some(title) = &step.title {
507                a.extend(["--title".into(), title.clone()]);
508            }
509            (a, true)
510        }
511        Some(StepCommand::Plan) => {
512            let mut a = vec!["plan".to_string(), prompt.to_string()];
513            if let Some(output) = &step.plan_output {
514                a.extend(["-o".into(), expand_path(output)]);
515            }
516            if let Some(instructions) = &step.instructions {
517                a.extend(["--instructions".into(), instructions.clone()]);
518            }
519            (a, true)
520        }
521        Some(StepCommand::Pipe) => {
522            let mut a = vec!["pipe".to_string()];
523            for dep in &step.depends_on {
524                a.push(session_name(dep));
525            }
526            a.push("--".into());
527            a.push(prompt.to_string());
528            (a, true)
529        }
530        Some(StepCommand::Collect) => {
531            let mut a = vec!["collect".to_string()];
532            for dep in &step.depends_on {
533                a.push(session_name(dep));
534            }
535            (a, false)
536        }
537        Some(StepCommand::Summary) => {
538            let mut a = vec!["summary".to_string()];
539            for dep in &step.depends_on {
540                a.push(session_name(dep));
541            }
542            (a, false)
543        }
544    };
545
546    // Agent args (provider, model, prompts, output, etc.) only apply to
547    // commands that launch an agent: run, review, plan, pipe.
548    if accepts_agent_args {
549        let effective_provider = step.provider.as_deref().or(workflow_provider);
550        if let Some(provider) = effective_provider {
551            args.extend(["--provider".into(), provider.to_string()]);
552        }
553
554        let effective_model = model_override.or(step.model.as_deref()).or(workflow_model);
555        if let Some(model) = effective_model {
556            args.extend(["--model".into(), model.to_string()]);
557        }
558
559        if let Some(sp) = rendered_system_prompt {
560            args.extend(["--system-prompt".into(), sp.to_string()]);
561        }
562        if let Some(max_turns) = step.max_turns {
563            args.extend(["--max-turns".into(), max_turns.to_string()]);
564        }
565
566        // Output format: explicit format overrides the json bool
567        if let Some(output) = &step.output {
568            args.extend(["-o".into(), output.clone()]);
569        } else if step.json {
570            args.push("--json".into());
571        }
572        if let Some(schema) = &step.json_schema {
573            args.extend(["--json-schema".into(), schema.clone()]);
574        }
575
576        if let Some(mcp_config) = &step.mcp_config {
577            args.extend(["--mcp-config".into(), expand_path(mcp_config)]);
578        }
579
580        // Execution environment
581        if step.auto_approve {
582            args.push("--auto-approve".into());
583        }
584        if let Some(root) = &step.root {
585            args.extend(["--root".into(), expand_path(root)]);
586        }
587        for dir in &step.add_dirs {
588            args.extend(["--add-dir".into(), expand_path(dir)]);
589        }
590        for dir in extra_add_dirs {
591            args.extend(["--add-dir".into(), dir.display().to_string()]);
592        }
593        for (key, value) in &step.env {
594            args.extend(["--env".into(), format!("{key}={value}")]);
595        }
596        for file in &step.files {
597            args.extend(["--file".into(), expand_path(file)]);
598        }
599
600        // Context injection
601        for ctx in &step.context {
602            args.extend(["--context".into(), ctx.clone()]);
603        }
604        if let Some(plan) = &step.plan {
605            args.extend(["--plan".into(), expand_path(plan)]);
606        }
607
608        // Isolation
609        if step.worktree {
610            args.push("--worktree".into());
611        }
612        if let Some(sandbox) = &step.sandbox {
613            args.extend(["--sandbox".into(), sandbox.clone()]);
614        }
615    }
616
617    // Session metadata applies to all commands
618    let name = session_name(&step.name);
619    args.extend(["--name".into(), name]);
620
621    if !step.description.is_empty() {
622        args.extend(["--description".into(), step.description.clone()]);
623    }
624
625    args.extend(["--tag".into(), "zig-workflow".into()]);
626    for tag in &step.tags {
627        args.extend(["--tag".into(), tag.clone()]);
628    }
629
630    if let Some(timeout) = &step.timeout {
631        args.extend(["--timeout".into(), timeout.clone()]);
632    }
633
634    args
635}
636
637/// Spawn `zag` and stream its stdout/stderr live to our stderr while
638/// also accumulating stdout into a buffer for `saves` extraction.
639///
640/// If `prefix` is `Some`, every emitted line is prefixed with `[prefix] `
641/// — used to disambiguate output from steps running in parallel.
642fn run_zag_streaming(
643    args: &[String],
644    step_name: &str,
645    prefix: Option<&str>,
646    session: Option<&Arc<SessionWriter>>,
647) -> Result<(std::process::ExitStatus, String), ZigError> {
648    let mut cmd = Command::new("zag");
649    cmd.args(args)
650        .stdout(std::process::Stdio::piped())
651        .stderr(std::process::Stdio::piped());
652
653    let mut child = cmd
654        .spawn()
655        .map_err(|e| ZigError::Zag(format!("failed to launch zag for step '{step_name}': {e}")))?;
656
657    let stdout = child.stdout.take().expect("stdout was piped");
658    let stderr = child.stderr.take().expect("stderr was piped");
659
660    let buffer = Arc::new(Mutex::new(String::new()));
661    let buffer_clone = Arc::clone(&buffer);
662    let prefix_stdout = prefix.map(String::from);
663    let prefix_stderr = prefix.map(String::from);
664    let session_stdout = session.cloned();
665    let session_stderr = session.cloned();
666    let step_name_stdout = step_name.to_string();
667    let step_name_stderr = step_name.to_string();
668
669    let stdout_thread = thread::spawn(move || {
670        let reader = BufReader::new(stdout);
671        let stderr_handle = std::io::stderr();
672        for line in reader.lines().map_while(Result::ok) {
673            if let Ok(mut buf) = buffer_clone.lock() {
674                buf.push_str(&line);
675                buf.push('\n');
676            }
677            if let Some(w) = &session_stdout {
678                let _ = w.step_output(&step_name_stdout, OutputStream::Stdout, &line);
679            }
680            let mut h = stderr_handle.lock();
681            let _ = match &prefix_stdout {
682                Some(p) => writeln!(h, "[{p}] {line}"),
683                None => writeln!(h, "{line}"),
684            };
685        }
686    });
687
688    let stderr_thread = thread::spawn(move || {
689        let reader = BufReader::new(stderr);
690        let stderr_handle = std::io::stderr();
691        for line in reader.lines().map_while(Result::ok) {
692            if let Some(w) = &session_stderr {
693                let _ = w.step_output(&step_name_stderr, OutputStream::Stderr, &line);
694            }
695            let mut h = stderr_handle.lock();
696            let _ = match &prefix_stderr {
697                Some(p) => writeln!(h, "[{p}] {line}"),
698                None => writeln!(h, "{line}"),
699            };
700        }
701    });
702
703    let status = child
704        .wait()
705        .map_err(|e| ZigError::Execution(format!("failed to wait for child: {e}")))?;
706
707    let _ = stdout_thread.join();
708    let _ = stderr_thread.join();
709
710    let captured = Arc::try_unwrap(buffer)
711        .map_err(|_| ZigError::Execution("buffer still shared after threads joined".into()))?
712        .into_inner()
713        .map_err(|_| ZigError::Execution("output buffer poisoned".into()))?;
714
715    Ok((status, captured))
716}
717
718/// Execute a single step by invoking `zag`, streaming its output live.
719///
720/// Returns the captured stdout from zag. The optional `model_override`
721/// is used during retries to escalate to a different model. The optional
722/// `prefix` tags streamed lines with the step name (used for parallel runs).
723#[allow(clippy::too_many_arguments)]
724fn execute_step(
725    step: &Step,
726    prompt: &str,
727    workflow_name: &str,
728    model_override: Option<&str>,
729    prefix: Option<&str>,
730    session: Option<&Arc<SessionWriter>>,
731    rendered_system_prompt: Option<&str>,
732    workflow_provider: Option<&str>,
733    workflow_model: Option<&str>,
734    extra_add_dirs: &[std::path::PathBuf],
735) -> Result<String, ZigError> {
736    let args = build_zag_args(
737        step,
738        prompt,
739        workflow_name,
740        model_override,
741        rendered_system_prompt,
742        workflow_provider,
743        workflow_model,
744        extra_add_dirs,
745    );
746    let (status, stdout) = run_zag_streaming(&args, &step.name, prefix, session)?;
747
748    if !status.success() {
749        return Err(ZigError::Execution(format!(
750            "step '{}' failed (exit {})",
751            step.name, status,
752        )));
753    }
754
755    Ok(stdout)
756}
757
758/// Run a step with retry logic, returning its captured stdout on success.
759///
760/// Extracted so both sequential and parallel execution paths share the
761/// same retry / model-escalation behavior.
762#[allow(clippy::too_many_arguments)]
763fn run_step_attempts(
764    step: &Step,
765    prompt: &str,
766    workflow_name: &str,
767    prefix: Option<&str>,
768    session: Option<&Arc<SessionWriter>>,
769    rendered_system_prompt: Option<&str>,
770    workflow_provider: Option<&str>,
771    workflow_model: Option<&str>,
772    extra_add_dirs: &[std::path::PathBuf],
773) -> Result<String, ZigError> {
774    let mut attempts = 0;
775    let max_attempts = if step.on_failure.as_ref() == Some(&FailurePolicy::Retry) {
776        step.max_retries.unwrap_or(1) + 1
777    } else {
778        1
779    };
780
781    loop {
782        attempts += 1;
783        let model_override = if attempts > 1 {
784            step.retry_model.as_deref()
785        } else {
786            None
787        };
788        match execute_step(
789            step,
790            prompt,
791            workflow_name,
792            model_override,
793            prefix,
794            session,
795            rendered_system_prompt,
796            workflow_provider,
797            workflow_model,
798            extra_add_dirs,
799        ) {
800            Ok(output) => return Ok(output),
801            Err(e) => {
802                if let Some(w) = session {
803                    let _ = w.step_failed(&step.name, None, attempts, &e.to_string());
804                }
805                if attempts < max_attempts {
806                    eprintln!(
807                        "    retry {}/{} for step '{}'",
808                        attempts,
809                        max_attempts - 1,
810                        step.name
811                    );
812                    continue;
813                }
814                return Err(e);
815            }
816        }
817    }
818}
819
820/// Extract variable values from step output using `saves` selectors.
821///
822/// Selectors:
823/// - `"$"` — the entire output
824/// - `"$.field"` — a top-level JSON field
825/// - `"$.nested.field"` — a nested JSON field
826fn extract_saves(
827    output: &str,
828    saves: &HashMap<String, String>,
829) -> Result<HashMap<String, String>, ZigError> {
830    let mut extracted = HashMap::new();
831
832    for (var_name, selector) in saves {
833        let value = if selector == "$" {
834            output.trim().to_string()
835        } else if let Some(path) = selector.strip_prefix("$.") {
836            let json: serde_json::Value = serde_json::from_str(output.trim()).map_err(|e| {
837                ZigError::Execution(format!(
838                    "saves selector '{selector}' requires JSON output, but got parse error: {e}"
839                ))
840            })?;
841            json_path_lookup(&json, path)
842        } else {
843            output.trim().to_string()
844        };
845
846        extracted.insert(var_name.clone(), value);
847    }
848
849    Ok(extracted)
850}
851
852/// Partition a tier of steps into sequential steps and race groups.
853///
854/// Steps without a `race_group` are returned as sequential. Steps sharing
855/// the same `race_group` value are grouped together for parallel execution.
856fn partition_tier<'a>(tier: &[&'a Step]) -> (Vec<&'a Step>, HashMap<String, Vec<&'a Step>>) {
857    let mut sequential = Vec::new();
858    let mut race_groups: HashMap<String, Vec<&'a Step>> = HashMap::new();
859
860    for step in tier {
861        if let Some(group) = &step.race_group {
862            race_groups.entry(group.clone()).or_default().push(step);
863        } else {
864            sequential.push(*step);
865        }
866    }
867
868    (sequential, race_groups)
869}
870
871/// Spawn a step as a child process without waiting for it to finish.
872fn spawn_step(
873    step: &Step,
874    prompt: &str,
875    workflow_name: &str,
876    rendered_system_prompt: Option<&str>,
877    workflow_provider: Option<&str>,
878    workflow_model: Option<&str>,
879    extra_add_dirs: &[std::path::PathBuf],
880) -> Result<std::process::Child, ZigError> {
881    let args = build_zag_args(
882        step,
883        prompt,
884        workflow_name,
885        None,
886        rendered_system_prompt,
887        workflow_provider,
888        workflow_model,
889        extra_add_dirs,
890    );
891    let mut cmd = Command::new("zag");
892    cmd.args(&args)
893        .stdout(std::process::Stdio::piped())
894        .stderr(std::process::Stdio::piped());
895
896    cmd.spawn()
897        .map_err(|e| ZigError::Zag(format!("failed to spawn zag for step '{}': {e}", step.name)))
898}
899
900/// Execute a race group: run all steps in parallel, return the first winner.
901///
902/// When one step finishes successfully, all remaining steps are killed.
903/// Returns the winning step's name and its stdout output.
904#[allow(clippy::too_many_arguments)]
905fn execute_race_group(
906    steps: &[&Step],
907    prompts: &HashMap<String, String>,
908    system_prompts: &HashMap<String, String>,
909    workflow_name: &str,
910    tier_index: usize,
911    session: Option<&Arc<SessionWriter>>,
912    workflow_provider: Option<&str>,
913    workflow_model: Option<&str>,
914    storage_dirs: &HashMap<String, Vec<std::path::PathBuf>>,
915) -> Result<(String, String), ZigError> {
916    if let Some(w) = session {
917        for step in steps {
918            let zag_session_id = format!("zig-{workflow_name}-{}", step.name);
919            let preview = prompts
920                .get(&step.name)
921                .map(|p| prompt_preview(p))
922                .unwrap_or_default();
923            let _ = w.step_started(
924                &step.name,
925                tier_index,
926                &zag_session_id,
927                zag_command_name(&step.command),
928                step.model.as_deref(),
929                &preview,
930            );
931        }
932    }
933    let race_started = Instant::now();
934    let mut children: Vec<(String, std::process::Child)> = Vec::new();
935
936    for step in steps {
937        let prompt = prompts.get(&step.name).ok_or_else(|| {
938            ZigError::Execution(format!("missing prompt for step '{}'", step.name))
939        })?;
940        eprintln!("  racing step '{}'...", step.name);
941        let rendered_sp = system_prompts.get(&step.name).map(|s| s.as_str());
942        let empty: Vec<std::path::PathBuf> = Vec::new();
943        let extra_dirs = storage_dirs.get(&step.name).unwrap_or(&empty);
944        let child = spawn_step(
945            step,
946            prompt,
947            workflow_name,
948            rendered_sp,
949            workflow_provider,
950            workflow_model,
951            extra_dirs,
952        )?;
953        children.push((step.name.clone(), child));
954    }
955
956    // Poll until one finishes
957    loop {
958        for i in 0..children.len() {
959            let status = children[i]
960                .1
961                .try_wait()
962                .map_err(|e| ZigError::Execution(format!("failed to poll child: {e}")))?;
963
964            if let Some(exit_status) = status {
965                let (winner_name, winner_child) = children.remove(i);
966
967                // Kill remaining children
968                for (name, mut child) in children {
969                    eprintln!("  cancelling step '{name}' (race lost)");
970                    let _ = child.kill();
971                    let _ = child.wait();
972                }
973
974                let elapsed = race_started.elapsed().as_millis() as u64;
975                if !exit_status.success() {
976                    let stderr = winner_child
977                        .stderr
978                        .map(|mut s| {
979                            let mut buf = String::new();
980                            std::io::Read::read_to_string(&mut s, &mut buf).ok();
981                            buf
982                        })
983                        .unwrap_or_default();
984                    let err_msg = format!(
985                        "race winner '{}' failed (exit {}): {}",
986                        winner_name,
987                        exit_status,
988                        stderr.trim()
989                    );
990                    if let Some(w) = session {
991                        let _ = w.step_failed(&winner_name, exit_status.code(), 1, &err_msg);
992                    }
993                    return Err(ZigError::Execution(err_msg));
994                }
995
996                let stdout = winner_child
997                    .stdout
998                    .map(|mut s| {
999                        let mut buf = String::new();
1000                        std::io::Read::read_to_string(&mut s, &mut buf).ok();
1001                        buf
1002                    })
1003                    .unwrap_or_default();
1004
1005                eprintln!("  race won by '{winner_name}'");
1006                if let Some(w) = session {
1007                    let _ = w.step_completed(&winner_name, 0, elapsed, Vec::new());
1008                }
1009                return Ok((winner_name, stdout));
1010            }
1011        }
1012
1013        std::thread::sleep(Duration::from_millis(100));
1014    }
1015}
1016
1017/// Execute a single sequential step with retry logic, saves, and next-jump handling.
1018#[allow(clippy::too_many_arguments)]
1019fn execute_sequential_step(
1020    step: &Step,
1021    vars: &mut HashMap<String, String>,
1022    user_prompt: Option<&str>,
1023    step_outputs: &mut HashMap<String, String>,
1024    workflow_name: &str,
1025    pending_next: &mut Option<String>,
1026    tier_index: usize,
1027    session: Option<&Arc<SessionWriter>>,
1028    roles: &HashMap<String, Role>,
1029    resources: &ResourceCollector,
1030    memory: &MemoryCollector,
1031    storage: &StorageManager,
1032    workflow_dir: &Path,
1033    workflow_provider: Option<&str>,
1034    workflow_model: Option<&str>,
1035) -> Result<(), ZigError> {
1036    if let Some(condition) = &step.condition {
1037        if !evaluate_condition(condition, vars)? {
1038            eprintln!(
1039                "  skipping '{}' (condition not met: {condition})",
1040                step.name
1041            );
1042            if let Some(w) = session {
1043                let _ = w.step_skipped(&step.name, &format!("condition not met: {condition}"));
1044            }
1045            return Ok(());
1046        }
1047    }
1048
1049    eprintln!("  running step '{}'...", step.name);
1050
1051    let prompt = render_step_prompt(step, vars, user_prompt, step_outputs);
1052    let rendered_sp = resolve_role_system_prompt(
1053        step,
1054        roles,
1055        resources,
1056        memory,
1057        storage,
1058        vars,
1059        workflow_dir,
1060        workflow_name,
1061    )?;
1062    let storage_dirs = storage.add_dirs_for_step(step.storage.as_deref());
1063    if let Some(w) = session {
1064        let zag_session_id = format!("zig-{workflow_name}-{}", step.name);
1065        let _ = w.step_started(
1066            &step.name,
1067            tier_index,
1068            &zag_session_id,
1069            zag_command_name(&step.command),
1070            step.model.as_deref(),
1071            &prompt_preview(&prompt),
1072        );
1073    }
1074    let started = Instant::now();
1075    let result = run_step_attempts(
1076        step,
1077        &prompt,
1078        workflow_name,
1079        None,
1080        session,
1081        rendered_sp.as_deref(),
1082        workflow_provider,
1083        workflow_model,
1084        &storage_dirs,
1085    );
1086
1087    match result {
1088        Ok(output) => {
1089            let mut saved_keys: Vec<String> = Vec::new();
1090            if !step.saves.is_empty() {
1091                let saved = extract_saves(&output, &step.saves)?;
1092                for (k, v) in &saved {
1093                    eprintln!("    saved {k} = {v}");
1094                    saved_keys.push(k.clone());
1095                }
1096                vars.extend(saved);
1097            }
1098
1099            step_outputs.insert(step.name.clone(), output);
1100            eprintln!("  completed '{}'", step.name);
1101            if let Some(w) = session {
1102                let _ = w.step_completed(
1103                    &step.name,
1104                    0,
1105                    started.elapsed().as_millis() as u64,
1106                    saved_keys,
1107                );
1108            }
1109
1110            if step.next.is_some() {
1111                *pending_next = step.next.clone();
1112            }
1113        }
1114        Err(e) => match step.on_failure.as_ref().unwrap_or(&FailurePolicy::Fail) {
1115            FailurePolicy::Fail => return Err(e),
1116            FailurePolicy::Continue => {
1117                eprintln!("  step '{}' failed (continuing): {e}", step.name);
1118            }
1119            FailurePolicy::Retry => {
1120                return Err(e);
1121            }
1122        },
1123    }
1124
1125    Ok(())
1126}
1127
1128/// Run multiple independent steps in a tier concurrently.
1129///
1130/// All non-skipped steps are spawned in their own threads and we wait for
1131/// every one to finish (unlike race groups, which kill losers). Output
1132/// lines from each step are streamed live to stderr, prefixed with the
1133/// step name to disambiguate. After completion, results are processed in
1134/// tier-declaration order so `saves`, `next`, and `on_failure` semantics
1135/// remain deterministic.
1136#[allow(clippy::too_many_arguments)]
1137fn execute_parallel_tier(
1138    steps: &[&Step],
1139    vars: &mut HashMap<String, String>,
1140    user_prompt: Option<&str>,
1141    step_outputs: &mut HashMap<String, String>,
1142    workflow_name: &str,
1143    pending_next: &mut Option<String>,
1144    tier_index: usize,
1145    session: Option<&Arc<SessionWriter>>,
1146    roles: &HashMap<String, Role>,
1147    resources: &ResourceCollector,
1148    memory: &MemoryCollector,
1149    storage: &StorageManager,
1150    workflow_dir: &Path,
1151    workflow_provider: Option<&str>,
1152    workflow_model: Option<&str>,
1153) -> Result<(), ZigError> {
1154    // Evaluate conditions and render prompts up front, so threads receive
1155    // the same variable snapshot they would have under sequential execution.
1156    let mut active: Vec<&Step> = Vec::new();
1157    let mut prompts: HashMap<String, String> = HashMap::new();
1158    let mut rendered_sps: HashMap<String, String> = HashMap::new();
1159    let mut storage_dirs_map: HashMap<String, Vec<std::path::PathBuf>> = HashMap::new();
1160    for step in steps {
1161        if let Some(condition) = &step.condition {
1162            if !evaluate_condition(condition, vars)? {
1163                eprintln!(
1164                    "  skipping '{}' (condition not met: {condition})",
1165                    step.name
1166                );
1167                if let Some(w) = session {
1168                    let _ = w.step_skipped(&step.name, &format!("condition not met: {condition}"));
1169                }
1170                continue;
1171            }
1172        }
1173        let prompt = render_step_prompt(step, vars, user_prompt, step_outputs);
1174        prompts.insert(step.name.clone(), prompt);
1175        if let Some(sp) = resolve_role_system_prompt(
1176            step,
1177            roles,
1178            resources,
1179            memory,
1180            storage,
1181            vars,
1182            workflow_dir,
1183            workflow_name,
1184        )? {
1185            rendered_sps.insert(step.name.clone(), sp);
1186        }
1187        storage_dirs_map.insert(
1188            step.name.clone(),
1189            storage.add_dirs_for_step(step.storage.as_deref()),
1190        );
1191        active.push(*step);
1192    }
1193
1194    if active.is_empty() {
1195        return Ok(());
1196    }
1197
1198    eprintln!("  running {} steps in parallel...", active.len());
1199
1200    let mut start_times: HashMap<String, Instant> = HashMap::new();
1201    let mut handles: Vec<thread::JoinHandle<(String, Result<String, ZigError>)>> = Vec::new();
1202    for step in &active {
1203        let step_clone: Step = (*step).clone();
1204        let prompt = prompts.remove(&step.name).unwrap_or_default();
1205        let rendered_sp = rendered_sps.remove(&step.name);
1206        let workflow_name_owned = workflow_name.to_string();
1207        let name = step.name.clone();
1208        eprintln!("  starting '{name}'...");
1209        if let Some(w) = session {
1210            let zag_session_id = format!("zig-{workflow_name}-{name}");
1211            let _ = w.step_started(
1212                &name,
1213                tier_index,
1214                &zag_session_id,
1215                zag_command_name(&step.command),
1216                step.model.as_deref(),
1217                &prompt_preview(&prompt),
1218            );
1219        }
1220        start_times.insert(name.clone(), Instant::now());
1221        let session_clone = session.cloned();
1222        let wf_provider = workflow_provider.map(String::from);
1223        let wf_model = workflow_model.map(String::from);
1224        let storage_dirs = storage_dirs_map.remove(&step.name).unwrap_or_default();
1225        let handle = thread::spawn(move || {
1226            let res = run_step_attempts(
1227                &step_clone,
1228                &prompt,
1229                &workflow_name_owned,
1230                Some(&name),
1231                session_clone.as_ref(),
1232                rendered_sp.as_deref(),
1233                wf_provider.as_deref(),
1234                wf_model.as_deref(),
1235                &storage_dirs,
1236            );
1237            (name, res)
1238        });
1239        handles.push(handle);
1240    }
1241
1242    let mut results: HashMap<String, Result<String, ZigError>> = HashMap::new();
1243    for handle in handles {
1244        match handle.join() {
1245            Ok((name, res)) => {
1246                results.insert(name, res);
1247            }
1248            Err(_) => {
1249                return Err(ZigError::Execution("parallel step thread panicked".into()));
1250            }
1251        }
1252    }
1253
1254    // Process results in tier-declaration order so `next` is deterministic.
1255    let mut errors: Vec<String> = Vec::new();
1256    for step in &active {
1257        let Some(res) = results.remove(&step.name) else {
1258            continue;
1259        };
1260        let elapsed = start_times
1261            .remove(&step.name)
1262            .map(|t| t.elapsed().as_millis() as u64)
1263            .unwrap_or(0);
1264        match res {
1265            Ok(output) => {
1266                let mut saved_keys: Vec<String> = Vec::new();
1267                if !step.saves.is_empty() {
1268                    let saved = extract_saves(&output, &step.saves)?;
1269                    for (k, v) in &saved {
1270                        eprintln!("    saved {k} = {v}");
1271                        saved_keys.push(k.clone());
1272                    }
1273                    vars.extend(saved);
1274                }
1275                step_outputs.insert(step.name.clone(), output);
1276                eprintln!("  completed '{}'", step.name);
1277                if let Some(w) = session {
1278                    let _ = w.step_completed(&step.name, 0, elapsed, saved_keys);
1279                }
1280                if step.next.is_some() && pending_next.is_none() {
1281                    *pending_next = step.next.clone();
1282                }
1283            }
1284            Err(e) => match step.on_failure.as_ref().unwrap_or(&FailurePolicy::Fail) {
1285                FailurePolicy::Continue => {
1286                    eprintln!("  step '{}' failed (continuing): {e}", step.name);
1287                }
1288                FailurePolicy::Fail | FailurePolicy::Retry => {
1289                    errors.push(format!("'{}': {e}", step.name));
1290                }
1291            },
1292        }
1293    }
1294
1295    if !errors.is_empty() {
1296        return Err(ZigError::Execution(format!(
1297            "parallel step(s) failed: {}",
1298            errors.join("; ")
1299        )));
1300    }
1301
1302    Ok(())
1303}
1304
1305/// Initialize the variable map from workflow variable definitions.
1306/// Variables with defaults are set to their default value; others are empty.
1307fn init_vars(workflow: &Workflow) -> HashMap<String, String> {
1308    let mut vars = HashMap::new();
1309    for (name, var) in &workflow.vars {
1310        let value = match &var.default {
1311            Some(toml::Value::String(s)) => s.clone(),
1312            Some(toml::Value::Integer(n)) => n.to_string(),
1313            Some(toml::Value::Float(f)) => f.to_string(),
1314            Some(toml::Value::Boolean(b)) => b.to_string(),
1315            Some(other) => other.to_string(),
1316            None => String::new(),
1317        };
1318        vars.insert(name.clone(), value);
1319    }
1320    vars
1321}
1322
1323/// Main execution loop for a validated workflow.
1324fn execute(
1325    workflow: &Workflow,
1326    workflow_path: &std::path::Path,
1327    user_prompt: Option<&str>,
1328    workflow_dir: &Path,
1329    disable_resources: bool,
1330    disable_memory: bool,
1331    disable_storage: bool,
1332) -> Result<(), ZigError> {
1333    let mut vars = init_vars(workflow);
1334
1335    let resource_collector = ResourceCollector::from_env(
1336        &workflow.workflow.name,
1337        &workflow.workflow.resources,
1338        workflow_dir,
1339        disable_resources,
1340    );
1341
1342    let config = ZigConfig::load();
1343    let workflow_memory_mode = MemoryMode::from_str_opt(workflow.workflow.memory.as_deref());
1344    let memory_collector = MemoryCollector::from_env(
1345        &workflow.workflow.name,
1346        workflow_memory_mode,
1347        &config,
1348        disable_memory,
1349    );
1350
1351    // Build storage manager for this run. Paths resolve against <cwd>/.zig/;
1352    // absolute paths pass through. `ensure` is called on every declared item
1353    // so step agents can trust the paths exist before they run.
1354    // When `--no-storage` is passed, skip building entirely so storage dirs
1355    // are not created and the `<storage>` block is omitted from prompts.
1356    let storage_manager = if disable_storage || workflow.storage.is_empty() {
1357        StorageManager::empty()
1358    } else {
1359        let backend = FilesystemBackend::from_cwd()?;
1360        StorageManager::build(&workflow.storage, backend)?
1361    };
1362
1363    // Load file-backed variable defaults before prompt binding.
1364    load_file_defaults(&mut vars, &workflow.vars, workflow_dir)?;
1365
1366    // Bind user prompt to the variable with `from = "prompt"`, if any.
1367    let prompt_var = workflow
1368        .vars
1369        .iter()
1370        .find(|(_, v)| v.from.as_deref() == Some("prompt"))
1371        .map(|(name, _)| name.clone());
1372
1373    if let Some(ref var_name) = prompt_var {
1374        if let Some(prompt) = user_prompt {
1375            vars.insert(var_name.clone(), prompt.to_string());
1376        }
1377    }
1378
1379    // Validate variable values against constraints before executing.
1380    if let Err(errors) = validate::validate_var_values(&vars, &workflow.vars) {
1381        let msgs: Vec<String> = errors.iter().map(|e| e.to_string()).collect();
1382        return Err(ZigError::Validation(msgs.join("; ")));
1383    }
1384
1385    // When prompt is bound to a variable, don't also prepend "User context:".
1386    let effective_user_prompt = if prompt_var.is_some() {
1387        None
1388    } else {
1389        user_prompt
1390    };
1391
1392    let mut step_outputs: HashMap<String, String> = HashMap::new();
1393
1394    let wf_provider = workflow.workflow.provider.as_deref();
1395    let wf_model = workflow.workflow.model.as_deref();
1396
1397    let tiers = topological_sort(&workflow.steps)?;
1398
1399    eprintln!(
1400        "running workflow '{}' ({} steps in {} tiers)",
1401        workflow.workflow.name,
1402        workflow.steps.len(),
1403        tiers.len()
1404    );
1405
1406    // Open a zig session log for this run. Failure to open the log is
1407    // non-fatal — `zig run` should still execute even if `~/.zig` is
1408    // unwritable. The session writer is `Option<Arc<...>>` everywhere
1409    // downstream so the writer-less path stays intact for tests.
1410    let coordinator = match SessionWriter::create(
1411        &workflow.workflow.name,
1412        &workflow_path.to_string_lossy(),
1413        user_prompt,
1414        tiers.len(),
1415    ) {
1416        Ok(writer) => {
1417            eprintln!("zig session: {}", writer.session_id());
1418            Some(SessionCoordinator::start(writer))
1419        }
1420        Err(e) => {
1421            eprintln!("warning: failed to open zig session log: {e}");
1422            None
1423        }
1424    };
1425    let session_writer: Option<Arc<SessionWriter>> = coordinator.as_ref().map(|c| c.writer());
1426    let session_ref = session_writer.as_ref();
1427
1428    let mut iteration = 0;
1429    let mut pending_next: Option<String> = None;
1430
1431    loop {
1432        let tiers_to_run = if let Some(ref next_step) = pending_next {
1433            // Re-run from the target step onward
1434            let remaining: Vec<Vec<&Step>> = tiers
1435                .iter()
1436                .map(|tier| {
1437                    tier.iter()
1438                        .filter(|s| s.name == *next_step)
1439                        .copied()
1440                        .collect::<Vec<_>>()
1441                })
1442                .filter(|tier| !tier.is_empty())
1443                .collect();
1444            pending_next = None;
1445            remaining
1446        } else if iteration == 0 {
1447            tiers.clone()
1448        } else {
1449            break;
1450        };
1451
1452        for (tier_index, tier) in tiers_to_run.iter().enumerate() {
1453            let (non_race, race_groups) = partition_tier(tier);
1454
1455            if let Some(w) = session_ref {
1456                let names: Vec<String> = tier.iter().map(|s| s.name.clone()).collect();
1457                let _ = w.tier_started(tier_index, names);
1458            }
1459
1460            // Independent steps in the same tier run concurrently. A single
1461            // step takes the sequential path so its output streams without
1462            // a name prefix; multiple steps go through the parallel path.
1463            if non_race.len() <= 1 {
1464                for step in &non_race {
1465                    execute_sequential_step(
1466                        step,
1467                        &mut vars,
1468                        effective_user_prompt,
1469                        &mut step_outputs,
1470                        &workflow.workflow.name,
1471                        &mut pending_next,
1472                        tier_index,
1473                        session_ref,
1474                        &workflow.roles,
1475                        &resource_collector,
1476                        &memory_collector,
1477                        &storage_manager,
1478                        workflow_dir,
1479                        wf_provider,
1480                        wf_model,
1481                    )?;
1482                }
1483            } else {
1484                execute_parallel_tier(
1485                    &non_race,
1486                    &mut vars,
1487                    effective_user_prompt,
1488                    &mut step_outputs,
1489                    &workflow.workflow.name,
1490                    &mut pending_next,
1491                    tier_index,
1492                    session_ref,
1493                    &workflow.roles,
1494                    &resource_collector,
1495                    &memory_collector,
1496                    &storage_manager,
1497                    workflow_dir,
1498                    wf_provider,
1499                    wf_model,
1500                )?;
1501            }
1502
1503            // Run race groups in parallel
1504            for (group_name, race_steps) in &race_groups {
1505                eprintln!("  starting race group '{group_name}'...");
1506
1507                // Build prompts for all racers (conditions evaluated here)
1508                let mut prompts = HashMap::new();
1509                let mut race_sps: HashMap<String, String> = HashMap::new();
1510                let mut race_storage_dirs: HashMap<String, Vec<std::path::PathBuf>> =
1511                    HashMap::new();
1512                let mut active_steps: Vec<&Step> = Vec::new();
1513                for step in race_steps {
1514                    if let Some(condition) = &step.condition {
1515                        if !evaluate_condition(condition, &vars)? {
1516                            eprintln!(
1517                                "  skipping '{}' (condition not met: {condition})",
1518                                step.name
1519                            );
1520                            continue;
1521                        }
1522                    }
1523                    let prompt =
1524                        render_step_prompt(step, &vars, effective_user_prompt, &step_outputs);
1525                    prompts.insert(step.name.clone(), prompt);
1526                    if let Some(sp) = resolve_role_system_prompt(
1527                        step,
1528                        &workflow.roles,
1529                        &resource_collector,
1530                        &memory_collector,
1531                        &storage_manager,
1532                        &vars,
1533                        workflow_dir,
1534                        &workflow.workflow.name,
1535                    )? {
1536                        race_sps.insert(step.name.clone(), sp);
1537                    }
1538                    race_storage_dirs.insert(
1539                        step.name.clone(),
1540                        storage_manager.add_dirs_for_step(step.storage.as_deref()),
1541                    );
1542                    active_steps.push(step);
1543                }
1544
1545                if active_steps.is_empty() {
1546                    continue;
1547                }
1548
1549                match execute_race_group(
1550                    &active_steps,
1551                    &prompts,
1552                    &race_sps,
1553                    &workflow.workflow.name,
1554                    tier_index,
1555                    session_ref,
1556                    wf_provider,
1557                    wf_model,
1558                    &race_storage_dirs,
1559                ) {
1560                    Ok((winner_name, output)) => {
1561                        // Find the winning step to process saves/next
1562                        if let Some(winner) = active_steps.iter().find(|s| s.name == winner_name) {
1563                            if !winner.saves.is_empty() {
1564                                let saved = extract_saves(&output, &winner.saves)?;
1565                                for (k, v) in &saved {
1566                                    eprintln!("    saved {k} = {v}");
1567                                }
1568                                vars.extend(saved);
1569                            }
1570                            if winner.next.is_some() {
1571                                pending_next = winner.next.clone();
1572                            }
1573                        }
1574                        step_outputs.insert(winner_name.clone(), output);
1575                        eprintln!(
1576                            "  completed race group '{group_name}' (winner: '{winner_name}')"
1577                        );
1578                    }
1579                    Err(e) => return Err(e),
1580                }
1581            }
1582        }
1583
1584        iteration += 1;
1585        if pending_next.is_none() || iteration >= MAX_LOOP_ITERATIONS {
1586            if iteration >= MAX_LOOP_ITERATIONS {
1587                eprintln!("warning: reached maximum loop iterations ({MAX_LOOP_ITERATIONS})");
1588            }
1589            break;
1590        }
1591    }
1592
1593    eprintln!("workflow '{}' completed", workflow.workflow.name);
1594    if let Some(c) = coordinator {
1595        let _ = c.finish(SessionStatus::Success);
1596    }
1597    Ok(())
1598}
1599
1600/// Short label for the zag subcommand a step will invoke. Used in
1601/// `step_started` events so listeners can distinguish run/review/plan/etc.
1602fn zag_command_name(cmd: &Option<StepCommand>) -> &'static str {
1603    match cmd {
1604        None => "run",
1605        Some(StepCommand::Review) => "review",
1606        Some(StepCommand::Plan) => "plan",
1607        Some(StepCommand::Pipe) => "pipe",
1608        Some(StepCommand::Collect) => "collect",
1609        Some(StepCommand::Summary) => "summary",
1610    }
1611}
1612
/// Truncated single-line preview of a rendered prompt for the session log.
///
/// Every `\n` and `\r` is replaced by a space, so CRLF prompts also collapse
/// onto one line. The result is cut to at most 200 characters (Unicode
/// scalar values, not bytes); `…` is appended when anything was dropped.
/// The input is walked once, and the walk stops right after the cut-off.
fn prompt_preview(prompt: &str) -> String {
    const MAX: usize = 200;
    let mut chars = prompt
        .chars()
        .map(|c| if c == '\n' || c == '\r' { ' ' } else { c });
    let head: String = chars.by_ref().take(MAX).collect();
    // Any character left after the first MAX means the preview was truncated.
    if chars.next().is_some() {
        format!("{head}…")
    } else {
        head
    }
}
1627
1628#[cfg(test)]
1629#[path = "run_tests.rs"]
1630mod tests;