Skip to main content

zig_core/
run.rs

1use std::collections::HashMap;
2use std::io::{BufRead, BufReader, Write};
3use std::path::{Path, PathBuf};
4use std::process::Command;
5use std::sync::{Arc, Mutex};
6use std::thread;
7use std::time::Duration;
8
9use std::time::Instant;
10
11use crate::config::ZigConfig;
12use crate::error::ZigError;
13use crate::memory::{MemoryCollector, render_memory_block};
14use crate::paths::expand_path;
15use crate::resources::{ResourceCollector, render_system_block};
16use crate::session::{OutputStream, SessionCoordinator, SessionStatus, SessionWriter};
17use crate::workflow::model::{FailurePolicy, MemoryMode, Role, Step, StepCommand, Workflow};
18use crate::workflow::{parser, validate};
19
/// Maximum number of loop iterations to prevent infinite loops from `next` fields.
///
/// NOTE(review): the code that enforces this cap is not in the visible part of
/// the file — presumably the executor counts `next` jumps against it; confirm
/// at the use site before changing the value.
const MAX_LOOP_ITERATIONS: usize = 100;
22
23/// Execute a workflow file (`.zwf` or `.zwfz`).
24///
25/// Parses the workflow, validates it, resolves the step DAG, and executes
26/// each step by delegating to `zag`. The optional `user_prompt` is injected
27/// as additional context into every step's prompt.
28///
29/// Resource advertisement (the `<resources>` block prepended to each step's
30/// system prompt) is enabled by default; pass `disable_resources = true` to
31/// opt out, e.g. via `zig run --no-resources`.
32///
33/// Memory injection (the `<memory>` block) is similarly enabled by default;
34/// pass `disable_memory = true` to opt out via `zig run --no-memory`.
35pub fn run_workflow(
36    workflow_path: &str,
37    user_prompt: Option<&str>,
38    disable_resources: bool,
39    disable_memory: bool,
40) -> Result<(), ZigError> {
41    check_zag()?;
42
43    let path = resolve_workflow_path(workflow_path)?;
44    let (workflow, source) = parser::parse_workflow(&path)?;
45
46    if let Err(errors) = validate::validate(&workflow) {
47        let msgs: Vec<String> = errors.iter().map(|e| e.to_string()).collect();
48        return Err(ZigError::Validation(msgs.join("; ")));
49    }
50
51    execute(
52        &workflow,
53        &path,
54        user_prompt,
55        source.dir(),
56        disable_resources,
57        disable_memory,
58    )
59}
60
61/// Verify that `zag` is installed and available on PATH.
62pub(crate) fn check_zag() -> Result<(), ZigError> {
63    let zag_available = Command::new("zag")
64        .arg("--version")
65        .output()
66        .is_ok_and(|o| o.status.success());
67
68    if !zag_available {
69        return Err(ZigError::Zag(
70            "zag is not installed or not in PATH. Install it from https://github.com/niclaslindstedt/zag".into(),
71        ));
72    }
73    Ok(())
74}
75
76/// Resolve a workflow argument to an actual file path.
77///
78/// Tries in order:
79/// 1. Literal path as given
80/// 2. With `.zwf` extension appended
81/// 3. With `.zwfz` extension appended
82/// 4. Under local project `.zig/workflows/` directory
83/// 5. Under local project `.zig/workflows/` with `.zwf` appended
84/// 6. Under local project `.zig/workflows/` with `.zwfz` appended
85/// 7. Under global `~/.zig/workflows/` directory
86/// 8. Under global `~/.zig/workflows/` with `.zwf` appended
87/// 9. Under global `~/.zig/workflows/` with `.zwfz` appended
88pub fn resolve_workflow_path(workflow: &str) -> Result<PathBuf, ZigError> {
89    let mut candidates = vec![
90        PathBuf::from(workflow),
91        PathBuf::from(format!("{workflow}.zwf")),
92        PathBuf::from(format!("{workflow}.zwfz")),
93    ];
94
95    if let Some(local_dir) = crate::paths::cwd_workflows_dir() {
96        candidates.push(local_dir.join(workflow));
97        candidates.push(local_dir.join(format!("{workflow}.zwf")));
98        candidates.push(local_dir.join(format!("{workflow}.zwfz")));
99    }
100
101    if let Some(global_dir) = crate::paths::global_workflows_dir() {
102        candidates.push(global_dir.join(workflow));
103        candidates.push(global_dir.join(format!("{workflow}.zwf")));
104        candidates.push(global_dir.join(format!("{workflow}.zwfz")));
105    }
106
107    for candidate in &candidates {
108        if candidate.exists() {
109            return Ok(candidate.clone());
110        }
111    }
112
113    Err(ZigError::Io(format!(
114        "workflow not found: '{workflow}' (tried: {})",
115        candidates
116            .iter()
117            .map(|p| p.display().to_string())
118            .collect::<Vec<_>>()
119            .join(", ")
120    )))
121}
122
123/// Compute a topological ordering of steps grouped into tiers.
124///
125/// Each tier contains steps whose dependencies are all in earlier tiers,
126/// meaning steps within a tier can (in principle) run in parallel.
127/// Uses Kahn's algorithm.
128fn topological_sort(steps: &[Step]) -> Result<Vec<Vec<&Step>>, ZigError> {
129    let step_index: HashMap<&str, usize> = steps
130        .iter()
131        .enumerate()
132        .map(|(i, s)| (s.name.as_str(), i))
133        .collect();
134
135    let mut in_degree = vec![0usize; steps.len()];
136    for (i, step) in steps.iter().enumerate() {
137        for dep in &step.depends_on {
138            if step_index.contains_key(dep.as_str()) {
139                in_degree[i] += 1;
140            }
141        }
142    }
143
144    let mut tiers = Vec::new();
145    let mut remaining = in_degree.clone();
146    let mut completed: Vec<bool> = vec![false; steps.len()];
147
148    loop {
149        let tier: Vec<usize> = (0..steps.len())
150            .filter(|&i| !completed[i] && remaining[i] == 0)
151            .collect();
152
153        if tier.is_empty() {
154            break;
155        }
156
157        for &i in &tier {
158            completed[i] = true;
159        }
160
161        // Decrement in-degrees for dependents of this tier
162        for &i in &tier {
163            for (j, step) in steps.iter().enumerate() {
164                if !completed[j] && step.depends_on.contains(&steps[i].name) {
165                    remaining[j] -= 1;
166                }
167            }
168        }
169
170        tiers.push(tier.iter().map(|&i| &steps[i]).collect());
171    }
172
173    let completed_count: usize = completed.iter().filter(|&&c| c).count();
174    if completed_count != steps.len() {
175        return Err(ZigError::Execution(
176            "could not resolve all steps — possible undetected cycle".into(),
177        ));
178    }
179
180    Ok(tiers)
181}
182
183/// Replace `${var_name}` references in a template with values from the variable map.
184///
185/// Supports dotted paths like `${result.score}` — the root variable name is
186/// looked up, and if its value is valid JSON, the nested path is traversed.
187/// Unknown variables are left as-is.
188fn substitute_vars(template: &str, vars: &HashMap<String, String>) -> String {
189    let mut result = String::with_capacity(template.len());
190    let mut rest = template;
191
192    while let Some(start) = rest.find("${") {
193        result.push_str(&rest[..start]);
194        let after_start = &rest[start + 2..];
195
196        if let Some(end) = after_start.find('}') {
197            let var_expr = &after_start[..end];
198            let mut parts = var_expr.splitn(2, '.');
199            let root = parts.next().unwrap_or(var_expr);
200
201            if let Some(value) = vars.get(root) {
202                if let Some(path) = parts.next() {
203                    // Try to navigate a JSON path
204                    if let Ok(json) = serde_json::from_str::<serde_json::Value>(value) {
205                        let resolved = json_path_lookup(&json, path);
206                        result.push_str(&resolved);
207                    } else {
208                        result.push_str(value);
209                    }
210                } else {
211                    result.push_str(value);
212                }
213            } else {
214                // Unknown variable — leave as-is
215                result.push_str(&rest[start..start + 2 + end + 1]);
216            }
217
218            rest = &after_start[end + 1..];
219        } else {
220            result.push_str(&rest[start..]);
221            rest = "";
222        }
223    }
224
225    result.push_str(rest);
226    result
227}
228
229/// Look up a dotted path in a JSON value (e.g., "nested.field").
230fn json_path_lookup(value: &serde_json::Value, path: &str) -> String {
231    let mut current = value;
232    for key in path.split('.') {
233        match current.get(key) {
234            Some(v) => current = v,
235            None => return format!("${{?.{path}}}"),
236        }
237    }
238    match current {
239        serde_json::Value::String(s) => s.clone(),
240        other => other.to_string(),
241    }
242}
243
244/// Resolve the effective system prompt for a step, with any advertised
245/// resources prepended as a `<resources>` block.
246///
247/// Resolution order:
248/// 1. If `step.system_prompt` is set, use it (with variable substitution).
249/// 2. If `step.role` is set, resolve the role name (may contain `${var}`),
250///    look it up in the roles table, and use the role's system prompt
251///    (loaded from file if `system_prompt_file` is set).
252/// 3. Otherwise the base prompt is empty.
253///
254/// Resources from all configured tiers (global shared, global per-workflow,
255/// project cwd, inline workflow, inline step) are then collected by the
256/// supplied [`ResourceCollector`] and rendered into a `<resources>` block
257/// that is prepended to the base prompt. If both the resources set and the
258/// base prompt are empty, returns `None` — keeping the current behavior when
259/// nothing is configured.
260fn resolve_role_system_prompt(
261    step: &Step,
262    roles: &HashMap<String, Role>,
263    resources: &ResourceCollector,
264    memory: &MemoryCollector,
265    vars: &HashMap<String, String>,
266    workflow_dir: &Path,
267    workflow_name: &str,
268) -> Result<Option<String>, ZigError> {
269    // Resolve the base system prompt (may be empty if neither is set).
270    let base_prompt: Option<String> = if let Some(ref sp) = step.system_prompt {
271        Some(substitute_vars(sp, vars))
272    } else if let Some(ref role_ref) = step.role {
273        let resolved_name = substitute_vars(role_ref, vars);
274        let role = roles.get(&resolved_name).ok_or_else(|| {
275            ZigError::Execution(format!(
276                "step '{}' references role '{}' which does not exist",
277                step.name, resolved_name
278            ))
279        })?;
280
281        let raw_prompt = if let Some(ref file_path) = role.system_prompt_file {
282            let full_path = workflow_dir.join(expand_path(file_path));
283            Some(std::fs::read_to_string(&full_path).map_err(|e| {
284                ZigError::Execution(format!(
285                    "failed to read system_prompt_file '{}' for role '{}': {e}",
286                    full_path.display(),
287                    resolved_name
288                ))
289            })?)
290        } else {
291            role.system_prompt.clone()
292        };
293
294        raw_prompt.map(|p| substitute_vars(&p, vars))
295    } else {
296        None
297    };
298
299    // Collect and render resources.
300    let set = resources.collect_for_step(&step.resources)?;
301    let resource_block = render_system_block(&set);
302
303    // Collect and render memory.
304    let memory_entries = memory.collect_for_step(step.memory.as_deref())?;
305    let memory_block = render_memory_block(&memory_entries, workflow_name, Some(&step.name));
306
307    let prefix = format!("{resource_block}{memory_block}");
308
309    match (prefix.is_empty(), base_prompt) {
310        (true, None) => Ok(None),
311        (true, Some(p)) => Ok(Some(p)),
312        (false, None) => Ok(Some(prefix.trim_end().to_string())),
313        (false, Some(p)) => Ok(Some(format!("{prefix}{p}"))),
314    }
315}
316
317/// Load file-backed default values for variables.
318///
319/// For each variable with `default_file` set (and no `default`), reads the
320/// file contents relative to `workflow_dir` and inserts them into the vars map.
321fn load_file_defaults(
322    vars: &mut HashMap<String, String>,
323    declarations: &HashMap<String, crate::workflow::model::Variable>,
324    workflow_dir: &Path,
325) -> Result<(), ZigError> {
326    for (name, decl) in declarations {
327        if decl.default.is_none() {
328            if let Some(ref file_path) = decl.default_file {
329                let full_path = workflow_dir.join(expand_path(file_path));
330                let content = std::fs::read_to_string(&full_path).map_err(|e| {
331                    ZigError::Execution(format!(
332                        "failed to read default_file '{}' for variable '{name}': {e}",
333                        full_path.display()
334                    ))
335                })?;
336                vars.insert(name.clone(), content);
337            }
338        }
339    }
340    Ok(())
341}
342
343/// Evaluate a simple condition expression against the current variable state.
344///
345/// Supports:
346/// - Numeric comparisons: `score < 8`, `retries <= max_retries`
347/// - String equality: `status == "done"`, `status != "pending"`
348/// - Truthy checks: `approved` (true if value is "true" or non-empty and non-zero)
349fn evaluate_condition(condition: &str, vars: &HashMap<String, String>) -> Result<bool, ZigError> {
350    let condition = condition.trim();
351
352    // Try comparison operators (ordered by length to match `<=` before `<`)
353    let operators = ["<=", ">=", "!=", "==", "<", ">"];
354    for op in &operators {
355        if let Some(pos) = condition.find(op) {
356            let lhs = resolve_operand(condition[..pos].trim(), vars);
357            let rhs = resolve_operand(condition[pos + op.len()..].trim(), vars);
358            return Ok(compare(&lhs, &rhs, op));
359        }
360    }
361
362    // Truthy check: single variable name
363    let value = vars.get(condition).map(|s| s.as_str()).unwrap_or("");
364    Ok(is_truthy(value))
365}
366
/// Resolve a condition operand to its string value.
///
/// - String literals (`"done"` or `'done'`) → the text between the quotes
/// - Variable names → looked up in `vars`
/// - Anything else (numeric literal or unknown name) → returned as-is
///
/// A token only counts as a literal when it has both an opening and a
/// separate closing quote. A lone `"` or `'` is returned unchanged instead of
/// panicking on an inverted slice range.
fn resolve_operand(token: &str, vars: &HashMap<String, String>) -> String {
    // Strip one pair of matching surrounding quotes. `strip_suffix` runs on
    // what is left after the prefix, so a single quote character cannot serve
    // as both ends.
    for quote in ['"', '\''] {
        let inner = token
            .strip_prefix(quote)
            .and_then(|rest| rest.strip_suffix(quote));
        if let Some(inner) = inner {
            return inner.to_string();
        }
    }
    // Try variable lookup
    if let Some(val) = vars.get(token) {
        return val.clone();
    }
    // Return as-is (numeric literal or unknown)
    token.to_string()
}
385
/// Compare two string operands with the given operator.
///
/// When both operands parse as **finite** numbers they are compared
/// numerically (`==`/`!=` use an `f64::EPSILON` tolerance). Otherwise the
/// comparison is lexicographic on the raw strings.
///
/// Words such as `inf`, `infinity` and `nan` also parse as `f64`, but numeric
/// comparison on them is meaningless (`inf - inf` is NaN, so `inf == inf` used
/// to be false, and NaN made both `==` and `!=` false). Such operands are
/// therefore compared as plain text.
///
/// An unrecognised operator yields `false`.
fn compare(lhs: &str, rhs: &str, op: &str) -> bool {
    let numeric = match (lhs.parse::<f64>(), rhs.parse::<f64>()) {
        (Ok(l), Ok(r)) if l.is_finite() && r.is_finite() => Some((l, r)),
        _ => None,
    };

    if let Some((l, r)) = numeric {
        return match op {
            "==" => (l - r).abs() < f64::EPSILON,
            "!=" => (l - r).abs() >= f64::EPSILON,
            "<" => l < r,
            ">" => l > r,
            "<=" => l <= r,
            ">=" => l >= r,
            _ => false,
        };
    }

    match op {
        "==" => lhs == rhs,
        "!=" => lhs != rhs,
        "<" => lhs < rhs,
        ">" => lhs > rhs,
        "<=" => lhs <= rhs,
        ">=" => lhs >= rhs,
        _ => false,
    }
}
410
/// Report whether a variable value counts as "true".
///
/// Exactly three values are falsy: the empty string, `"false"` and `"0"`.
/// The match is exact — no trimming or case folding — so `"False"` and `" "`
/// are truthy.
fn is_truthy(value: &str) -> bool {
    !matches!(value, "" | "false" | "0")
}
415
416/// Build the final prompt for a step, incorporating variable substitution,
417/// dependency outputs, and the user's context prompt.
418fn render_step_prompt(
419    step: &Step,
420    vars: &HashMap<String, String>,
421    user_prompt: Option<&str>,
422    dependency_outputs: &HashMap<String, String>,
423) -> String {
424    let mut prompt = String::new();
425
426    // Prepend user context if provided
427    if let Some(ctx) = user_prompt {
428        prompt.push_str(&format!("User context: {ctx}\n\n"));
429    }
430
431    // Inject dependency outputs if requested
432    if step.inject_context {
433        for dep in &step.depends_on {
434            if let Some(output) = dependency_outputs.get(dep) {
435                prompt.push_str(&format!("--- Output from '{dep}' ---\n{output}\n\n"));
436            }
437        }
438    }
439
440    // Append the step's prompt with variable substitution
441    prompt.push_str(&substitute_vars(&step.prompt, vars));
442
443    prompt
444}
445
446/// Build the argument list for a zag invocation.
447///
448/// Separated from `execute_step` to allow unit testing of flag logic
449/// without a real `zag` binary. The optional `model_override` is used
450/// during retries when `retry_model` escalates to a different model.
451///
452/// The subcommand is determined by `step.command`:
453/// - `None` → `zag run <prompt>` (default)
454/// - `Review` → `zag review [prompt] [--uncommitted] [--base] [--commit] [--title]`
455/// - `Plan` → `zag plan <prompt> [-o path] [--instructions]`
456/// - `Pipe` → `zag pipe <session-ids...> -- <prompt>`
457/// - `Collect` → `zag collect <session-ids...>`
458/// - `Summary` → `zag summary <session-ids...>`
459fn build_zag_args(
460    step: &Step,
461    prompt: &str,
462    workflow_name: &str,
463    model_override: Option<&str>,
464    rendered_system_prompt: Option<&str>,
465    workflow_provider: Option<&str>,
466    workflow_model: Option<&str>,
467) -> Vec<String> {
468    let session_name = |dep: &str| format!("zig-{workflow_name}-{dep}");
469
470    // Build command-specific prefix and determine if agent args apply
471    let (mut args, accepts_agent_args) = match &step.command {
472        None => (vec!["run".to_string(), prompt.to_string()], true),
473        Some(StepCommand::Review) => {
474            let mut a = vec!["review".to_string()];
475            if !prompt.is_empty() {
476                a.push(prompt.to_string());
477            }
478            if step.uncommitted {
479                a.push("--uncommitted".into());
480            }
481            if let Some(base) = &step.base {
482                a.extend(["--base".into(), base.clone()]);
483            }
484            if let Some(commit) = &step.commit {
485                a.extend(["--commit".into(), commit.clone()]);
486            }
487            if let Some(title) = &step.title {
488                a.extend(["--title".into(), title.clone()]);
489            }
490            (a, true)
491        }
492        Some(StepCommand::Plan) => {
493            let mut a = vec!["plan".to_string(), prompt.to_string()];
494            if let Some(output) = &step.plan_output {
495                a.extend(["-o".into(), expand_path(output)]);
496            }
497            if let Some(instructions) = &step.instructions {
498                a.extend(["--instructions".into(), instructions.clone()]);
499            }
500            (a, true)
501        }
502        Some(StepCommand::Pipe) => {
503            let mut a = vec!["pipe".to_string()];
504            for dep in &step.depends_on {
505                a.push(session_name(dep));
506            }
507            a.push("--".into());
508            a.push(prompt.to_string());
509            (a, true)
510        }
511        Some(StepCommand::Collect) => {
512            let mut a = vec!["collect".to_string()];
513            for dep in &step.depends_on {
514                a.push(session_name(dep));
515            }
516            (a, false)
517        }
518        Some(StepCommand::Summary) => {
519            let mut a = vec!["summary".to_string()];
520            for dep in &step.depends_on {
521                a.push(session_name(dep));
522            }
523            (a, false)
524        }
525    };
526
527    // Agent args (provider, model, prompts, output, etc.) only apply to
528    // commands that launch an agent: run, review, plan, pipe.
529    if accepts_agent_args {
530        let effective_provider = step.provider.as_deref().or(workflow_provider);
531        if let Some(provider) = effective_provider {
532            args.extend(["--provider".into(), provider.to_string()]);
533        }
534
535        let effective_model = model_override.or(step.model.as_deref()).or(workflow_model);
536        if let Some(model) = effective_model {
537            args.extend(["--model".into(), model.to_string()]);
538        }
539
540        if let Some(sp) = rendered_system_prompt {
541            args.extend(["--system-prompt".into(), sp.to_string()]);
542        }
543        if let Some(max_turns) = step.max_turns {
544            args.extend(["--max-turns".into(), max_turns.to_string()]);
545        }
546
547        // Output format: explicit format overrides the json bool
548        if let Some(output) = &step.output {
549            args.extend(["-o".into(), output.clone()]);
550        } else if step.json {
551            args.push("--json".into());
552        }
553        if let Some(schema) = &step.json_schema {
554            args.extend(["--json-schema".into(), schema.clone()]);
555        }
556
557        if let Some(mcp_config) = &step.mcp_config {
558            args.extend(["--mcp-config".into(), expand_path(mcp_config)]);
559        }
560
561        // Execution environment
562        if step.auto_approve {
563            args.push("--auto-approve".into());
564        }
565        if let Some(root) = &step.root {
566            args.extend(["--root".into(), expand_path(root)]);
567        }
568        for dir in &step.add_dirs {
569            args.extend(["--add-dir".into(), expand_path(dir)]);
570        }
571        for (key, value) in &step.env {
572            args.extend(["--env".into(), format!("{key}={value}")]);
573        }
574        for file in &step.files {
575            args.extend(["--file".into(), expand_path(file)]);
576        }
577
578        // Context injection
579        for ctx in &step.context {
580            args.extend(["--context".into(), ctx.clone()]);
581        }
582        if let Some(plan) = &step.plan {
583            args.extend(["--plan".into(), expand_path(plan)]);
584        }
585
586        // Isolation
587        if step.worktree {
588            args.push("--worktree".into());
589        }
590        if let Some(sandbox) = &step.sandbox {
591            args.extend(["--sandbox".into(), sandbox.clone()]);
592        }
593    }
594
595    // Session metadata applies to all commands
596    let name = session_name(&step.name);
597    args.extend(["--name".into(), name]);
598
599    if !step.description.is_empty() {
600        args.extend(["--description".into(), step.description.clone()]);
601    }
602
603    args.extend(["--tag".into(), "zig-workflow".into()]);
604    for tag in &step.tags {
605        args.extend(["--tag".into(), tag.clone()]);
606    }
607
608    if let Some(timeout) = &step.timeout {
609        args.extend(["--timeout".into(), timeout.clone()]);
610    }
611
612    args
613}
614
615/// Spawn `zag` and stream its stdout/stderr live to our stderr while
616/// also accumulating stdout into a buffer for `saves` extraction.
617///
618/// If `prefix` is `Some`, every emitted line is prefixed with `[prefix] `
619/// — used to disambiguate output from steps running in parallel.
620fn run_zag_streaming(
621    args: &[String],
622    step_name: &str,
623    prefix: Option<&str>,
624    session: Option<&Arc<SessionWriter>>,
625) -> Result<(std::process::ExitStatus, String), ZigError> {
626    let mut cmd = Command::new("zag");
627    cmd.args(args)
628        .stdout(std::process::Stdio::piped())
629        .stderr(std::process::Stdio::piped());
630
631    let mut child = cmd
632        .spawn()
633        .map_err(|e| ZigError::Zag(format!("failed to launch zag for step '{step_name}': {e}")))?;
634
635    let stdout = child.stdout.take().expect("stdout was piped");
636    let stderr = child.stderr.take().expect("stderr was piped");
637
638    let buffer = Arc::new(Mutex::new(String::new()));
639    let buffer_clone = Arc::clone(&buffer);
640    let prefix_stdout = prefix.map(String::from);
641    let prefix_stderr = prefix.map(String::from);
642    let session_stdout = session.cloned();
643    let session_stderr = session.cloned();
644    let step_name_stdout = step_name.to_string();
645    let step_name_stderr = step_name.to_string();
646
647    let stdout_thread = thread::spawn(move || {
648        let reader = BufReader::new(stdout);
649        let stderr_handle = std::io::stderr();
650        for line in reader.lines().map_while(Result::ok) {
651            if let Ok(mut buf) = buffer_clone.lock() {
652                buf.push_str(&line);
653                buf.push('\n');
654            }
655            if let Some(w) = &session_stdout {
656                let _ = w.step_output(&step_name_stdout, OutputStream::Stdout, &line);
657            }
658            let mut h = stderr_handle.lock();
659            let _ = match &prefix_stdout {
660                Some(p) => writeln!(h, "[{p}] {line}"),
661                None => writeln!(h, "{line}"),
662            };
663        }
664    });
665
666    let stderr_thread = thread::spawn(move || {
667        let reader = BufReader::new(stderr);
668        let stderr_handle = std::io::stderr();
669        for line in reader.lines().map_while(Result::ok) {
670            if let Some(w) = &session_stderr {
671                let _ = w.step_output(&step_name_stderr, OutputStream::Stderr, &line);
672            }
673            let mut h = stderr_handle.lock();
674            let _ = match &prefix_stderr {
675                Some(p) => writeln!(h, "[{p}] {line}"),
676                None => writeln!(h, "{line}"),
677            };
678        }
679    });
680
681    let status = child
682        .wait()
683        .map_err(|e| ZigError::Execution(format!("failed to wait for child: {e}")))?;
684
685    let _ = stdout_thread.join();
686    let _ = stderr_thread.join();
687
688    let captured = Arc::try_unwrap(buffer)
689        .map_err(|_| ZigError::Execution("buffer still shared after threads joined".into()))?
690        .into_inner()
691        .map_err(|_| ZigError::Execution("output buffer poisoned".into()))?;
692
693    Ok((status, captured))
694}
695
696/// Execute a single step by invoking `zag`, streaming its output live.
697///
698/// Returns the captured stdout from zag. The optional `model_override`
699/// is used during retries to escalate to a different model. The optional
700/// `prefix` tags streamed lines with the step name (used for parallel runs).
701#[allow(clippy::too_many_arguments)]
702fn execute_step(
703    step: &Step,
704    prompt: &str,
705    workflow_name: &str,
706    model_override: Option<&str>,
707    prefix: Option<&str>,
708    session: Option<&Arc<SessionWriter>>,
709    rendered_system_prompt: Option<&str>,
710    workflow_provider: Option<&str>,
711    workflow_model: Option<&str>,
712) -> Result<String, ZigError> {
713    let args = build_zag_args(
714        step,
715        prompt,
716        workflow_name,
717        model_override,
718        rendered_system_prompt,
719        workflow_provider,
720        workflow_model,
721    );
722    let (status, stdout) = run_zag_streaming(&args, &step.name, prefix, session)?;
723
724    if !status.success() {
725        return Err(ZigError::Execution(format!(
726            "step '{}' failed (exit {})",
727            step.name, status,
728        )));
729    }
730
731    Ok(stdout)
732}
733
734/// Run a step with retry logic, returning its captured stdout on success.
735///
736/// Extracted so both sequential and parallel execution paths share the
737/// same retry / model-escalation behavior.
738#[allow(clippy::too_many_arguments)]
739fn run_step_attempts(
740    step: &Step,
741    prompt: &str,
742    workflow_name: &str,
743    prefix: Option<&str>,
744    session: Option<&Arc<SessionWriter>>,
745    rendered_system_prompt: Option<&str>,
746    workflow_provider: Option<&str>,
747    workflow_model: Option<&str>,
748) -> Result<String, ZigError> {
749    let mut attempts = 0;
750    let max_attempts = if step.on_failure.as_ref() == Some(&FailurePolicy::Retry) {
751        step.max_retries.unwrap_or(1) + 1
752    } else {
753        1
754    };
755
756    loop {
757        attempts += 1;
758        let model_override = if attempts > 1 {
759            step.retry_model.as_deref()
760        } else {
761            None
762        };
763        match execute_step(
764            step,
765            prompt,
766            workflow_name,
767            model_override,
768            prefix,
769            session,
770            rendered_system_prompt,
771            workflow_provider,
772            workflow_model,
773        ) {
774            Ok(output) => return Ok(output),
775            Err(e) => {
776                if let Some(w) = session {
777                    let _ = w.step_failed(&step.name, None, attempts, &e.to_string());
778                }
779                if attempts < max_attempts {
780                    eprintln!(
781                        "    retry {}/{} for step '{}'",
782                        attempts,
783                        max_attempts - 1,
784                        step.name
785                    );
786                    continue;
787                }
788                return Err(e);
789            }
790        }
791    }
792}
793
794/// Extract variable values from step output using `saves` selectors.
795///
796/// Selectors:
797/// - `"$"` — the entire output
798/// - `"$.field"` — a top-level JSON field
799/// - `"$.nested.field"` — a nested JSON field
800fn extract_saves(
801    output: &str,
802    saves: &HashMap<String, String>,
803) -> Result<HashMap<String, String>, ZigError> {
804    let mut extracted = HashMap::new();
805
806    for (var_name, selector) in saves {
807        let value = if selector == "$" {
808            output.trim().to_string()
809        } else if let Some(path) = selector.strip_prefix("$.") {
810            let json: serde_json::Value = serde_json::from_str(output.trim()).map_err(|e| {
811                ZigError::Execution(format!(
812                    "saves selector '{selector}' requires JSON output, but got parse error: {e}"
813                ))
814            })?;
815            json_path_lookup(&json, path)
816        } else {
817            output.trim().to_string()
818        };
819
820        extracted.insert(var_name.clone(), value);
821    }
822
823    Ok(extracted)
824}
825
826/// Partition a tier of steps into sequential steps and race groups.
827///
828/// Steps without a `race_group` are returned as sequential. Steps sharing
829/// the same `race_group` value are grouped together for parallel execution.
830fn partition_tier<'a>(tier: &[&'a Step]) -> (Vec<&'a Step>, HashMap<String, Vec<&'a Step>>) {
831    let mut sequential = Vec::new();
832    let mut race_groups: HashMap<String, Vec<&'a Step>> = HashMap::new();
833
834    for step in tier {
835        if let Some(group) = &step.race_group {
836            race_groups.entry(group.clone()).or_default().push(step);
837        } else {
838            sequential.push(*step);
839        }
840    }
841
842    (sequential, race_groups)
843}
844
845/// Spawn a step as a child process without waiting for it to finish.
846fn spawn_step(
847    step: &Step,
848    prompt: &str,
849    workflow_name: &str,
850    rendered_system_prompt: Option<&str>,
851    workflow_provider: Option<&str>,
852    workflow_model: Option<&str>,
853) -> Result<std::process::Child, ZigError> {
854    let args = build_zag_args(
855        step,
856        prompt,
857        workflow_name,
858        None,
859        rendered_system_prompt,
860        workflow_provider,
861        workflow_model,
862    );
863    let mut cmd = Command::new("zag");
864    cmd.args(&args)
865        .stdout(std::process::Stdio::piped())
866        .stderr(std::process::Stdio::piped());
867
868    cmd.spawn()
869        .map_err(|e| ZigError::Zag(format!("failed to spawn zag for step '{}': {e}", step.name)))
870}
871
872/// Execute a race group: run all steps in parallel, return the first winner.
873///
874/// When one step finishes successfully, all remaining steps are killed.
875/// Returns the winning step's name and its stdout output.
876#[allow(clippy::too_many_arguments)]
877fn execute_race_group(
878    steps: &[&Step],
879    prompts: &HashMap<String, String>,
880    system_prompts: &HashMap<String, String>,
881    workflow_name: &str,
882    tier_index: usize,
883    session: Option<&Arc<SessionWriter>>,
884    workflow_provider: Option<&str>,
885    workflow_model: Option<&str>,
886) -> Result<(String, String), ZigError> {
887    if let Some(w) = session {
888        for step in steps {
889            let zag_session_id = format!("zig-{workflow_name}-{}", step.name);
890            let preview = prompts
891                .get(&step.name)
892                .map(|p| prompt_preview(p))
893                .unwrap_or_default();
894            let _ = w.step_started(
895                &step.name,
896                tier_index,
897                &zag_session_id,
898                zag_command_name(&step.command),
899                step.model.as_deref(),
900                &preview,
901            );
902        }
903    }
904    let race_started = Instant::now();
905    let mut children: Vec<(String, std::process::Child)> = Vec::new();
906
907    for step in steps {
908        let prompt = prompts.get(&step.name).ok_or_else(|| {
909            ZigError::Execution(format!("missing prompt for step '{}'", step.name))
910        })?;
911        eprintln!("  racing step '{}'...", step.name);
912        let rendered_sp = system_prompts.get(&step.name).map(|s| s.as_str());
913        let child = spawn_step(
914            step,
915            prompt,
916            workflow_name,
917            rendered_sp,
918            workflow_provider,
919            workflow_model,
920        )?;
921        children.push((step.name.clone(), child));
922    }
923
924    // Poll until one finishes
925    loop {
926        for i in 0..children.len() {
927            let status = children[i]
928                .1
929                .try_wait()
930                .map_err(|e| ZigError::Execution(format!("failed to poll child: {e}")))?;
931
932            if let Some(exit_status) = status {
933                let (winner_name, winner_child) = children.remove(i);
934
935                // Kill remaining children
936                for (name, mut child) in children {
937                    eprintln!("  cancelling step '{name}' (race lost)");
938                    let _ = child.kill();
939                    let _ = child.wait();
940                }
941
942                let elapsed = race_started.elapsed().as_millis() as u64;
943                if !exit_status.success() {
944                    let stderr = winner_child
945                        .stderr
946                        .map(|mut s| {
947                            let mut buf = String::new();
948                            std::io::Read::read_to_string(&mut s, &mut buf).ok();
949                            buf
950                        })
951                        .unwrap_or_default();
952                    let err_msg = format!(
953                        "race winner '{}' failed (exit {}): {}",
954                        winner_name,
955                        exit_status,
956                        stderr.trim()
957                    );
958                    if let Some(w) = session {
959                        let _ = w.step_failed(&winner_name, exit_status.code(), 1, &err_msg);
960                    }
961                    return Err(ZigError::Execution(err_msg));
962                }
963
964                let stdout = winner_child
965                    .stdout
966                    .map(|mut s| {
967                        let mut buf = String::new();
968                        std::io::Read::read_to_string(&mut s, &mut buf).ok();
969                        buf
970                    })
971                    .unwrap_or_default();
972
973                eprintln!("  race won by '{winner_name}'");
974                if let Some(w) = session {
975                    let _ = w.step_completed(&winner_name, 0, elapsed, Vec::new());
976                }
977                return Ok((winner_name, stdout));
978            }
979        }
980
981        std::thread::sleep(Duration::from_millis(100));
982    }
983}
984
985/// Execute a single sequential step with retry logic, saves, and next-jump handling.
986#[allow(clippy::too_many_arguments)]
987fn execute_sequential_step(
988    step: &Step,
989    vars: &mut HashMap<String, String>,
990    user_prompt: Option<&str>,
991    step_outputs: &mut HashMap<String, String>,
992    workflow_name: &str,
993    pending_next: &mut Option<String>,
994    tier_index: usize,
995    session: Option<&Arc<SessionWriter>>,
996    roles: &HashMap<String, Role>,
997    resources: &ResourceCollector,
998    memory: &MemoryCollector,
999    workflow_dir: &Path,
1000    workflow_provider: Option<&str>,
1001    workflow_model: Option<&str>,
1002) -> Result<(), ZigError> {
1003    if let Some(condition) = &step.condition {
1004        if !evaluate_condition(condition, vars)? {
1005            eprintln!(
1006                "  skipping '{}' (condition not met: {condition})",
1007                step.name
1008            );
1009            if let Some(w) = session {
1010                let _ = w.step_skipped(&step.name, &format!("condition not met: {condition}"));
1011            }
1012            return Ok(());
1013        }
1014    }
1015
1016    eprintln!("  running step '{}'...", step.name);
1017
1018    let prompt = render_step_prompt(step, vars, user_prompt, step_outputs);
1019    let rendered_sp = resolve_role_system_prompt(
1020        step,
1021        roles,
1022        resources,
1023        memory,
1024        vars,
1025        workflow_dir,
1026        workflow_name,
1027    )?;
1028    if let Some(w) = session {
1029        let zag_session_id = format!("zig-{workflow_name}-{}", step.name);
1030        let _ = w.step_started(
1031            &step.name,
1032            tier_index,
1033            &zag_session_id,
1034            zag_command_name(&step.command),
1035            step.model.as_deref(),
1036            &prompt_preview(&prompt),
1037        );
1038    }
1039    let started = Instant::now();
1040    let result = run_step_attempts(
1041        step,
1042        &prompt,
1043        workflow_name,
1044        None,
1045        session,
1046        rendered_sp.as_deref(),
1047        workflow_provider,
1048        workflow_model,
1049    );
1050
1051    match result {
1052        Ok(output) => {
1053            let mut saved_keys: Vec<String> = Vec::new();
1054            if !step.saves.is_empty() {
1055                let saved = extract_saves(&output, &step.saves)?;
1056                for (k, v) in &saved {
1057                    eprintln!("    saved {k} = {v}");
1058                    saved_keys.push(k.clone());
1059                }
1060                vars.extend(saved);
1061            }
1062
1063            step_outputs.insert(step.name.clone(), output);
1064            eprintln!("  completed '{}'", step.name);
1065            if let Some(w) = session {
1066                let _ = w.step_completed(
1067                    &step.name,
1068                    0,
1069                    started.elapsed().as_millis() as u64,
1070                    saved_keys,
1071                );
1072            }
1073
1074            if step.next.is_some() {
1075                *pending_next = step.next.clone();
1076            }
1077        }
1078        Err(e) => match step.on_failure.as_ref().unwrap_or(&FailurePolicy::Fail) {
1079            FailurePolicy::Fail => return Err(e),
1080            FailurePolicy::Continue => {
1081                eprintln!("  step '{}' failed (continuing): {e}", step.name);
1082            }
1083            FailurePolicy::Retry => {
1084                return Err(e);
1085            }
1086        },
1087    }
1088
1089    Ok(())
1090}
1091
1092/// Run multiple independent steps in a tier concurrently.
1093///
1094/// All non-skipped steps are spawned in their own threads and we wait for
1095/// every one to finish (unlike race groups, which kill losers). Output
1096/// lines from each step are streamed live to stderr, prefixed with the
1097/// step name to disambiguate. After completion, results are processed in
1098/// tier-declaration order so `saves`, `next`, and `on_failure` semantics
1099/// remain deterministic.
1100#[allow(clippy::too_many_arguments)]
1101fn execute_parallel_tier(
1102    steps: &[&Step],
1103    vars: &mut HashMap<String, String>,
1104    user_prompt: Option<&str>,
1105    step_outputs: &mut HashMap<String, String>,
1106    workflow_name: &str,
1107    pending_next: &mut Option<String>,
1108    tier_index: usize,
1109    session: Option<&Arc<SessionWriter>>,
1110    roles: &HashMap<String, Role>,
1111    resources: &ResourceCollector,
1112    memory: &MemoryCollector,
1113    workflow_dir: &Path,
1114    workflow_provider: Option<&str>,
1115    workflow_model: Option<&str>,
1116) -> Result<(), ZigError> {
1117    // Evaluate conditions and render prompts up front, so threads receive
1118    // the same variable snapshot they would have under sequential execution.
1119    let mut active: Vec<&Step> = Vec::new();
1120    let mut prompts: HashMap<String, String> = HashMap::new();
1121    let mut rendered_sps: HashMap<String, String> = HashMap::new();
1122    for step in steps {
1123        if let Some(condition) = &step.condition {
1124            if !evaluate_condition(condition, vars)? {
1125                eprintln!(
1126                    "  skipping '{}' (condition not met: {condition})",
1127                    step.name
1128                );
1129                if let Some(w) = session {
1130                    let _ = w.step_skipped(&step.name, &format!("condition not met: {condition}"));
1131                }
1132                continue;
1133            }
1134        }
1135        let prompt = render_step_prompt(step, vars, user_prompt, step_outputs);
1136        prompts.insert(step.name.clone(), prompt);
1137        if let Some(sp) = resolve_role_system_prompt(
1138            step,
1139            roles,
1140            resources,
1141            memory,
1142            vars,
1143            workflow_dir,
1144            workflow_name,
1145        )? {
1146            rendered_sps.insert(step.name.clone(), sp);
1147        }
1148        active.push(*step);
1149    }
1150
1151    if active.is_empty() {
1152        return Ok(());
1153    }
1154
1155    eprintln!("  running {} steps in parallel...", active.len());
1156
1157    let mut start_times: HashMap<String, Instant> = HashMap::new();
1158    let mut handles: Vec<thread::JoinHandle<(String, Result<String, ZigError>)>> = Vec::new();
1159    for step in &active {
1160        let step_clone: Step = (*step).clone();
1161        let prompt = prompts.remove(&step.name).unwrap_or_default();
1162        let rendered_sp = rendered_sps.remove(&step.name);
1163        let workflow_name_owned = workflow_name.to_string();
1164        let name = step.name.clone();
1165        eprintln!("  starting '{name}'...");
1166        if let Some(w) = session {
1167            let zag_session_id = format!("zig-{workflow_name}-{name}");
1168            let _ = w.step_started(
1169                &name,
1170                tier_index,
1171                &zag_session_id,
1172                zag_command_name(&step.command),
1173                step.model.as_deref(),
1174                &prompt_preview(&prompt),
1175            );
1176        }
1177        start_times.insert(name.clone(), Instant::now());
1178        let session_clone = session.cloned();
1179        let wf_provider = workflow_provider.map(String::from);
1180        let wf_model = workflow_model.map(String::from);
1181        let handle = thread::spawn(move || {
1182            let res = run_step_attempts(
1183                &step_clone,
1184                &prompt,
1185                &workflow_name_owned,
1186                Some(&name),
1187                session_clone.as_ref(),
1188                rendered_sp.as_deref(),
1189                wf_provider.as_deref(),
1190                wf_model.as_deref(),
1191            );
1192            (name, res)
1193        });
1194        handles.push(handle);
1195    }
1196
1197    let mut results: HashMap<String, Result<String, ZigError>> = HashMap::new();
1198    for handle in handles {
1199        match handle.join() {
1200            Ok((name, res)) => {
1201                results.insert(name, res);
1202            }
1203            Err(_) => {
1204                return Err(ZigError::Execution("parallel step thread panicked".into()));
1205            }
1206        }
1207    }
1208
1209    // Process results in tier-declaration order so `next` is deterministic.
1210    let mut errors: Vec<String> = Vec::new();
1211    for step in &active {
1212        let Some(res) = results.remove(&step.name) else {
1213            continue;
1214        };
1215        let elapsed = start_times
1216            .remove(&step.name)
1217            .map(|t| t.elapsed().as_millis() as u64)
1218            .unwrap_or(0);
1219        match res {
1220            Ok(output) => {
1221                let mut saved_keys: Vec<String> = Vec::new();
1222                if !step.saves.is_empty() {
1223                    let saved = extract_saves(&output, &step.saves)?;
1224                    for (k, v) in &saved {
1225                        eprintln!("    saved {k} = {v}");
1226                        saved_keys.push(k.clone());
1227                    }
1228                    vars.extend(saved);
1229                }
1230                step_outputs.insert(step.name.clone(), output);
1231                eprintln!("  completed '{}'", step.name);
1232                if let Some(w) = session {
1233                    let _ = w.step_completed(&step.name, 0, elapsed, saved_keys);
1234                }
1235                if step.next.is_some() && pending_next.is_none() {
1236                    *pending_next = step.next.clone();
1237                }
1238            }
1239            Err(e) => match step.on_failure.as_ref().unwrap_or(&FailurePolicy::Fail) {
1240                FailurePolicy::Continue => {
1241                    eprintln!("  step '{}' failed (continuing): {e}", step.name);
1242                }
1243                FailurePolicy::Fail | FailurePolicy::Retry => {
1244                    errors.push(format!("'{}': {e}", step.name));
1245                }
1246            },
1247        }
1248    }
1249
1250    if !errors.is_empty() {
1251        return Err(ZigError::Execution(format!(
1252            "parallel step(s) failed: {}",
1253            errors.join("; ")
1254        )));
1255    }
1256
1257    Ok(())
1258}
1259
1260/// Initialize the variable map from workflow variable definitions.
1261/// Variables with defaults are set to their default value; others are empty.
1262fn init_vars(workflow: &Workflow) -> HashMap<String, String> {
1263    let mut vars = HashMap::new();
1264    for (name, var) in &workflow.vars {
1265        let value = match &var.default {
1266            Some(toml::Value::String(s)) => s.clone(),
1267            Some(toml::Value::Integer(n)) => n.to_string(),
1268            Some(toml::Value::Float(f)) => f.to_string(),
1269            Some(toml::Value::Boolean(b)) => b.to_string(),
1270            Some(other) => other.to_string(),
1271            None => String::new(),
1272        };
1273        vars.insert(name.clone(), value);
1274    }
1275    vars
1276}
1277
1278/// Main execution loop for a validated workflow.
1279fn execute(
1280    workflow: &Workflow,
1281    workflow_path: &std::path::Path,
1282    user_prompt: Option<&str>,
1283    workflow_dir: &Path,
1284    disable_resources: bool,
1285    disable_memory: bool,
1286) -> Result<(), ZigError> {
1287    let mut vars = init_vars(workflow);
1288
1289    let resource_collector = ResourceCollector::from_env(
1290        &workflow.workflow.name,
1291        &workflow.workflow.resources,
1292        workflow_dir,
1293        disable_resources,
1294    );
1295
1296    let config = ZigConfig::load();
1297    let workflow_memory_mode = MemoryMode::from_str_opt(workflow.workflow.memory.as_deref());
1298    let memory_collector = MemoryCollector::from_env(
1299        &workflow.workflow.name,
1300        workflow_memory_mode,
1301        &config,
1302        disable_memory,
1303    );
1304
1305    // Load file-backed variable defaults before prompt binding.
1306    load_file_defaults(&mut vars, &workflow.vars, workflow_dir)?;
1307
1308    // Bind user prompt to the variable with `from = "prompt"`, if any.
1309    let prompt_var = workflow
1310        .vars
1311        .iter()
1312        .find(|(_, v)| v.from.as_deref() == Some("prompt"))
1313        .map(|(name, _)| name.clone());
1314
1315    if let Some(ref var_name) = prompt_var {
1316        if let Some(prompt) = user_prompt {
1317            vars.insert(var_name.clone(), prompt.to_string());
1318        }
1319    }
1320
1321    // Validate variable values against constraints before executing.
1322    if let Err(errors) = validate::validate_var_values(&vars, &workflow.vars) {
1323        let msgs: Vec<String> = errors.iter().map(|e| e.to_string()).collect();
1324        return Err(ZigError::Validation(msgs.join("; ")));
1325    }
1326
1327    // When prompt is bound to a variable, don't also prepend "User context:".
1328    let effective_user_prompt = if prompt_var.is_some() {
1329        None
1330    } else {
1331        user_prompt
1332    };
1333
1334    let mut step_outputs: HashMap<String, String> = HashMap::new();
1335
1336    let wf_provider = workflow.workflow.provider.as_deref();
1337    let wf_model = workflow.workflow.model.as_deref();
1338
1339    let tiers = topological_sort(&workflow.steps)?;
1340
1341    eprintln!(
1342        "running workflow '{}' ({} steps in {} tiers)",
1343        workflow.workflow.name,
1344        workflow.steps.len(),
1345        tiers.len()
1346    );
1347
1348    // Open a zig session log for this run. Failure to open the log is
1349    // non-fatal — `zig run` should still execute even if `~/.zig` is
1350    // unwritable. The session writer is `Option<Arc<...>>` everywhere
1351    // downstream so the writer-less path stays intact for tests.
1352    let coordinator = match SessionWriter::create(
1353        &workflow.workflow.name,
1354        &workflow_path.to_string_lossy(),
1355        user_prompt,
1356        tiers.len(),
1357    ) {
1358        Ok(writer) => {
1359            eprintln!("zig session: {}", writer.session_id());
1360            Some(SessionCoordinator::start(writer))
1361        }
1362        Err(e) => {
1363            eprintln!("warning: failed to open zig session log: {e}");
1364            None
1365        }
1366    };
1367    let session_writer: Option<Arc<SessionWriter>> = coordinator.as_ref().map(|c| c.writer());
1368    let session_ref = session_writer.as_ref();
1369
1370    let mut iteration = 0;
1371    let mut pending_next: Option<String> = None;
1372
1373    loop {
1374        let tiers_to_run = if let Some(ref next_step) = pending_next {
1375            // Re-run from the target step onward
1376            let remaining: Vec<Vec<&Step>> = tiers
1377                .iter()
1378                .map(|tier| {
1379                    tier.iter()
1380                        .filter(|s| s.name == *next_step)
1381                        .copied()
1382                        .collect::<Vec<_>>()
1383                })
1384                .filter(|tier| !tier.is_empty())
1385                .collect();
1386            pending_next = None;
1387            remaining
1388        } else if iteration == 0 {
1389            tiers.clone()
1390        } else {
1391            break;
1392        };
1393
1394        for (tier_index, tier) in tiers_to_run.iter().enumerate() {
1395            let (non_race, race_groups) = partition_tier(tier);
1396
1397            if let Some(w) = session_ref {
1398                let names: Vec<String> = tier.iter().map(|s| s.name.clone()).collect();
1399                let _ = w.tier_started(tier_index, names);
1400            }
1401
1402            // Independent steps in the same tier run concurrently. A single
1403            // step takes the sequential path so its output streams without
1404            // a name prefix; multiple steps go through the parallel path.
1405            if non_race.len() <= 1 {
1406                for step in &non_race {
1407                    execute_sequential_step(
1408                        step,
1409                        &mut vars,
1410                        effective_user_prompt,
1411                        &mut step_outputs,
1412                        &workflow.workflow.name,
1413                        &mut pending_next,
1414                        tier_index,
1415                        session_ref,
1416                        &workflow.roles,
1417                        &resource_collector,
1418                        &memory_collector,
1419                        workflow_dir,
1420                        wf_provider,
1421                        wf_model,
1422                    )?;
1423                }
1424            } else {
1425                execute_parallel_tier(
1426                    &non_race,
1427                    &mut vars,
1428                    effective_user_prompt,
1429                    &mut step_outputs,
1430                    &workflow.workflow.name,
1431                    &mut pending_next,
1432                    tier_index,
1433                    session_ref,
1434                    &workflow.roles,
1435                    &resource_collector,
1436                    &memory_collector,
1437                    workflow_dir,
1438                    wf_provider,
1439                    wf_model,
1440                )?;
1441            }
1442
1443            // Run race groups in parallel
1444            for (group_name, race_steps) in &race_groups {
1445                eprintln!("  starting race group '{group_name}'...");
1446
1447                // Build prompts for all racers (conditions evaluated here)
1448                let mut prompts = HashMap::new();
1449                let mut race_sps: HashMap<String, String> = HashMap::new();
1450                let mut active_steps: Vec<&Step> = Vec::new();
1451                for step in race_steps {
1452                    if let Some(condition) = &step.condition {
1453                        if !evaluate_condition(condition, &vars)? {
1454                            eprintln!(
1455                                "  skipping '{}' (condition not met: {condition})",
1456                                step.name
1457                            );
1458                            continue;
1459                        }
1460                    }
1461                    let prompt =
1462                        render_step_prompt(step, &vars, effective_user_prompt, &step_outputs);
1463                    prompts.insert(step.name.clone(), prompt);
1464                    if let Some(sp) = resolve_role_system_prompt(
1465                        step,
1466                        &workflow.roles,
1467                        &resource_collector,
1468                        &memory_collector,
1469                        &vars,
1470                        workflow_dir,
1471                        &workflow.workflow.name,
1472                    )? {
1473                        race_sps.insert(step.name.clone(), sp);
1474                    }
1475                    active_steps.push(step);
1476                }
1477
1478                if active_steps.is_empty() {
1479                    continue;
1480                }
1481
1482                match execute_race_group(
1483                    &active_steps,
1484                    &prompts,
1485                    &race_sps,
1486                    &workflow.workflow.name,
1487                    tier_index,
1488                    session_ref,
1489                    wf_provider,
1490                    wf_model,
1491                ) {
1492                    Ok((winner_name, output)) => {
1493                        // Find the winning step to process saves/next
1494                        if let Some(winner) = active_steps.iter().find(|s| s.name == winner_name) {
1495                            if !winner.saves.is_empty() {
1496                                let saved = extract_saves(&output, &winner.saves)?;
1497                                for (k, v) in &saved {
1498                                    eprintln!("    saved {k} = {v}");
1499                                }
1500                                vars.extend(saved);
1501                            }
1502                            if winner.next.is_some() {
1503                                pending_next = winner.next.clone();
1504                            }
1505                        }
1506                        step_outputs.insert(winner_name.clone(), output);
1507                        eprintln!(
1508                            "  completed race group '{group_name}' (winner: '{winner_name}')"
1509                        );
1510                    }
1511                    Err(e) => return Err(e),
1512                }
1513            }
1514        }
1515
1516        iteration += 1;
1517        if pending_next.is_none() || iteration >= MAX_LOOP_ITERATIONS {
1518            if iteration >= MAX_LOOP_ITERATIONS {
1519                eprintln!("warning: reached maximum loop iterations ({MAX_LOOP_ITERATIONS})");
1520            }
1521            break;
1522        }
1523    }
1524
1525    eprintln!("workflow '{}' completed", workflow.workflow.name);
1526    if let Some(c) = coordinator {
1527        let _ = c.finish(SessionStatus::Success);
1528    }
1529    Ok(())
1530}
1531
1532/// Short label for the zag subcommand a step will invoke. Used in
1533/// `step_started` events so listeners can distinguish run/review/plan/etc.
1534fn zag_command_name(cmd: &Option<StepCommand>) -> &'static str {
1535    match cmd {
1536        None => "run",
1537        Some(StepCommand::Review) => "review",
1538        Some(StepCommand::Plan) => "plan",
1539        Some(StepCommand::Pipe) => "pipe",
1540        Some(StepCommand::Collect) => "collect",
1541        Some(StepCommand::Summary) => "summary",
1542    }
1543}
1544
/// Truncated single-line preview of a rendered prompt for the session log.
///
/// Line feeds and carriage returns are each replaced by a space, so a prompt
/// with CRLF line endings cannot smuggle a `\r` into the one-line preview.
/// The result holds at most 200 characters (not bytes); when the prompt is
/// longer, the first 200 characters are kept and `…` is appended.
///
/// Only the first 201 characters of the prompt are ever examined, so the cost
/// does not grow with the size of the prompt.
fn prompt_preview(prompt: &str) -> String {
    const MAX: usize = 200;
    let mut flattened = prompt
        .chars()
        .map(|c| if matches!(c, '\n' | '\r') { ' ' } else { c });

    let mut preview: String = flattened.by_ref().take(MAX).collect();
    // Anything left after MAX characters means the prompt was cut short.
    if flattened.next().is_some() {
        preview.push('…');
    }
    preview
}
1559
1560#[cfg(test)]
1561#[path = "run_tests.rs"]
1562mod tests;