Skip to main content

zig_core/
run.rs

1use std::collections::HashMap;
2use std::io::{BufRead, BufReader, Write};
3use std::path::{Path, PathBuf};
4use std::process::Command;
5use std::sync::{Arc, Mutex};
6use std::thread;
7use std::time::Duration;
8
9use std::time::Instant;
10
11use crate::config::ZigConfig;
12use crate::error::ZigError;
13use crate::memory::{MemoryCollector, render_memory_block};
14use crate::resources::{ResourceCollector, render_system_block};
15use crate::session::{OutputStream, SessionCoordinator, SessionStatus, SessionWriter};
16use crate::workflow::model::{FailurePolicy, MemoryMode, Role, Step, StepCommand, Workflow};
17use crate::workflow::{parser, validate};
18
/// Maximum number of loop iterations to prevent infinite loops from `next` fields.
///
/// NOTE(review): the loop that enforces this cap is not visible in this part
/// of the file — confirm where and how the limit is applied before changing it.
const MAX_LOOP_ITERATIONS: usize = 100;
21
22/// Execute a workflow file (`.zwf` or `.zwfz`).
23///
24/// Parses the workflow, validates it, resolves the step DAG, and executes
25/// each step by delegating to `zag`. The optional `user_prompt` is injected
26/// as additional context into every step's prompt.
27///
28/// Resource advertisement (the `<resources>` block prepended to each step's
29/// system prompt) is enabled by default; pass `disable_resources = true` to
30/// opt out, e.g. via `zig run --no-resources`.
31///
32/// Memory injection (the `<memory>` block) is similarly enabled by default;
33/// pass `disable_memory = true` to opt out via `zig run --no-memory`.
34pub fn run_workflow(
35    workflow_path: &str,
36    user_prompt: Option<&str>,
37    disable_resources: bool,
38    disable_memory: bool,
39) -> Result<(), ZigError> {
40    check_zag()?;
41
42    let path = resolve_workflow_path(workflow_path)?;
43    let (workflow, source) = parser::parse_workflow(&path)?;
44
45    if let Err(errors) = validate::validate(&workflow) {
46        let msgs: Vec<String> = errors.iter().map(|e| e.to_string()).collect();
47        return Err(ZigError::Validation(msgs.join("; ")));
48    }
49
50    execute(
51        &workflow,
52        &path,
53        user_prompt,
54        source.dir(),
55        disable_resources,
56        disable_memory,
57    )
58}
59
60/// Verify that `zag` is installed and available on PATH.
61pub(crate) fn check_zag() -> Result<(), ZigError> {
62    let zag_available = Command::new("zag")
63        .arg("--version")
64        .output()
65        .is_ok_and(|o| o.status.success());
66
67    if !zag_available {
68        return Err(ZigError::Zag(
69            "zag is not installed or not in PATH. Install it from https://github.com/niclaslindstedt/zag".into(),
70        ));
71    }
72    Ok(())
73}
74
75/// Resolve a workflow argument to an actual file path.
76///
77/// Tries in order:
78/// 1. Literal path as given
79/// 2. With `.zwf` extension appended
80/// 3. With `.zwfz` extension appended
81/// 4. Under local project `.zig/workflows/` directory
82/// 5. Under local project `.zig/workflows/` with `.zwf` appended
83/// 6. Under local project `.zig/workflows/` with `.zwfz` appended
84/// 7. Under global `~/.zig/workflows/` directory
85/// 8. Under global `~/.zig/workflows/` with `.zwf` appended
86/// 9. Under global `~/.zig/workflows/` with `.zwfz` appended
87pub fn resolve_workflow_path(workflow: &str) -> Result<PathBuf, ZigError> {
88    let mut candidates = vec![
89        PathBuf::from(workflow),
90        PathBuf::from(format!("{workflow}.zwf")),
91        PathBuf::from(format!("{workflow}.zwfz")),
92    ];
93
94    if let Some(local_dir) = crate::paths::cwd_workflows_dir() {
95        candidates.push(local_dir.join(workflow));
96        candidates.push(local_dir.join(format!("{workflow}.zwf")));
97        candidates.push(local_dir.join(format!("{workflow}.zwfz")));
98    }
99
100    if let Some(global_dir) = crate::paths::global_workflows_dir() {
101        candidates.push(global_dir.join(workflow));
102        candidates.push(global_dir.join(format!("{workflow}.zwf")));
103        candidates.push(global_dir.join(format!("{workflow}.zwfz")));
104    }
105
106    for candidate in &candidates {
107        if candidate.exists() {
108            return Ok(candidate.clone());
109        }
110    }
111
112    Err(ZigError::Io(format!(
113        "workflow not found: '{workflow}' (tried: {})",
114        candidates
115            .iter()
116            .map(|p| p.display().to_string())
117            .collect::<Vec<_>>()
118            .join(", ")
119    )))
120}
121
122/// Compute a topological ordering of steps grouped into tiers.
123///
124/// Each tier contains steps whose dependencies are all in earlier tiers,
125/// meaning steps within a tier can (in principle) run in parallel.
126/// Uses Kahn's algorithm.
127fn topological_sort(steps: &[Step]) -> Result<Vec<Vec<&Step>>, ZigError> {
128    let step_index: HashMap<&str, usize> = steps
129        .iter()
130        .enumerate()
131        .map(|(i, s)| (s.name.as_str(), i))
132        .collect();
133
134    let mut in_degree = vec![0usize; steps.len()];
135    for (i, step) in steps.iter().enumerate() {
136        for dep in &step.depends_on {
137            if step_index.contains_key(dep.as_str()) {
138                in_degree[i] += 1;
139            }
140        }
141    }
142
143    let mut tiers = Vec::new();
144    let mut remaining = in_degree.clone();
145    let mut completed: Vec<bool> = vec![false; steps.len()];
146
147    loop {
148        let tier: Vec<usize> = (0..steps.len())
149            .filter(|&i| !completed[i] && remaining[i] == 0)
150            .collect();
151
152        if tier.is_empty() {
153            break;
154        }
155
156        for &i in &tier {
157            completed[i] = true;
158        }
159
160        // Decrement in-degrees for dependents of this tier
161        for &i in &tier {
162            for (j, step) in steps.iter().enumerate() {
163                if !completed[j] && step.depends_on.contains(&steps[i].name) {
164                    remaining[j] -= 1;
165                }
166            }
167        }
168
169        tiers.push(tier.iter().map(|&i| &steps[i]).collect());
170    }
171
172    let completed_count: usize = completed.iter().filter(|&&c| c).count();
173    if completed_count != steps.len() {
174        return Err(ZigError::Execution(
175            "could not resolve all steps — possible undetected cycle".into(),
176        ));
177    }
178
179    Ok(tiers)
180}
181
182/// Replace `${var_name}` references in a template with values from the variable map.
183///
184/// Supports dotted paths like `${result.score}` — the root variable name is
185/// looked up, and if its value is valid JSON, the nested path is traversed.
186/// Unknown variables are left as-is.
187fn substitute_vars(template: &str, vars: &HashMap<String, String>) -> String {
188    let mut result = String::with_capacity(template.len());
189    let mut rest = template;
190
191    while let Some(start) = rest.find("${") {
192        result.push_str(&rest[..start]);
193        let after_start = &rest[start + 2..];
194
195        if let Some(end) = after_start.find('}') {
196            let var_expr = &after_start[..end];
197            let mut parts = var_expr.splitn(2, '.');
198            let root = parts.next().unwrap_or(var_expr);
199
200            if let Some(value) = vars.get(root) {
201                if let Some(path) = parts.next() {
202                    // Try to navigate a JSON path
203                    if let Ok(json) = serde_json::from_str::<serde_json::Value>(value) {
204                        let resolved = json_path_lookup(&json, path);
205                        result.push_str(&resolved);
206                    } else {
207                        result.push_str(value);
208                    }
209                } else {
210                    result.push_str(value);
211                }
212            } else {
213                // Unknown variable — leave as-is
214                result.push_str(&rest[start..start + 2 + end + 1]);
215            }
216
217            rest = &after_start[end + 1..];
218        } else {
219            result.push_str(&rest[start..]);
220            rest = "";
221        }
222    }
223
224    result.push_str(rest);
225    result
226}
227
228/// Look up a dotted path in a JSON value (e.g., "nested.field").
229fn json_path_lookup(value: &serde_json::Value, path: &str) -> String {
230    let mut current = value;
231    for key in path.split('.') {
232        match current.get(key) {
233            Some(v) => current = v,
234            None => return format!("${{?.{path}}}"),
235        }
236    }
237    match current {
238        serde_json::Value::String(s) => s.clone(),
239        other => other.to_string(),
240    }
241}
242
243/// Resolve the effective system prompt for a step, with any advertised
244/// resources prepended as a `<resources>` block.
245///
246/// Resolution order:
247/// 1. If `step.system_prompt` is set, use it (with variable substitution).
248/// 2. If `step.role` is set, resolve the role name (may contain `${var}`),
249///    look it up in the roles table, and use the role's system prompt
250///    (loaded from file if `system_prompt_file` is set).
251/// 3. Otherwise the base prompt is empty.
252///
253/// Resources from all configured tiers (global shared, global per-workflow,
254/// project cwd, inline workflow, inline step) are then collected by the
255/// supplied [`ResourceCollector`] and rendered into a `<resources>` block
256/// that is prepended to the base prompt. If both the resources set and the
257/// base prompt are empty, returns `None` — keeping the current behavior when
258/// nothing is configured.
259fn resolve_role_system_prompt(
260    step: &Step,
261    roles: &HashMap<String, Role>,
262    resources: &ResourceCollector,
263    memory: &MemoryCollector,
264    vars: &HashMap<String, String>,
265    workflow_dir: &Path,
266    workflow_name: &str,
267) -> Result<Option<String>, ZigError> {
268    // Resolve the base system prompt (may be empty if neither is set).
269    let base_prompt: Option<String> = if let Some(ref sp) = step.system_prompt {
270        Some(substitute_vars(sp, vars))
271    } else if let Some(ref role_ref) = step.role {
272        let resolved_name = substitute_vars(role_ref, vars);
273        let role = roles.get(&resolved_name).ok_or_else(|| {
274            ZigError::Execution(format!(
275                "step '{}' references role '{}' which does not exist",
276                step.name, resolved_name
277            ))
278        })?;
279
280        let raw_prompt = if let Some(ref file_path) = role.system_prompt_file {
281            let full_path = workflow_dir.join(file_path);
282            Some(std::fs::read_to_string(&full_path).map_err(|e| {
283                ZigError::Execution(format!(
284                    "failed to read system_prompt_file '{}' for role '{}': {e}",
285                    full_path.display(),
286                    resolved_name
287                ))
288            })?)
289        } else {
290            role.system_prompt.clone()
291        };
292
293        raw_prompt.map(|p| substitute_vars(&p, vars))
294    } else {
295        None
296    };
297
298    // Collect and render resources.
299    let set = resources.collect_for_step(&step.resources)?;
300    let resource_block = render_system_block(&set);
301
302    // Collect and render memory.
303    let memory_entries = memory.collect_for_step(step.memory.as_deref())?;
304    let memory_block = render_memory_block(&memory_entries, workflow_name, Some(&step.name));
305
306    let prefix = format!("{resource_block}{memory_block}");
307
308    match (prefix.is_empty(), base_prompt) {
309        (true, None) => Ok(None),
310        (true, Some(p)) => Ok(Some(p)),
311        (false, None) => Ok(Some(prefix.trim_end().to_string())),
312        (false, Some(p)) => Ok(Some(format!("{prefix}{p}"))),
313    }
314}
315
316/// Load file-backed default values for variables.
317///
318/// For each variable with `default_file` set (and no `default`), reads the
319/// file contents relative to `workflow_dir` and inserts them into the vars map.
320fn load_file_defaults(
321    vars: &mut HashMap<String, String>,
322    declarations: &HashMap<String, crate::workflow::model::Variable>,
323    workflow_dir: &Path,
324) -> Result<(), ZigError> {
325    for (name, decl) in declarations {
326        if decl.default.is_none() {
327            if let Some(ref file_path) = decl.default_file {
328                let full_path = workflow_dir.join(file_path);
329                let content = std::fs::read_to_string(&full_path).map_err(|e| {
330                    ZigError::Execution(format!(
331                        "failed to read default_file '{}' for variable '{name}': {e}",
332                        full_path.display()
333                    ))
334                })?;
335                vars.insert(name.clone(), content);
336            }
337        }
338    }
339    Ok(())
340}
341
342/// Evaluate a simple condition expression against the current variable state.
343///
344/// Supports:
345/// - Numeric comparisons: `score < 8`, `retries <= max_retries`
346/// - String equality: `status == "done"`, `status != "pending"`
347/// - Truthy checks: `approved` (true if value is "true" or non-empty and non-zero)
348fn evaluate_condition(condition: &str, vars: &HashMap<String, String>) -> Result<bool, ZigError> {
349    let condition = condition.trim();
350
351    // Try comparison operators (ordered by length to match `<=` before `<`)
352    let operators = ["<=", ">=", "!=", "==", "<", ">"];
353    for op in &operators {
354        if let Some(pos) = condition.find(op) {
355            let lhs = resolve_operand(condition[..pos].trim(), vars);
356            let rhs = resolve_operand(condition[pos + op.len()..].trim(), vars);
357            return Ok(compare(&lhs, &rhs, op));
358        }
359    }
360
361    // Truthy check: single variable name
362    let value = vars.get(condition).map(|s| s.as_str()).unwrap_or("");
363    Ok(is_truthy(value))
364}
365
/// Resolve a condition operand to its string value.
///
/// - A token wrapped in a matching pair of `"` or `'` is a string literal;
///   the quotes are removed (`"done"` → `done`, `""` → empty string).
/// - Otherwise, a token that names a variable in `vars` yields that value.
/// - Anything else (numeric literal, unknown name) is returned as-is.
///
/// A token consisting of a single quote character is not a literal — it has
/// no closing quote — and falls through to the variable/as-is handling
/// instead of being sliced.
fn resolve_operand(token: &str, vars: &HashMap<String, String>) -> String {
    for quote in ['"', '\''] {
        // Stripping the suffix from what is left after the prefix guarantees
        // the opening and closing quotes are two distinct characters.
        let inner = token
            .strip_prefix(quote)
            .and_then(|rest| rest.strip_suffix(quote));
        if let Some(inner) = inner {
            return inner.to_string();
        }
    }

    match vars.get(token) {
        Some(value) => value.clone(),
        None => token.to_string(),
    }
}
384
/// Compare two string operands with the given operator.
///
/// When both operands parse as `f64` and neither is NaN they are compared
/// numerically; `==` / `!=` treat values as equal when they are exactly equal
/// (which covers matching infinities) or differ by less than `f64::EPSILON`.
/// In every other case — including when either side parses to NaN — the
/// operands are compared as text (lexicographically for the ordering
/// operators), so identical strings are always `==`.
///
/// Unrecognized operators yield `false`.
fn compare(lhs: &str, rhs: &str, op: &str) -> bool {
    if let (Ok(l), Ok(r)) = (lhs.parse::<f64>(), rhs.parse::<f64>()) {
        // NaN compares false with everything numerically; text comparison
        // gives a usable answer for tokens such as "nan".
        if !l.is_nan() && !r.is_nan() {
            // `l == r` first: `inf - inf` is NaN, so the difference test
            // alone would call equal infinities unequal.
            let equal = l == r || (l - r).abs() < f64::EPSILON;
            return match op {
                "==" => equal,
                "!=" => !equal,
                "<" => l < r,
                ">" => l > r,
                "<=" => l <= r,
                ">=" => l >= r,
                _ => false,
            };
        }
    }

    match op {
        "==" => lhs == rhs,
        "!=" => lhs != rhs,
        "<" => lhs < rhs,
        ">" => lhs > rhs,
        "<=" => lhs <= rhs,
        ">=" => lhs >= rhs,
        _ => false,
    }
}
409
/// Report whether `value` counts as true in a bare-variable condition.
///
/// Exactly three values are falsy: the empty string, `"false"` and `"0"`.
/// Everything else — including `"False"` or `"0.0"` — is truthy.
fn is_truthy(value: &str) -> bool {
    !matches!(value, "" | "false" | "0")
}
414
415/// Build the final prompt for a step, incorporating variable substitution,
416/// dependency outputs, and the user's context prompt.
417fn render_step_prompt(
418    step: &Step,
419    vars: &HashMap<String, String>,
420    user_prompt: Option<&str>,
421    dependency_outputs: &HashMap<String, String>,
422) -> String {
423    let mut prompt = String::new();
424
425    // Prepend user context if provided
426    if let Some(ctx) = user_prompt {
427        prompt.push_str(&format!("User context: {ctx}\n\n"));
428    }
429
430    // Inject dependency outputs if requested
431    if step.inject_context {
432        for dep in &step.depends_on {
433            if let Some(output) = dependency_outputs.get(dep) {
434                prompt.push_str(&format!("--- Output from '{dep}' ---\n{output}\n\n"));
435            }
436        }
437    }
438
439    // Append the step's prompt with variable substitution
440    prompt.push_str(&substitute_vars(&step.prompt, vars));
441
442    prompt
443}
444
445/// Build the argument list for a zag invocation.
446///
447/// Separated from `execute_step` to allow unit testing of flag logic
448/// without a real `zag` binary. The optional `model_override` is used
449/// during retries when `retry_model` escalates to a different model.
450///
451/// The subcommand is determined by `step.command`:
452/// - `None` → `zag run <prompt>` (default)
453/// - `Review` → `zag review [prompt] [--uncommitted] [--base] [--commit] [--title]`
454/// - `Plan` → `zag plan <prompt> [-o path] [--instructions]`
455/// - `Pipe` → `zag pipe <session-ids...> -- <prompt>`
456/// - `Collect` → `zag collect <session-ids...>`
457/// - `Summary` → `zag summary <session-ids...>`
458fn build_zag_args(
459    step: &Step,
460    prompt: &str,
461    workflow_name: &str,
462    model_override: Option<&str>,
463    rendered_system_prompt: Option<&str>,
464    workflow_provider: Option<&str>,
465    workflow_model: Option<&str>,
466) -> Vec<String> {
467    let session_name = |dep: &str| format!("zig-{workflow_name}-{dep}");
468
469    // Build command-specific prefix and determine if agent args apply
470    let (mut args, accepts_agent_args) = match &step.command {
471        None => (vec!["run".to_string(), prompt.to_string()], true),
472        Some(StepCommand::Review) => {
473            let mut a = vec!["review".to_string()];
474            if !prompt.is_empty() {
475                a.push(prompt.to_string());
476            }
477            if step.uncommitted {
478                a.push("--uncommitted".into());
479            }
480            if let Some(base) = &step.base {
481                a.extend(["--base".into(), base.clone()]);
482            }
483            if let Some(commit) = &step.commit {
484                a.extend(["--commit".into(), commit.clone()]);
485            }
486            if let Some(title) = &step.title {
487                a.extend(["--title".into(), title.clone()]);
488            }
489            (a, true)
490        }
491        Some(StepCommand::Plan) => {
492            let mut a = vec!["plan".to_string(), prompt.to_string()];
493            if let Some(output) = &step.plan_output {
494                a.extend(["-o".into(), output.clone()]);
495            }
496            if let Some(instructions) = &step.instructions {
497                a.extend(["--instructions".into(), instructions.clone()]);
498            }
499            (a, true)
500        }
501        Some(StepCommand::Pipe) => {
502            let mut a = vec!["pipe".to_string()];
503            for dep in &step.depends_on {
504                a.push(session_name(dep));
505            }
506            a.push("--".into());
507            a.push(prompt.to_string());
508            (a, true)
509        }
510        Some(StepCommand::Collect) => {
511            let mut a = vec!["collect".to_string()];
512            for dep in &step.depends_on {
513                a.push(session_name(dep));
514            }
515            (a, false)
516        }
517        Some(StepCommand::Summary) => {
518            let mut a = vec!["summary".to_string()];
519            for dep in &step.depends_on {
520                a.push(session_name(dep));
521            }
522            (a, false)
523        }
524    };
525
526    // Agent args (provider, model, prompts, output, etc.) only apply to
527    // commands that launch an agent: run, review, plan, pipe.
528    if accepts_agent_args {
529        let effective_provider = step.provider.as_deref().or(workflow_provider);
530        if let Some(provider) = effective_provider {
531            args.extend(["--provider".into(), provider.to_string()]);
532        }
533
534        let effective_model = model_override.or(step.model.as_deref()).or(workflow_model);
535        if let Some(model) = effective_model {
536            args.extend(["--model".into(), model.to_string()]);
537        }
538
539        if let Some(sp) = rendered_system_prompt {
540            args.extend(["--system-prompt".into(), sp.to_string()]);
541        }
542        if let Some(max_turns) = step.max_turns {
543            args.extend(["--max-turns".into(), max_turns.to_string()]);
544        }
545
546        // Output format: explicit format overrides the json bool
547        if let Some(output) = &step.output {
548            args.extend(["-o".into(), output.clone()]);
549        } else if step.json {
550            args.push("--json".into());
551        }
552        if let Some(schema) = &step.json_schema {
553            args.extend(["--json-schema".into(), schema.clone()]);
554        }
555
556        if let Some(mcp_config) = &step.mcp_config {
557            args.extend(["--mcp-config".into(), mcp_config.clone()]);
558        }
559
560        // Execution environment
561        if step.auto_approve {
562            args.push("--auto-approve".into());
563        }
564        if let Some(root) = &step.root {
565            args.extend(["--root".into(), root.clone()]);
566        }
567        for dir in &step.add_dirs {
568            args.extend(["--add-dir".into(), dir.clone()]);
569        }
570        for (key, value) in &step.env {
571            args.extend(["--env".into(), format!("{key}={value}")]);
572        }
573        for file in &step.files {
574            args.extend(["--file".into(), file.clone()]);
575        }
576
577        // Context injection
578        for ctx in &step.context {
579            args.extend(["--context".into(), ctx.clone()]);
580        }
581        if let Some(plan) = &step.plan {
582            args.extend(["--plan".into(), plan.clone()]);
583        }
584
585        // Isolation
586        if step.worktree {
587            args.push("--worktree".into());
588        }
589        if let Some(sandbox) = &step.sandbox {
590            args.extend(["--sandbox".into(), sandbox.clone()]);
591        }
592    }
593
594    // Session metadata applies to all commands
595    let name = session_name(&step.name);
596    args.extend(["--name".into(), name]);
597
598    if !step.description.is_empty() {
599        args.extend(["--description".into(), step.description.clone()]);
600    }
601
602    args.extend(["--tag".into(), "zig-workflow".into()]);
603    for tag in &step.tags {
604        args.extend(["--tag".into(), tag.clone()]);
605    }
606
607    if let Some(timeout) = &step.timeout {
608        args.extend(["--timeout".into(), timeout.clone()]);
609    }
610
611    args
612}
613
614/// Spawn `zag` and stream its stdout/stderr live to our stderr while
615/// also accumulating stdout into a buffer for `saves` extraction.
616///
617/// If `prefix` is `Some`, every emitted line is prefixed with `[prefix] `
618/// — used to disambiguate output from steps running in parallel.
619fn run_zag_streaming(
620    args: &[String],
621    step_name: &str,
622    prefix: Option<&str>,
623    session: Option<&Arc<SessionWriter>>,
624) -> Result<(std::process::ExitStatus, String), ZigError> {
625    let mut cmd = Command::new("zag");
626    cmd.args(args)
627        .stdout(std::process::Stdio::piped())
628        .stderr(std::process::Stdio::piped());
629
630    let mut child = cmd
631        .spawn()
632        .map_err(|e| ZigError::Zag(format!("failed to launch zag for step '{step_name}': {e}")))?;
633
634    let stdout = child.stdout.take().expect("stdout was piped");
635    let stderr = child.stderr.take().expect("stderr was piped");
636
637    let buffer = Arc::new(Mutex::new(String::new()));
638    let buffer_clone = Arc::clone(&buffer);
639    let prefix_stdout = prefix.map(String::from);
640    let prefix_stderr = prefix.map(String::from);
641    let session_stdout = session.cloned();
642    let session_stderr = session.cloned();
643    let step_name_stdout = step_name.to_string();
644    let step_name_stderr = step_name.to_string();
645
646    let stdout_thread = thread::spawn(move || {
647        let reader = BufReader::new(stdout);
648        let stderr_handle = std::io::stderr();
649        for line in reader.lines().map_while(Result::ok) {
650            if let Ok(mut buf) = buffer_clone.lock() {
651                buf.push_str(&line);
652                buf.push('\n');
653            }
654            if let Some(w) = &session_stdout {
655                let _ = w.step_output(&step_name_stdout, OutputStream::Stdout, &line);
656            }
657            let mut h = stderr_handle.lock();
658            let _ = match &prefix_stdout {
659                Some(p) => writeln!(h, "[{p}] {line}"),
660                None => writeln!(h, "{line}"),
661            };
662        }
663    });
664
665    let stderr_thread = thread::spawn(move || {
666        let reader = BufReader::new(stderr);
667        let stderr_handle = std::io::stderr();
668        for line in reader.lines().map_while(Result::ok) {
669            if let Some(w) = &session_stderr {
670                let _ = w.step_output(&step_name_stderr, OutputStream::Stderr, &line);
671            }
672            let mut h = stderr_handle.lock();
673            let _ = match &prefix_stderr {
674                Some(p) => writeln!(h, "[{p}] {line}"),
675                None => writeln!(h, "{line}"),
676            };
677        }
678    });
679
680    let status = child
681        .wait()
682        .map_err(|e| ZigError::Execution(format!("failed to wait for child: {e}")))?;
683
684    let _ = stdout_thread.join();
685    let _ = stderr_thread.join();
686
687    let captured = Arc::try_unwrap(buffer)
688        .map_err(|_| ZigError::Execution("buffer still shared after threads joined".into()))?
689        .into_inner()
690        .map_err(|_| ZigError::Execution("output buffer poisoned".into()))?;
691
692    Ok((status, captured))
693}
694
695/// Execute a single step by invoking `zag`, streaming its output live.
696///
697/// Returns the captured stdout from zag. The optional `model_override`
698/// is used during retries to escalate to a different model. The optional
699/// `prefix` tags streamed lines with the step name (used for parallel runs).
700#[allow(clippy::too_many_arguments)]
701fn execute_step(
702    step: &Step,
703    prompt: &str,
704    workflow_name: &str,
705    model_override: Option<&str>,
706    prefix: Option<&str>,
707    session: Option<&Arc<SessionWriter>>,
708    rendered_system_prompt: Option<&str>,
709    workflow_provider: Option<&str>,
710    workflow_model: Option<&str>,
711) -> Result<String, ZigError> {
712    let args = build_zag_args(
713        step,
714        prompt,
715        workflow_name,
716        model_override,
717        rendered_system_prompt,
718        workflow_provider,
719        workflow_model,
720    );
721    let (status, stdout) = run_zag_streaming(&args, &step.name, prefix, session)?;
722
723    if !status.success() {
724        return Err(ZigError::Execution(format!(
725            "step '{}' failed (exit {})",
726            step.name, status,
727        )));
728    }
729
730    Ok(stdout)
731}
732
733/// Run a step with retry logic, returning its captured stdout on success.
734///
735/// Extracted so both sequential and parallel execution paths share the
736/// same retry / model-escalation behavior.
737#[allow(clippy::too_many_arguments)]
738fn run_step_attempts(
739    step: &Step,
740    prompt: &str,
741    workflow_name: &str,
742    prefix: Option<&str>,
743    session: Option<&Arc<SessionWriter>>,
744    rendered_system_prompt: Option<&str>,
745    workflow_provider: Option<&str>,
746    workflow_model: Option<&str>,
747) -> Result<String, ZigError> {
748    let mut attempts = 0;
749    let max_attempts = if step.on_failure.as_ref() == Some(&FailurePolicy::Retry) {
750        step.max_retries.unwrap_or(1) + 1
751    } else {
752        1
753    };
754
755    loop {
756        attempts += 1;
757        let model_override = if attempts > 1 {
758            step.retry_model.as_deref()
759        } else {
760            None
761        };
762        match execute_step(
763            step,
764            prompt,
765            workflow_name,
766            model_override,
767            prefix,
768            session,
769            rendered_system_prompt,
770            workflow_provider,
771            workflow_model,
772        ) {
773            Ok(output) => return Ok(output),
774            Err(e) => {
775                if let Some(w) = session {
776                    let _ = w.step_failed(&step.name, None, attempts, &e.to_string());
777                }
778                if attempts < max_attempts {
779                    eprintln!(
780                        "    retry {}/{} for step '{}'",
781                        attempts,
782                        max_attempts - 1,
783                        step.name
784                    );
785                    continue;
786                }
787                return Err(e);
788            }
789        }
790    }
791}
792
793/// Extract variable values from step output using `saves` selectors.
794///
795/// Selectors:
796/// - `"$"` — the entire output
797/// - `"$.field"` — a top-level JSON field
798/// - `"$.nested.field"` — a nested JSON field
799fn extract_saves(
800    output: &str,
801    saves: &HashMap<String, String>,
802) -> Result<HashMap<String, String>, ZigError> {
803    let mut extracted = HashMap::new();
804
805    for (var_name, selector) in saves {
806        let value = if selector == "$" {
807            output.trim().to_string()
808        } else if let Some(path) = selector.strip_prefix("$.") {
809            let json: serde_json::Value = serde_json::from_str(output.trim()).map_err(|e| {
810                ZigError::Execution(format!(
811                    "saves selector '{selector}' requires JSON output, but got parse error: {e}"
812                ))
813            })?;
814            json_path_lookup(&json, path)
815        } else {
816            output.trim().to_string()
817        };
818
819        extracted.insert(var_name.clone(), value);
820    }
821
822    Ok(extracted)
823}
824
825/// Partition a tier of steps into sequential steps and race groups.
826///
827/// Steps without a `race_group` are returned as sequential. Steps sharing
828/// the same `race_group` value are grouped together for parallel execution.
829fn partition_tier<'a>(tier: &[&'a Step]) -> (Vec<&'a Step>, HashMap<String, Vec<&'a Step>>) {
830    let mut sequential = Vec::new();
831    let mut race_groups: HashMap<String, Vec<&'a Step>> = HashMap::new();
832
833    for step in tier {
834        if let Some(group) = &step.race_group {
835            race_groups.entry(group.clone()).or_default().push(step);
836        } else {
837            sequential.push(*step);
838        }
839    }
840
841    (sequential, race_groups)
842}
843
844/// Spawn a step as a child process without waiting for it to finish.
845fn spawn_step(
846    step: &Step,
847    prompt: &str,
848    workflow_name: &str,
849    rendered_system_prompt: Option<&str>,
850    workflow_provider: Option<&str>,
851    workflow_model: Option<&str>,
852) -> Result<std::process::Child, ZigError> {
853    let args = build_zag_args(
854        step,
855        prompt,
856        workflow_name,
857        None,
858        rendered_system_prompt,
859        workflow_provider,
860        workflow_model,
861    );
862    let mut cmd = Command::new("zag");
863    cmd.args(&args)
864        .stdout(std::process::Stdio::piped())
865        .stderr(std::process::Stdio::piped());
866
867    cmd.spawn()
868        .map_err(|e| ZigError::Zag(format!("failed to spawn zag for step '{}': {e}", step.name)))
869}
870
871/// Execute a race group: run all steps in parallel, return the first winner.
872///
873/// When one step finishes successfully, all remaining steps are killed.
874/// Returns the winning step's name and its stdout output.
875#[allow(clippy::too_many_arguments)]
876fn execute_race_group(
877    steps: &[&Step],
878    prompts: &HashMap<String, String>,
879    system_prompts: &HashMap<String, String>,
880    workflow_name: &str,
881    tier_index: usize,
882    session: Option<&Arc<SessionWriter>>,
883    workflow_provider: Option<&str>,
884    workflow_model: Option<&str>,
885) -> Result<(String, String), ZigError> {
886    if let Some(w) = session {
887        for step in steps {
888            let zag_session_id = format!("zig-{workflow_name}-{}", step.name);
889            let preview = prompts
890                .get(&step.name)
891                .map(|p| prompt_preview(p))
892                .unwrap_or_default();
893            let _ = w.step_started(
894                &step.name,
895                tier_index,
896                &zag_session_id,
897                zag_command_name(&step.command),
898                step.model.as_deref(),
899                &preview,
900            );
901        }
902    }
903    let race_started = Instant::now();
904    let mut children: Vec<(String, std::process::Child)> = Vec::new();
905
906    for step in steps {
907        let prompt = prompts.get(&step.name).ok_or_else(|| {
908            ZigError::Execution(format!("missing prompt for step '{}'", step.name))
909        })?;
910        eprintln!("  racing step '{}'...", step.name);
911        let rendered_sp = system_prompts.get(&step.name).map(|s| s.as_str());
912        let child = spawn_step(
913            step,
914            prompt,
915            workflow_name,
916            rendered_sp,
917            workflow_provider,
918            workflow_model,
919        )?;
920        children.push((step.name.clone(), child));
921    }
922
923    // Poll until one finishes
924    loop {
925        for i in 0..children.len() {
926            let status = children[i]
927                .1
928                .try_wait()
929                .map_err(|e| ZigError::Execution(format!("failed to poll child: {e}")))?;
930
931            if let Some(exit_status) = status {
932                let (winner_name, winner_child) = children.remove(i);
933
934                // Kill remaining children
935                for (name, mut child) in children {
936                    eprintln!("  cancelling step '{name}' (race lost)");
937                    let _ = child.kill();
938                    let _ = child.wait();
939                }
940
941                let elapsed = race_started.elapsed().as_millis() as u64;
942                if !exit_status.success() {
943                    let stderr = winner_child
944                        .stderr
945                        .map(|mut s| {
946                            let mut buf = String::new();
947                            std::io::Read::read_to_string(&mut s, &mut buf).ok();
948                            buf
949                        })
950                        .unwrap_or_default();
951                    let err_msg = format!(
952                        "race winner '{}' failed (exit {}): {}",
953                        winner_name,
954                        exit_status,
955                        stderr.trim()
956                    );
957                    if let Some(w) = session {
958                        let _ = w.step_failed(&winner_name, exit_status.code(), 1, &err_msg);
959                    }
960                    return Err(ZigError::Execution(err_msg));
961                }
962
963                let stdout = winner_child
964                    .stdout
965                    .map(|mut s| {
966                        let mut buf = String::new();
967                        std::io::Read::read_to_string(&mut s, &mut buf).ok();
968                        buf
969                    })
970                    .unwrap_or_default();
971
972                eprintln!("  race won by '{winner_name}'");
973                if let Some(w) = session {
974                    let _ = w.step_completed(&winner_name, 0, elapsed, Vec::new());
975                }
976                return Ok((winner_name, stdout));
977            }
978        }
979
980        std::thread::sleep(Duration::from_millis(100));
981    }
982}
983
984/// Execute a single sequential step with retry logic, saves, and next-jump handling.
985#[allow(clippy::too_many_arguments)]
986fn execute_sequential_step(
987    step: &Step,
988    vars: &mut HashMap<String, String>,
989    user_prompt: Option<&str>,
990    step_outputs: &mut HashMap<String, String>,
991    workflow_name: &str,
992    pending_next: &mut Option<String>,
993    tier_index: usize,
994    session: Option<&Arc<SessionWriter>>,
995    roles: &HashMap<String, Role>,
996    resources: &ResourceCollector,
997    memory: &MemoryCollector,
998    workflow_dir: &Path,
999    workflow_provider: Option<&str>,
1000    workflow_model: Option<&str>,
1001) -> Result<(), ZigError> {
1002    if let Some(condition) = &step.condition {
1003        if !evaluate_condition(condition, vars)? {
1004            eprintln!(
1005                "  skipping '{}' (condition not met: {condition})",
1006                step.name
1007            );
1008            if let Some(w) = session {
1009                let _ = w.step_skipped(&step.name, &format!("condition not met: {condition}"));
1010            }
1011            return Ok(());
1012        }
1013    }
1014
1015    eprintln!("  running step '{}'...", step.name);
1016
1017    let prompt = render_step_prompt(step, vars, user_prompt, step_outputs);
1018    let rendered_sp = resolve_role_system_prompt(
1019        step,
1020        roles,
1021        resources,
1022        memory,
1023        vars,
1024        workflow_dir,
1025        workflow_name,
1026    )?;
1027    if let Some(w) = session {
1028        let zag_session_id = format!("zig-{workflow_name}-{}", step.name);
1029        let _ = w.step_started(
1030            &step.name,
1031            tier_index,
1032            &zag_session_id,
1033            zag_command_name(&step.command),
1034            step.model.as_deref(),
1035            &prompt_preview(&prompt),
1036        );
1037    }
1038    let started = Instant::now();
1039    let result = run_step_attempts(
1040        step,
1041        &prompt,
1042        workflow_name,
1043        None,
1044        session,
1045        rendered_sp.as_deref(),
1046        workflow_provider,
1047        workflow_model,
1048    );
1049
1050    match result {
1051        Ok(output) => {
1052            let mut saved_keys: Vec<String> = Vec::new();
1053            if !step.saves.is_empty() {
1054                let saved = extract_saves(&output, &step.saves)?;
1055                for (k, v) in &saved {
1056                    eprintln!("    saved {k} = {v}");
1057                    saved_keys.push(k.clone());
1058                }
1059                vars.extend(saved);
1060            }
1061
1062            step_outputs.insert(step.name.clone(), output);
1063            eprintln!("  completed '{}'", step.name);
1064            if let Some(w) = session {
1065                let _ = w.step_completed(
1066                    &step.name,
1067                    0,
1068                    started.elapsed().as_millis() as u64,
1069                    saved_keys,
1070                );
1071            }
1072
1073            if step.next.is_some() {
1074                *pending_next = step.next.clone();
1075            }
1076        }
1077        Err(e) => match step.on_failure.as_ref().unwrap_or(&FailurePolicy::Fail) {
1078            FailurePolicy::Fail => return Err(e),
1079            FailurePolicy::Continue => {
1080                eprintln!("  step '{}' failed (continuing): {e}", step.name);
1081            }
1082            FailurePolicy::Retry => {
1083                return Err(e);
1084            }
1085        },
1086    }
1087
1088    Ok(())
1089}
1090
1091/// Run multiple independent steps in a tier concurrently.
1092///
1093/// All non-skipped steps are spawned in their own threads and we wait for
1094/// every one to finish (unlike race groups, which kill losers). Output
1095/// lines from each step are streamed live to stderr, prefixed with the
1096/// step name to disambiguate. After completion, results are processed in
1097/// tier-declaration order so `saves`, `next`, and `on_failure` semantics
1098/// remain deterministic.
1099#[allow(clippy::too_many_arguments)]
1100fn execute_parallel_tier(
1101    steps: &[&Step],
1102    vars: &mut HashMap<String, String>,
1103    user_prompt: Option<&str>,
1104    step_outputs: &mut HashMap<String, String>,
1105    workflow_name: &str,
1106    pending_next: &mut Option<String>,
1107    tier_index: usize,
1108    session: Option<&Arc<SessionWriter>>,
1109    roles: &HashMap<String, Role>,
1110    resources: &ResourceCollector,
1111    memory: &MemoryCollector,
1112    workflow_dir: &Path,
1113    workflow_provider: Option<&str>,
1114    workflow_model: Option<&str>,
1115) -> Result<(), ZigError> {
1116    // Evaluate conditions and render prompts up front, so threads receive
1117    // the same variable snapshot they would have under sequential execution.
1118    let mut active: Vec<&Step> = Vec::new();
1119    let mut prompts: HashMap<String, String> = HashMap::new();
1120    let mut rendered_sps: HashMap<String, String> = HashMap::new();
1121    for step in steps {
1122        if let Some(condition) = &step.condition {
1123            if !evaluate_condition(condition, vars)? {
1124                eprintln!(
1125                    "  skipping '{}' (condition not met: {condition})",
1126                    step.name
1127                );
1128                if let Some(w) = session {
1129                    let _ = w.step_skipped(&step.name, &format!("condition not met: {condition}"));
1130                }
1131                continue;
1132            }
1133        }
1134        let prompt = render_step_prompt(step, vars, user_prompt, step_outputs);
1135        prompts.insert(step.name.clone(), prompt);
1136        if let Some(sp) = resolve_role_system_prompt(
1137            step,
1138            roles,
1139            resources,
1140            memory,
1141            vars,
1142            workflow_dir,
1143            workflow_name,
1144        )? {
1145            rendered_sps.insert(step.name.clone(), sp);
1146        }
1147        active.push(*step);
1148    }
1149
1150    if active.is_empty() {
1151        return Ok(());
1152    }
1153
1154    eprintln!("  running {} steps in parallel...", active.len());
1155
1156    let mut start_times: HashMap<String, Instant> = HashMap::new();
1157    let mut handles: Vec<thread::JoinHandle<(String, Result<String, ZigError>)>> = Vec::new();
1158    for step in &active {
1159        let step_clone: Step = (*step).clone();
1160        let prompt = prompts.remove(&step.name).unwrap_or_default();
1161        let rendered_sp = rendered_sps.remove(&step.name);
1162        let workflow_name_owned = workflow_name.to_string();
1163        let name = step.name.clone();
1164        eprintln!("  starting '{name}'...");
1165        if let Some(w) = session {
1166            let zag_session_id = format!("zig-{workflow_name}-{name}");
1167            let _ = w.step_started(
1168                &name,
1169                tier_index,
1170                &zag_session_id,
1171                zag_command_name(&step.command),
1172                step.model.as_deref(),
1173                &prompt_preview(&prompt),
1174            );
1175        }
1176        start_times.insert(name.clone(), Instant::now());
1177        let session_clone = session.cloned();
1178        let wf_provider = workflow_provider.map(String::from);
1179        let wf_model = workflow_model.map(String::from);
1180        let handle = thread::spawn(move || {
1181            let res = run_step_attempts(
1182                &step_clone,
1183                &prompt,
1184                &workflow_name_owned,
1185                Some(&name),
1186                session_clone.as_ref(),
1187                rendered_sp.as_deref(),
1188                wf_provider.as_deref(),
1189                wf_model.as_deref(),
1190            );
1191            (name, res)
1192        });
1193        handles.push(handle);
1194    }
1195
1196    let mut results: HashMap<String, Result<String, ZigError>> = HashMap::new();
1197    for handle in handles {
1198        match handle.join() {
1199            Ok((name, res)) => {
1200                results.insert(name, res);
1201            }
1202            Err(_) => {
1203                return Err(ZigError::Execution("parallel step thread panicked".into()));
1204            }
1205        }
1206    }
1207
1208    // Process results in tier-declaration order so `next` is deterministic.
1209    let mut errors: Vec<String> = Vec::new();
1210    for step in &active {
1211        let Some(res) = results.remove(&step.name) else {
1212            continue;
1213        };
1214        let elapsed = start_times
1215            .remove(&step.name)
1216            .map(|t| t.elapsed().as_millis() as u64)
1217            .unwrap_or(0);
1218        match res {
1219            Ok(output) => {
1220                let mut saved_keys: Vec<String> = Vec::new();
1221                if !step.saves.is_empty() {
1222                    let saved = extract_saves(&output, &step.saves)?;
1223                    for (k, v) in &saved {
1224                        eprintln!("    saved {k} = {v}");
1225                        saved_keys.push(k.clone());
1226                    }
1227                    vars.extend(saved);
1228                }
1229                step_outputs.insert(step.name.clone(), output);
1230                eprintln!("  completed '{}'", step.name);
1231                if let Some(w) = session {
1232                    let _ = w.step_completed(&step.name, 0, elapsed, saved_keys);
1233                }
1234                if step.next.is_some() && pending_next.is_none() {
1235                    *pending_next = step.next.clone();
1236                }
1237            }
1238            Err(e) => match step.on_failure.as_ref().unwrap_or(&FailurePolicy::Fail) {
1239                FailurePolicy::Continue => {
1240                    eprintln!("  step '{}' failed (continuing): {e}", step.name);
1241                }
1242                FailurePolicy::Fail | FailurePolicy::Retry => {
1243                    errors.push(format!("'{}': {e}", step.name));
1244                }
1245            },
1246        }
1247    }
1248
1249    if !errors.is_empty() {
1250        return Err(ZigError::Execution(format!(
1251            "parallel step(s) failed: {}",
1252            errors.join("; ")
1253        )));
1254    }
1255
1256    Ok(())
1257}
1258
1259/// Initialize the variable map from workflow variable definitions.
1260/// Variables with defaults are set to their default value; others are empty.
1261fn init_vars(workflow: &Workflow) -> HashMap<String, String> {
1262    let mut vars = HashMap::new();
1263    for (name, var) in &workflow.vars {
1264        let value = match &var.default {
1265            Some(toml::Value::String(s)) => s.clone(),
1266            Some(toml::Value::Integer(n)) => n.to_string(),
1267            Some(toml::Value::Float(f)) => f.to_string(),
1268            Some(toml::Value::Boolean(b)) => b.to_string(),
1269            Some(other) => other.to_string(),
1270            None => String::new(),
1271        };
1272        vars.insert(name.clone(), value);
1273    }
1274    vars
1275}
1276
1277/// Main execution loop for a validated workflow.
1278fn execute(
1279    workflow: &Workflow,
1280    workflow_path: &std::path::Path,
1281    user_prompt: Option<&str>,
1282    workflow_dir: &Path,
1283    disable_resources: bool,
1284    disable_memory: bool,
1285) -> Result<(), ZigError> {
1286    let mut vars = init_vars(workflow);
1287
1288    let resource_collector = ResourceCollector::from_env(
1289        &workflow.workflow.name,
1290        &workflow.workflow.resources,
1291        workflow_dir,
1292        disable_resources,
1293    );
1294
1295    let config = ZigConfig::load();
1296    let workflow_memory_mode = MemoryMode::from_str_opt(workflow.workflow.memory.as_deref());
1297    let memory_collector = MemoryCollector::from_env(
1298        &workflow.workflow.name,
1299        workflow_memory_mode,
1300        &config,
1301        disable_memory,
1302    );
1303
1304    // Load file-backed variable defaults before prompt binding.
1305    load_file_defaults(&mut vars, &workflow.vars, workflow_dir)?;
1306
1307    // Bind user prompt to the variable with `from = "prompt"`, if any.
1308    let prompt_var = workflow
1309        .vars
1310        .iter()
1311        .find(|(_, v)| v.from.as_deref() == Some("prompt"))
1312        .map(|(name, _)| name.clone());
1313
1314    if let Some(ref var_name) = prompt_var {
1315        if let Some(prompt) = user_prompt {
1316            vars.insert(var_name.clone(), prompt.to_string());
1317        }
1318    }
1319
1320    // Validate variable values against constraints before executing.
1321    if let Err(errors) = validate::validate_var_values(&vars, &workflow.vars) {
1322        let msgs: Vec<String> = errors.iter().map(|e| e.to_string()).collect();
1323        return Err(ZigError::Validation(msgs.join("; ")));
1324    }
1325
1326    // When prompt is bound to a variable, don't also prepend "User context:".
1327    let effective_user_prompt = if prompt_var.is_some() {
1328        None
1329    } else {
1330        user_prompt
1331    };
1332
1333    let mut step_outputs: HashMap<String, String> = HashMap::new();
1334
1335    let wf_provider = workflow.workflow.provider.as_deref();
1336    let wf_model = workflow.workflow.model.as_deref();
1337
1338    let tiers = topological_sort(&workflow.steps)?;
1339
1340    eprintln!(
1341        "running workflow '{}' ({} steps in {} tiers)",
1342        workflow.workflow.name,
1343        workflow.steps.len(),
1344        tiers.len()
1345    );
1346
1347    // Open a zig session log for this run. Failure to open the log is
1348    // non-fatal — `zig run` should still execute even if `~/.zig` is
1349    // unwritable. The session writer is `Option<Arc<...>>` everywhere
1350    // downstream so the writer-less path stays intact for tests.
1351    let coordinator = match SessionWriter::create(
1352        &workflow.workflow.name,
1353        &workflow_path.to_string_lossy(),
1354        user_prompt,
1355        tiers.len(),
1356    ) {
1357        Ok(writer) => {
1358            eprintln!("zig session: {}", writer.session_id());
1359            Some(SessionCoordinator::start(writer))
1360        }
1361        Err(e) => {
1362            eprintln!("warning: failed to open zig session log: {e}");
1363            None
1364        }
1365    };
1366    let session_writer: Option<Arc<SessionWriter>> = coordinator.as_ref().map(|c| c.writer());
1367    let session_ref = session_writer.as_ref();
1368
1369    let mut iteration = 0;
1370    let mut pending_next: Option<String> = None;
1371
1372    loop {
1373        let tiers_to_run = if let Some(ref next_step) = pending_next {
1374            // Re-run from the target step onward
1375            let remaining: Vec<Vec<&Step>> = tiers
1376                .iter()
1377                .map(|tier| {
1378                    tier.iter()
1379                        .filter(|s| s.name == *next_step)
1380                        .copied()
1381                        .collect::<Vec<_>>()
1382                })
1383                .filter(|tier| !tier.is_empty())
1384                .collect();
1385            pending_next = None;
1386            remaining
1387        } else if iteration == 0 {
1388            tiers.clone()
1389        } else {
1390            break;
1391        };
1392
1393        for (tier_index, tier) in tiers_to_run.iter().enumerate() {
1394            let (non_race, race_groups) = partition_tier(tier);
1395
1396            if let Some(w) = session_ref {
1397                let names: Vec<String> = tier.iter().map(|s| s.name.clone()).collect();
1398                let _ = w.tier_started(tier_index, names);
1399            }
1400
1401            // Independent steps in the same tier run concurrently. A single
1402            // step takes the sequential path so its output streams without
1403            // a name prefix; multiple steps go through the parallel path.
1404            if non_race.len() <= 1 {
1405                for step in &non_race {
1406                    execute_sequential_step(
1407                        step,
1408                        &mut vars,
1409                        effective_user_prompt,
1410                        &mut step_outputs,
1411                        &workflow.workflow.name,
1412                        &mut pending_next,
1413                        tier_index,
1414                        session_ref,
1415                        &workflow.roles,
1416                        &resource_collector,
1417                        &memory_collector,
1418                        workflow_dir,
1419                        wf_provider,
1420                        wf_model,
1421                    )?;
1422                }
1423            } else {
1424                execute_parallel_tier(
1425                    &non_race,
1426                    &mut vars,
1427                    effective_user_prompt,
1428                    &mut step_outputs,
1429                    &workflow.workflow.name,
1430                    &mut pending_next,
1431                    tier_index,
1432                    session_ref,
1433                    &workflow.roles,
1434                    &resource_collector,
1435                    &memory_collector,
1436                    workflow_dir,
1437                    wf_provider,
1438                    wf_model,
1439                )?;
1440            }
1441
1442            // Run race groups in parallel
1443            for (group_name, race_steps) in &race_groups {
1444                eprintln!("  starting race group '{group_name}'...");
1445
1446                // Build prompts for all racers (conditions evaluated here)
1447                let mut prompts = HashMap::new();
1448                let mut race_sps: HashMap<String, String> = HashMap::new();
1449                let mut active_steps: Vec<&Step> = Vec::new();
1450                for step in race_steps {
1451                    if let Some(condition) = &step.condition {
1452                        if !evaluate_condition(condition, &vars)? {
1453                            eprintln!(
1454                                "  skipping '{}' (condition not met: {condition})",
1455                                step.name
1456                            );
1457                            continue;
1458                        }
1459                    }
1460                    let prompt =
1461                        render_step_prompt(step, &vars, effective_user_prompt, &step_outputs);
1462                    prompts.insert(step.name.clone(), prompt);
1463                    if let Some(sp) = resolve_role_system_prompt(
1464                        step,
1465                        &workflow.roles,
1466                        &resource_collector,
1467                        &memory_collector,
1468                        &vars,
1469                        workflow_dir,
1470                        &workflow.workflow.name,
1471                    )? {
1472                        race_sps.insert(step.name.clone(), sp);
1473                    }
1474                    active_steps.push(step);
1475                }
1476
1477                if active_steps.is_empty() {
1478                    continue;
1479                }
1480
1481                match execute_race_group(
1482                    &active_steps,
1483                    &prompts,
1484                    &race_sps,
1485                    &workflow.workflow.name,
1486                    tier_index,
1487                    session_ref,
1488                    wf_provider,
1489                    wf_model,
1490                ) {
1491                    Ok((winner_name, output)) => {
1492                        // Find the winning step to process saves/next
1493                        if let Some(winner) = active_steps.iter().find(|s| s.name == winner_name) {
1494                            if !winner.saves.is_empty() {
1495                                let saved = extract_saves(&output, &winner.saves)?;
1496                                for (k, v) in &saved {
1497                                    eprintln!("    saved {k} = {v}");
1498                                }
1499                                vars.extend(saved);
1500                            }
1501                            if winner.next.is_some() {
1502                                pending_next = winner.next.clone();
1503                            }
1504                        }
1505                        step_outputs.insert(winner_name.clone(), output);
1506                        eprintln!(
1507                            "  completed race group '{group_name}' (winner: '{winner_name}')"
1508                        );
1509                    }
1510                    Err(e) => return Err(e),
1511                }
1512            }
1513        }
1514
1515        iteration += 1;
1516        if pending_next.is_none() || iteration >= MAX_LOOP_ITERATIONS {
1517            if iteration >= MAX_LOOP_ITERATIONS {
1518                eprintln!("warning: reached maximum loop iterations ({MAX_LOOP_ITERATIONS})");
1519            }
1520            break;
1521        }
1522    }
1523
1524    eprintln!("workflow '{}' completed", workflow.workflow.name);
1525    if let Some(c) = coordinator {
1526        let _ = c.finish(SessionStatus::Success);
1527    }
1528    Ok(())
1529}
1530
1531/// Short label for the zag subcommand a step will invoke. Used in
1532/// `step_started` events so listeners can distinguish run/review/plan/etc.
1533fn zag_command_name(cmd: &Option<StepCommand>) -> &'static str {
1534    match cmd {
1535        None => "run",
1536        Some(StepCommand::Review) => "review",
1537        Some(StepCommand::Plan) => "plan",
1538        Some(StepCommand::Pipe) => "pipe",
1539        Some(StepCommand::Collect) => "collect",
1540        Some(StepCommand::Summary) => "summary",
1541    }
1542}
1543
/// Truncated single-line preview of a rendered prompt for the session log.
///
/// Every line-feed and carriage-return character is replaced by a space, so
/// prompts with CRLF line endings also stay on one line. At most 200
/// characters (not bytes) are kept; when the prompt is longer, a trailing
/// `…` marks the truncation.
fn prompt_preview(prompt: &str) -> String {
    const MAX: usize = 200;

    let mut flattened = prompt
        .chars()
        .map(|c| if matches!(c, '\n' | '\r') { ' ' } else { c });

    // Take the first MAX characters, then peek once to see whether anything
    // was cut off; this avoids counting the whole prompt up front.
    let mut preview: String = flattened.by_ref().take(MAX).collect();
    if flattened.next().is_some() {
        preview.push('…');
    }
    preview
}
1558
1559#[cfg(test)]
1560#[path = "run_tests.rs"]
1561mod tests;