Skip to main content

zig_core/
run.rs

1use std::collections::HashMap;
2use std::io::{BufRead, BufReader, Write};
3use std::path::{Path, PathBuf};
4use std::process::Command;
5use std::sync::{Arc, Mutex};
6use std::thread;
7use std::time::Duration;
8
9use std::time::Instant;
10
11use crate::error::ZigError;
12use crate::resources::{ResourceCollector, render_system_block};
13use crate::session::{OutputStream, SessionCoordinator, SessionStatus, SessionWriter};
14use crate::workflow::model::{FailurePolicy, Role, Step, StepCommand, Workflow};
15use crate::workflow::{parser, validate};
16
/// Maximum number of loop iterations, to prevent infinite loops created by
/// steps' `next` fields (a workflow that keeps jumping back to earlier steps).
const MAX_LOOP_ITERATIONS: usize = 100;
19
20/// Execute a `.zug` workflow file.
21///
22/// Parses the workflow, validates it, resolves the step DAG, and executes
23/// each step by delegating to `zag`. The optional `user_prompt` is injected
24/// as additional context into every step's prompt.
25///
26/// Resource advertisement (the `<resources>` block prepended to each step's
27/// system prompt) is enabled by default; pass `disable_resources = true` to
28/// opt out, e.g. via `zig run --no-resources`.
29pub fn run_workflow(
30    workflow_path: &str,
31    user_prompt: Option<&str>,
32    disable_resources: bool,
33) -> Result<(), ZigError> {
34    check_zag()?;
35
36    let path = resolve_workflow_path(workflow_path)?;
37    let (workflow, source) = parser::parse_workflow(&path)?;
38
39    if let Err(errors) = validate::validate(&workflow) {
40        let msgs: Vec<String> = errors.iter().map(|e| e.to_string()).collect();
41        return Err(ZigError::Validation(msgs.join("; ")));
42    }
43
44    execute(
45        &workflow,
46        &path,
47        user_prompt,
48        source.dir(),
49        disable_resources,
50    )
51}
52
53/// Verify that `zag` is installed and available on PATH.
54pub(crate) fn check_zag() -> Result<(), ZigError> {
55    let zag_available = Command::new("zag")
56        .arg("--version")
57        .output()
58        .is_ok_and(|o| o.status.success());
59
60    if !zag_available {
61        return Err(ZigError::Zag(
62            "zag is not installed or not in PATH. Install it from https://github.com/niclaslindstedt/zag".into(),
63        ));
64    }
65    Ok(())
66}
67
68/// Resolve a workflow argument to an actual file path.
69///
70/// Tries in order:
71/// 1. Literal path as given
72/// 2. With `.zug` extension appended
73/// 3. Under `./workflows/` directory
74/// 4. Under `./workflows/` with `.zug` appended
75pub fn resolve_workflow_path(workflow: &str) -> Result<PathBuf, ZigError> {
76    let mut candidates = vec![
77        PathBuf::from(workflow),
78        PathBuf::from(format!("{workflow}.zug")),
79        PathBuf::from(format!("workflows/{workflow}")),
80        PathBuf::from(format!("workflows/{workflow}.zug")),
81    ];
82
83    if let Some(global_dir) = crate::paths::global_workflows_dir() {
84        candidates.push(global_dir.join(workflow));
85        candidates.push(global_dir.join(format!("{workflow}.zug")));
86    }
87
88    for candidate in &candidates {
89        if candidate.exists() {
90            return Ok(candidate.clone());
91        }
92    }
93
94    Err(ZigError::Io(format!(
95        "workflow not found: '{workflow}' (tried: {})",
96        candidates
97            .iter()
98            .map(|p| p.display().to_string())
99            .collect::<Vec<_>>()
100            .join(", ")
101    )))
102}
103
104/// Compute a topological ordering of steps grouped into tiers.
105///
106/// Each tier contains steps whose dependencies are all in earlier tiers,
107/// meaning steps within a tier can (in principle) run in parallel.
108/// Uses Kahn's algorithm.
109fn topological_sort(steps: &[Step]) -> Result<Vec<Vec<&Step>>, ZigError> {
110    let step_index: HashMap<&str, usize> = steps
111        .iter()
112        .enumerate()
113        .map(|(i, s)| (s.name.as_str(), i))
114        .collect();
115
116    let mut in_degree = vec![0usize; steps.len()];
117    for (i, step) in steps.iter().enumerate() {
118        for dep in &step.depends_on {
119            if step_index.contains_key(dep.as_str()) {
120                in_degree[i] += 1;
121            }
122        }
123    }
124
125    let mut tiers = Vec::new();
126    let mut remaining = in_degree.clone();
127    let mut completed: Vec<bool> = vec![false; steps.len()];
128
129    loop {
130        let tier: Vec<usize> = (0..steps.len())
131            .filter(|&i| !completed[i] && remaining[i] == 0)
132            .collect();
133
134        if tier.is_empty() {
135            break;
136        }
137
138        for &i in &tier {
139            completed[i] = true;
140        }
141
142        // Decrement in-degrees for dependents of this tier
143        for &i in &tier {
144            for (j, step) in steps.iter().enumerate() {
145                if !completed[j] && step.depends_on.contains(&steps[i].name) {
146                    remaining[j] -= 1;
147                }
148            }
149        }
150
151        tiers.push(tier.iter().map(|&i| &steps[i]).collect());
152    }
153
154    let completed_count: usize = completed.iter().filter(|&&c| c).count();
155    if completed_count != steps.len() {
156        return Err(ZigError::Execution(
157            "could not resolve all steps — possible undetected cycle".into(),
158        ));
159    }
160
161    Ok(tiers)
162}
163
164/// Replace `${var_name}` references in a template with values from the variable map.
165///
166/// Supports dotted paths like `${result.score}` — the root variable name is
167/// looked up, and if its value is valid JSON, the nested path is traversed.
168/// Unknown variables are left as-is.
169fn substitute_vars(template: &str, vars: &HashMap<String, String>) -> String {
170    let mut result = String::with_capacity(template.len());
171    let mut rest = template;
172
173    while let Some(start) = rest.find("${") {
174        result.push_str(&rest[..start]);
175        let after_start = &rest[start + 2..];
176
177        if let Some(end) = after_start.find('}') {
178            let var_expr = &after_start[..end];
179            let mut parts = var_expr.splitn(2, '.');
180            let root = parts.next().unwrap_or(var_expr);
181
182            if let Some(value) = vars.get(root) {
183                if let Some(path) = parts.next() {
184                    // Try to navigate a JSON path
185                    if let Ok(json) = serde_json::from_str::<serde_json::Value>(value) {
186                        let resolved = json_path_lookup(&json, path);
187                        result.push_str(&resolved);
188                    } else {
189                        result.push_str(value);
190                    }
191                } else {
192                    result.push_str(value);
193                }
194            } else {
195                // Unknown variable — leave as-is
196                result.push_str(&rest[start..start + 2 + end + 1]);
197            }
198
199            rest = &after_start[end + 1..];
200        } else {
201            result.push_str(&rest[start..]);
202            rest = "";
203        }
204    }
205
206    result.push_str(rest);
207    result
208}
209
210/// Look up a dotted path in a JSON value (e.g., "nested.field").
211fn json_path_lookup(value: &serde_json::Value, path: &str) -> String {
212    let mut current = value;
213    for key in path.split('.') {
214        match current.get(key) {
215            Some(v) => current = v,
216            None => return format!("${{?.{path}}}"),
217        }
218    }
219    match current {
220        serde_json::Value::String(s) => s.clone(),
221        other => other.to_string(),
222    }
223}
224
225/// Resolve the effective system prompt for a step, with any advertised
226/// resources prepended as a `<resources>` block.
227///
228/// Resolution order:
229/// 1. If `step.system_prompt` is set, use it (with variable substitution).
230/// 2. If `step.role` is set, resolve the role name (may contain `${var}`),
231///    look it up in the roles table, and use the role's system prompt
232///    (loaded from file if `system_prompt_file` is set).
233/// 3. Otherwise the base prompt is empty.
234///
235/// Resources from all configured tiers (global shared, global per-workflow,
236/// project cwd, inline workflow, inline step) are then collected by the
237/// supplied [`ResourceCollector`] and rendered into a `<resources>` block
238/// that is prepended to the base prompt. If both the resources set and the
239/// base prompt are empty, returns `None` — keeping the current behavior when
240/// nothing is configured.
241fn resolve_role_system_prompt(
242    step: &Step,
243    roles: &HashMap<String, Role>,
244    resources: &ResourceCollector,
245    vars: &HashMap<String, String>,
246    workflow_dir: &Path,
247) -> Result<Option<String>, ZigError> {
248    // Resolve the base system prompt (may be empty if neither is set).
249    let base_prompt: Option<String> = if let Some(ref sp) = step.system_prompt {
250        Some(substitute_vars(sp, vars))
251    } else if let Some(ref role_ref) = step.role {
252        let resolved_name = substitute_vars(role_ref, vars);
253        let role = roles.get(&resolved_name).ok_or_else(|| {
254            ZigError::Execution(format!(
255                "step '{}' references role '{}' which does not exist",
256                step.name, resolved_name
257            ))
258        })?;
259
260        let raw_prompt = if let Some(ref file_path) = role.system_prompt_file {
261            let full_path = workflow_dir.join(file_path);
262            Some(std::fs::read_to_string(&full_path).map_err(|e| {
263                ZigError::Execution(format!(
264                    "failed to read system_prompt_file '{}' for role '{}': {e}",
265                    full_path.display(),
266                    resolved_name
267                ))
268            })?)
269        } else {
270            role.system_prompt.clone()
271        };
272
273        raw_prompt.map(|p| substitute_vars(&p, vars))
274    } else {
275        None
276    };
277
278    // Collect and render resources.
279    let set = resources.collect_for_step(&step.resources)?;
280    let resource_block = render_system_block(&set);
281
282    match (resource_block.is_empty(), base_prompt) {
283        (true, None) => Ok(None),
284        (true, Some(p)) => Ok(Some(p)),
285        (false, None) => Ok(Some(resource_block.trim_end().to_string())),
286        (false, Some(p)) => Ok(Some(format!("{resource_block}{p}"))),
287    }
288}
289
290/// Load file-backed default values for variables.
291///
292/// For each variable with `default_file` set (and no `default`), reads the
293/// file contents relative to `workflow_dir` and inserts them into the vars map.
294fn load_file_defaults(
295    vars: &mut HashMap<String, String>,
296    declarations: &HashMap<String, crate::workflow::model::Variable>,
297    workflow_dir: &Path,
298) -> Result<(), ZigError> {
299    for (name, decl) in declarations {
300        if decl.default.is_none() {
301            if let Some(ref file_path) = decl.default_file {
302                let full_path = workflow_dir.join(file_path);
303                let content = std::fs::read_to_string(&full_path).map_err(|e| {
304                    ZigError::Execution(format!(
305                        "failed to read default_file '{}' for variable '{name}': {e}",
306                        full_path.display()
307                    ))
308                })?;
309                vars.insert(name.clone(), content);
310            }
311        }
312    }
313    Ok(())
314}
315
316/// Evaluate a simple condition expression against the current variable state.
317///
318/// Supports:
319/// - Numeric comparisons: `score < 8`, `retries <= max_retries`
320/// - String equality: `status == "done"`, `status != "pending"`
321/// - Truthy checks: `approved` (true if value is "true" or non-empty and non-zero)
322fn evaluate_condition(condition: &str, vars: &HashMap<String, String>) -> Result<bool, ZigError> {
323    let condition = condition.trim();
324
325    // Try comparison operators (ordered by length to match `<=` before `<`)
326    let operators = ["<=", ">=", "!=", "==", "<", ">"];
327    for op in &operators {
328        if let Some(pos) = condition.find(op) {
329            let lhs = resolve_operand(condition[..pos].trim(), vars);
330            let rhs = resolve_operand(condition[pos + op.len()..].trim(), vars);
331            return Ok(compare(&lhs, &rhs, op));
332        }
333    }
334
335    // Truthy check: single variable name
336    let value = vars.get(condition).map(|s| s.as_str()).unwrap_or("");
337    Ok(is_truthy(value))
338}
339
/// Resolve a condition operand to its string value.
///
/// - A token wrapped in matching double or single quotes (`"done"`, `'done'`)
///   is a string literal; the quotes are removed.
/// - Otherwise, if the token names a variable in `vars`, its value is used.
/// - Otherwise the token is returned as-is (numeric literal or unknown name).
///
/// A literal needs both an opening and a closing quote. A lone `"` or `'`
/// is not a literal; slicing it previously panicked.
fn resolve_operand(token: &str, vars: &HashMap<String, String>) -> String {
    for quote in ['"', '\''] {
        // strip_prefix + strip_suffix together require two quote characters.
        if let Some(inner) = token
            .strip_prefix(quote)
            .and_then(|rest| rest.strip_suffix(quote))
        {
            return inner.to_string();
        }
    }
    if let Some(val) = vars.get(token) {
        return val.clone();
    }
    token.to_string()
}
358
/// Compare two string operands with the given operator.
///
/// If both operands parse as *finite* numbers, they are compared numerically,
/// with exact equality for `==`/`!=` (`-0` equals `0`). Both sides come from
/// text, so no epsilon is needed, and an epsilon wrongly equated tiny values
/// such as `1e-20` and `0`. Anything else, including `nan`/`inf` (which `f64`
/// would otherwise accept), is compared lexicographically as a string.
/// Unknown operators yield `false`.
fn compare(lhs: &str, rhs: &str, op: &str) -> bool {
    let as_finite = |s: &str| s.parse::<f64>().ok().filter(|v| v.is_finite());

    if let (Some(l), Some(r)) = (as_finite(lhs), as_finite(rhs)) {
        let equal = l.partial_cmp(&r) == Some(std::cmp::Ordering::Equal);
        return match op {
            "==" => equal,
            "!=" => !equal,
            "<" => l < r,
            ">" => l > r,
            "<=" => l <= r,
            ">=" => l >= r,
            _ => false,
        };
    }
    match op {
        "==" => lhs == rhs,
        "!=" => lhs != rhs,
        "<" => lhs < rhs,
        ">" => lhs > rhs,
        "<=" => lhs <= rhs,
        ">=" => lhs >= rhs,
        _ => false,
    }
}
383
/// Interpret a variable's string value as a boolean.
///
/// Everything is truthy except the empty string, `"false"`, and `"0"`.
/// These are matched exactly, so `"False"` and `"0.0"` are truthy.
fn is_truthy(value: &str) -> bool {
    !matches!(value, "" | "false" | "0")
}
388
389/// Build the final prompt for a step, incorporating variable substitution,
390/// dependency outputs, and the user's context prompt.
391fn render_step_prompt(
392    step: &Step,
393    vars: &HashMap<String, String>,
394    user_prompt: Option<&str>,
395    dependency_outputs: &HashMap<String, String>,
396) -> String {
397    let mut prompt = String::new();
398
399    // Prepend user context if provided
400    if let Some(ctx) = user_prompt {
401        prompt.push_str(&format!("User context: {ctx}\n\n"));
402    }
403
404    // Inject dependency outputs if requested
405    if step.inject_context {
406        for dep in &step.depends_on {
407            if let Some(output) = dependency_outputs.get(dep) {
408                prompt.push_str(&format!("--- Output from '{dep}' ---\n{output}\n\n"));
409            }
410        }
411    }
412
413    // Append the step's prompt with variable substitution
414    prompt.push_str(&substitute_vars(&step.prompt, vars));
415
416    prompt
417}
418
419/// Build the argument list for a zag invocation.
420///
421/// Separated from `execute_step` to allow unit testing of flag logic
422/// without a real `zag` binary. The optional `model_override` is used
423/// during retries when `retry_model` escalates to a different model.
424///
425/// The subcommand is determined by `step.command`:
426/// - `None` → `zag run <prompt>` (default)
427/// - `Review` → `zag review [prompt] [--uncommitted] [--base] [--commit] [--title]`
428/// - `Plan` → `zag plan <prompt> [-o path] [--instructions]`
429/// - `Pipe` → `zag pipe <session-ids...> -- <prompt>`
430/// - `Collect` → `zag collect <session-ids...>`
431/// - `Summary` → `zag summary <session-ids...>`
432fn build_zag_args(
433    step: &Step,
434    prompt: &str,
435    workflow_name: &str,
436    model_override: Option<&str>,
437    rendered_system_prompt: Option<&str>,
438    workflow_provider: Option<&str>,
439    workflow_model: Option<&str>,
440) -> Vec<String> {
441    let session_name = |dep: &str| format!("zig-{workflow_name}-{dep}");
442
443    // Build command-specific prefix and determine if agent args apply
444    let (mut args, accepts_agent_args) = match &step.command {
445        None => (vec!["run".to_string(), prompt.to_string()], true),
446        Some(StepCommand::Review) => {
447            let mut a = vec!["review".to_string()];
448            if !prompt.is_empty() {
449                a.push(prompt.to_string());
450            }
451            if step.uncommitted {
452                a.push("--uncommitted".into());
453            }
454            if let Some(base) = &step.base {
455                a.extend(["--base".into(), base.clone()]);
456            }
457            if let Some(commit) = &step.commit {
458                a.extend(["--commit".into(), commit.clone()]);
459            }
460            if let Some(title) = &step.title {
461                a.extend(["--title".into(), title.clone()]);
462            }
463            (a, true)
464        }
465        Some(StepCommand::Plan) => {
466            let mut a = vec!["plan".to_string(), prompt.to_string()];
467            if let Some(output) = &step.plan_output {
468                a.extend(["-o".into(), output.clone()]);
469            }
470            if let Some(instructions) = &step.instructions {
471                a.extend(["--instructions".into(), instructions.clone()]);
472            }
473            (a, true)
474        }
475        Some(StepCommand::Pipe) => {
476            let mut a = vec!["pipe".to_string()];
477            for dep in &step.depends_on {
478                a.push(session_name(dep));
479            }
480            a.push("--".into());
481            a.push(prompt.to_string());
482            (a, true)
483        }
484        Some(StepCommand::Collect) => {
485            let mut a = vec!["collect".to_string()];
486            for dep in &step.depends_on {
487                a.push(session_name(dep));
488            }
489            (a, false)
490        }
491        Some(StepCommand::Summary) => {
492            let mut a = vec!["summary".to_string()];
493            for dep in &step.depends_on {
494                a.push(session_name(dep));
495            }
496            (a, false)
497        }
498    };
499
500    // Agent args (provider, model, prompts, output, etc.) only apply to
501    // commands that launch an agent: run, review, plan, pipe.
502    if accepts_agent_args {
503        let effective_provider = step.provider.as_deref().or(workflow_provider);
504        if let Some(provider) = effective_provider {
505            args.extend(["--provider".into(), provider.to_string()]);
506        }
507
508        let effective_model = model_override.or(step.model.as_deref()).or(workflow_model);
509        if let Some(model) = effective_model {
510            args.extend(["--model".into(), model.to_string()]);
511        }
512
513        if let Some(sp) = rendered_system_prompt {
514            args.extend(["--system-prompt".into(), sp.to_string()]);
515        }
516        if let Some(max_turns) = step.max_turns {
517            args.extend(["--max-turns".into(), max_turns.to_string()]);
518        }
519
520        // Output format: explicit format overrides the json bool
521        if let Some(output) = &step.output {
522            args.extend(["-o".into(), output.clone()]);
523        } else if step.json {
524            args.push("--json".into());
525        }
526        if let Some(schema) = &step.json_schema {
527            args.extend(["--json-schema".into(), schema.clone()]);
528        }
529
530        if let Some(mcp_config) = &step.mcp_config {
531            args.extend(["--mcp-config".into(), mcp_config.clone()]);
532        }
533
534        // Execution environment
535        if step.auto_approve {
536            args.push("--auto-approve".into());
537        }
538        if let Some(root) = &step.root {
539            args.extend(["--root".into(), root.clone()]);
540        }
541        for dir in &step.add_dirs {
542            args.extend(["--add-dir".into(), dir.clone()]);
543        }
544        for (key, value) in &step.env {
545            args.extend(["--env".into(), format!("{key}={value}")]);
546        }
547        for file in &step.files {
548            args.extend(["--file".into(), file.clone()]);
549        }
550
551        // Context injection
552        for ctx in &step.context {
553            args.extend(["--context".into(), ctx.clone()]);
554        }
555        if let Some(plan) = &step.plan {
556            args.extend(["--plan".into(), plan.clone()]);
557        }
558
559        // Isolation
560        if step.worktree {
561            args.push("--worktree".into());
562        }
563        if let Some(sandbox) = &step.sandbox {
564            args.extend(["--sandbox".into(), sandbox.clone()]);
565        }
566    }
567
568    // Session metadata applies to all commands
569    let name = session_name(&step.name);
570    args.extend(["--name".into(), name]);
571
572    if !step.description.is_empty() {
573        args.extend(["--description".into(), step.description.clone()]);
574    }
575
576    args.extend(["--tag".into(), "zig-workflow".into()]);
577    for tag in &step.tags {
578        args.extend(["--tag".into(), tag.clone()]);
579    }
580
581    if let Some(timeout) = &step.timeout {
582        args.extend(["--timeout".into(), timeout.clone()]);
583    }
584
585    args
586}
587
588/// Spawn `zag` and stream its stdout/stderr live to our stderr while
589/// also accumulating stdout into a buffer for `saves` extraction.
590///
591/// If `prefix` is `Some`, every emitted line is prefixed with `[prefix] `
592/// — used to disambiguate output from steps running in parallel.
593fn run_zag_streaming(
594    args: &[String],
595    step_name: &str,
596    prefix: Option<&str>,
597    session: Option<&Arc<SessionWriter>>,
598) -> Result<(std::process::ExitStatus, String), ZigError> {
599    let mut cmd = Command::new("zag");
600    cmd.args(args)
601        .stdout(std::process::Stdio::piped())
602        .stderr(std::process::Stdio::piped());
603
604    let mut child = cmd
605        .spawn()
606        .map_err(|e| ZigError::Zag(format!("failed to launch zag for step '{step_name}': {e}")))?;
607
608    let stdout = child.stdout.take().expect("stdout was piped");
609    let stderr = child.stderr.take().expect("stderr was piped");
610
611    let buffer = Arc::new(Mutex::new(String::new()));
612    let buffer_clone = Arc::clone(&buffer);
613    let prefix_stdout = prefix.map(String::from);
614    let prefix_stderr = prefix.map(String::from);
615    let session_stdout = session.cloned();
616    let session_stderr = session.cloned();
617    let step_name_stdout = step_name.to_string();
618    let step_name_stderr = step_name.to_string();
619
620    let stdout_thread = thread::spawn(move || {
621        let reader = BufReader::new(stdout);
622        let stderr_handle = std::io::stderr();
623        for line in reader.lines().map_while(Result::ok) {
624            if let Ok(mut buf) = buffer_clone.lock() {
625                buf.push_str(&line);
626                buf.push('\n');
627            }
628            if let Some(w) = &session_stdout {
629                let _ = w.step_output(&step_name_stdout, OutputStream::Stdout, &line);
630            }
631            let mut h = stderr_handle.lock();
632            let _ = match &prefix_stdout {
633                Some(p) => writeln!(h, "[{p}] {line}"),
634                None => writeln!(h, "{line}"),
635            };
636        }
637    });
638
639    let stderr_thread = thread::spawn(move || {
640        let reader = BufReader::new(stderr);
641        let stderr_handle = std::io::stderr();
642        for line in reader.lines().map_while(Result::ok) {
643            if let Some(w) = &session_stderr {
644                let _ = w.step_output(&step_name_stderr, OutputStream::Stderr, &line);
645            }
646            let mut h = stderr_handle.lock();
647            let _ = match &prefix_stderr {
648                Some(p) => writeln!(h, "[{p}] {line}"),
649                None => writeln!(h, "{line}"),
650            };
651        }
652    });
653
654    let status = child
655        .wait()
656        .map_err(|e| ZigError::Execution(format!("failed to wait for child: {e}")))?;
657
658    let _ = stdout_thread.join();
659    let _ = stderr_thread.join();
660
661    let captured = Arc::try_unwrap(buffer)
662        .map_err(|_| ZigError::Execution("buffer still shared after threads joined".into()))?
663        .into_inner()
664        .map_err(|_| ZigError::Execution("output buffer poisoned".into()))?;
665
666    Ok((status, captured))
667}
668
669/// Execute a single step by invoking `zag`, streaming its output live.
670///
671/// Returns the captured stdout from zag. The optional `model_override`
672/// is used during retries to escalate to a different model. The optional
673/// `prefix` tags streamed lines with the step name (used for parallel runs).
674#[allow(clippy::too_many_arguments)]
675fn execute_step(
676    step: &Step,
677    prompt: &str,
678    workflow_name: &str,
679    model_override: Option<&str>,
680    prefix: Option<&str>,
681    session: Option<&Arc<SessionWriter>>,
682    rendered_system_prompt: Option<&str>,
683    workflow_provider: Option<&str>,
684    workflow_model: Option<&str>,
685) -> Result<String, ZigError> {
686    let args = build_zag_args(
687        step,
688        prompt,
689        workflow_name,
690        model_override,
691        rendered_system_prompt,
692        workflow_provider,
693        workflow_model,
694    );
695    let (status, stdout) = run_zag_streaming(&args, &step.name, prefix, session)?;
696
697    if !status.success() {
698        return Err(ZigError::Execution(format!(
699            "step '{}' failed (exit {})",
700            step.name, status,
701        )));
702    }
703
704    Ok(stdout)
705}
706
707/// Run a step with retry logic, returning its captured stdout on success.
708///
709/// Extracted so both sequential and parallel execution paths share the
710/// same retry / model-escalation behavior.
711#[allow(clippy::too_many_arguments)]
712fn run_step_attempts(
713    step: &Step,
714    prompt: &str,
715    workflow_name: &str,
716    prefix: Option<&str>,
717    session: Option<&Arc<SessionWriter>>,
718    rendered_system_prompt: Option<&str>,
719    workflow_provider: Option<&str>,
720    workflow_model: Option<&str>,
721) -> Result<String, ZigError> {
722    let mut attempts = 0;
723    let max_attempts = if step.on_failure.as_ref() == Some(&FailurePolicy::Retry) {
724        step.max_retries.unwrap_or(1) + 1
725    } else {
726        1
727    };
728
729    loop {
730        attempts += 1;
731        let model_override = if attempts > 1 {
732            step.retry_model.as_deref()
733        } else {
734            None
735        };
736        match execute_step(
737            step,
738            prompt,
739            workflow_name,
740            model_override,
741            prefix,
742            session,
743            rendered_system_prompt,
744            workflow_provider,
745            workflow_model,
746        ) {
747            Ok(output) => return Ok(output),
748            Err(e) => {
749                if let Some(w) = session {
750                    let _ = w.step_failed(&step.name, None, attempts, &e.to_string());
751                }
752                if attempts < max_attempts {
753                    eprintln!(
754                        "    retry {}/{} for step '{}'",
755                        attempts,
756                        max_attempts - 1,
757                        step.name
758                    );
759                    continue;
760                }
761                return Err(e);
762            }
763        }
764    }
765}
766
767/// Extract variable values from step output using `saves` selectors.
768///
769/// Selectors:
770/// - `"$"` — the entire output
771/// - `"$.field"` — a top-level JSON field
772/// - `"$.nested.field"` — a nested JSON field
773fn extract_saves(
774    output: &str,
775    saves: &HashMap<String, String>,
776) -> Result<HashMap<String, String>, ZigError> {
777    let mut extracted = HashMap::new();
778
779    for (var_name, selector) in saves {
780        let value = if selector == "$" {
781            output.trim().to_string()
782        } else if let Some(path) = selector.strip_prefix("$.") {
783            let json: serde_json::Value = serde_json::from_str(output.trim()).map_err(|e| {
784                ZigError::Execution(format!(
785                    "saves selector '{selector}' requires JSON output, but got parse error: {e}"
786                ))
787            })?;
788            json_path_lookup(&json, path)
789        } else {
790            output.trim().to_string()
791        };
792
793        extracted.insert(var_name.clone(), value);
794    }
795
796    Ok(extracted)
797}
798
799/// Partition a tier of steps into sequential steps and race groups.
800///
801/// Steps without a `race_group` are returned as sequential. Steps sharing
802/// the same `race_group` value are grouped together for parallel execution.
803fn partition_tier<'a>(tier: &[&'a Step]) -> (Vec<&'a Step>, HashMap<String, Vec<&'a Step>>) {
804    let mut sequential = Vec::new();
805    let mut race_groups: HashMap<String, Vec<&'a Step>> = HashMap::new();
806
807    for step in tier {
808        if let Some(group) = &step.race_group {
809            race_groups.entry(group.clone()).or_default().push(step);
810        } else {
811            sequential.push(*step);
812        }
813    }
814
815    (sequential, race_groups)
816}
817
818/// Spawn a step as a child process without waiting for it to finish.
819fn spawn_step(
820    step: &Step,
821    prompt: &str,
822    workflow_name: &str,
823    rendered_system_prompt: Option<&str>,
824    workflow_provider: Option<&str>,
825    workflow_model: Option<&str>,
826) -> Result<std::process::Child, ZigError> {
827    let args = build_zag_args(
828        step,
829        prompt,
830        workflow_name,
831        None,
832        rendered_system_prompt,
833        workflow_provider,
834        workflow_model,
835    );
836    let mut cmd = Command::new("zag");
837    cmd.args(&args)
838        .stdout(std::process::Stdio::piped())
839        .stderr(std::process::Stdio::piped());
840
841    cmd.spawn()
842        .map_err(|e| ZigError::Zag(format!("failed to spawn zag for step '{}': {e}", step.name)))
843}
844
845/// Execute a race group: run all steps in parallel, return the first winner.
846///
847/// When one step finishes successfully, all remaining steps are killed.
848/// Returns the winning step's name and its stdout output.
849#[allow(clippy::too_many_arguments)]
850fn execute_race_group(
851    steps: &[&Step],
852    prompts: &HashMap<String, String>,
853    system_prompts: &HashMap<String, String>,
854    workflow_name: &str,
855    tier_index: usize,
856    session: Option<&Arc<SessionWriter>>,
857    workflow_provider: Option<&str>,
858    workflow_model: Option<&str>,
859) -> Result<(String, String), ZigError> {
860    if let Some(w) = session {
861        for step in steps {
862            let zag_session_id = format!("zig-{workflow_name}-{}", step.name);
863            let preview = prompts
864                .get(&step.name)
865                .map(|p| prompt_preview(p))
866                .unwrap_or_default();
867            let _ = w.step_started(
868                &step.name,
869                tier_index,
870                &zag_session_id,
871                zag_command_name(&step.command),
872                step.model.as_deref(),
873                &preview,
874            );
875        }
876    }
877    let race_started = Instant::now();
878    let mut children: Vec<(String, std::process::Child)> = Vec::new();
879
880    for step in steps {
881        let prompt = prompts.get(&step.name).ok_or_else(|| {
882            ZigError::Execution(format!("missing prompt for step '{}'", step.name))
883        })?;
884        eprintln!("  racing step '{}'...", step.name);
885        let rendered_sp = system_prompts.get(&step.name).map(|s| s.as_str());
886        let child = spawn_step(
887            step,
888            prompt,
889            workflow_name,
890            rendered_sp,
891            workflow_provider,
892            workflow_model,
893        )?;
894        children.push((step.name.clone(), child));
895    }
896
897    // Poll until one finishes
898    loop {
899        for i in 0..children.len() {
900            let status = children[i]
901                .1
902                .try_wait()
903                .map_err(|e| ZigError::Execution(format!("failed to poll child: {e}")))?;
904
905            if let Some(exit_status) = status {
906                let (winner_name, winner_child) = children.remove(i);
907
908                // Kill remaining children
909                for (name, mut child) in children {
910                    eprintln!("  cancelling step '{name}' (race lost)");
911                    let _ = child.kill();
912                    let _ = child.wait();
913                }
914
915                let elapsed = race_started.elapsed().as_millis() as u64;
916                if !exit_status.success() {
917                    let stderr = winner_child
918                        .stderr
919                        .map(|mut s| {
920                            let mut buf = String::new();
921                            std::io::Read::read_to_string(&mut s, &mut buf).ok();
922                            buf
923                        })
924                        .unwrap_or_default();
925                    let err_msg = format!(
926                        "race winner '{}' failed (exit {}): {}",
927                        winner_name,
928                        exit_status,
929                        stderr.trim()
930                    );
931                    if let Some(w) = session {
932                        let _ = w.step_failed(&winner_name, exit_status.code(), 1, &err_msg);
933                    }
934                    return Err(ZigError::Execution(err_msg));
935                }
936
937                let stdout = winner_child
938                    .stdout
939                    .map(|mut s| {
940                        let mut buf = String::new();
941                        std::io::Read::read_to_string(&mut s, &mut buf).ok();
942                        buf
943                    })
944                    .unwrap_or_default();
945
946                eprintln!("  race won by '{winner_name}'");
947                if let Some(w) = session {
948                    let _ = w.step_completed(&winner_name, 0, elapsed, Vec::new());
949                }
950                return Ok((winner_name, stdout));
951            }
952        }
953
954        std::thread::sleep(Duration::from_millis(100));
955    }
956}
957
958/// Execute a single sequential step with retry logic, saves, and next-jump handling.
959#[allow(clippy::too_many_arguments)]
960fn execute_sequential_step(
961    step: &Step,
962    vars: &mut HashMap<String, String>,
963    user_prompt: Option<&str>,
964    step_outputs: &mut HashMap<String, String>,
965    workflow_name: &str,
966    pending_next: &mut Option<String>,
967    tier_index: usize,
968    session: Option<&Arc<SessionWriter>>,
969    roles: &HashMap<String, Role>,
970    resources: &ResourceCollector,
971    workflow_dir: &Path,
972    workflow_provider: Option<&str>,
973    workflow_model: Option<&str>,
974) -> Result<(), ZigError> {
975    if let Some(condition) = &step.condition {
976        if !evaluate_condition(condition, vars)? {
977            eprintln!(
978                "  skipping '{}' (condition not met: {condition})",
979                step.name
980            );
981            if let Some(w) = session {
982                let _ = w.step_skipped(&step.name, &format!("condition not met: {condition}"));
983            }
984            return Ok(());
985        }
986    }
987
988    eprintln!("  running step '{}'...", step.name);
989
990    let prompt = render_step_prompt(step, vars, user_prompt, step_outputs);
991    let rendered_sp = resolve_role_system_prompt(step, roles, resources, vars, workflow_dir)?;
992    if let Some(w) = session {
993        let zag_session_id = format!("zig-{workflow_name}-{}", step.name);
994        let _ = w.step_started(
995            &step.name,
996            tier_index,
997            &zag_session_id,
998            zag_command_name(&step.command),
999            step.model.as_deref(),
1000            &prompt_preview(&prompt),
1001        );
1002    }
1003    let started = Instant::now();
1004    let result = run_step_attempts(
1005        step,
1006        &prompt,
1007        workflow_name,
1008        None,
1009        session,
1010        rendered_sp.as_deref(),
1011        workflow_provider,
1012        workflow_model,
1013    );
1014
1015    match result {
1016        Ok(output) => {
1017            let mut saved_keys: Vec<String> = Vec::new();
1018            if !step.saves.is_empty() {
1019                let saved = extract_saves(&output, &step.saves)?;
1020                for (k, v) in &saved {
1021                    eprintln!("    saved {k} = {v}");
1022                    saved_keys.push(k.clone());
1023                }
1024                vars.extend(saved);
1025            }
1026
1027            step_outputs.insert(step.name.clone(), output);
1028            eprintln!("  completed '{}'", step.name);
1029            if let Some(w) = session {
1030                let _ = w.step_completed(
1031                    &step.name,
1032                    0,
1033                    started.elapsed().as_millis() as u64,
1034                    saved_keys,
1035                );
1036            }
1037
1038            if step.next.is_some() {
1039                *pending_next = step.next.clone();
1040            }
1041        }
1042        Err(e) => match step.on_failure.as_ref().unwrap_or(&FailurePolicy::Fail) {
1043            FailurePolicy::Fail => return Err(e),
1044            FailurePolicy::Continue => {
1045                eprintln!("  step '{}' failed (continuing): {e}", step.name);
1046            }
1047            FailurePolicy::Retry => {
1048                return Err(e);
1049            }
1050        },
1051    }
1052
1053    Ok(())
1054}
1055
1056/// Run multiple independent steps in a tier concurrently.
1057///
1058/// All non-skipped steps are spawned in their own threads and we wait for
1059/// every one to finish (unlike race groups, which kill losers). Output
1060/// lines from each step are streamed live to stderr, prefixed with the
1061/// step name to disambiguate. After completion, results are processed in
1062/// tier-declaration order so `saves`, `next`, and `on_failure` semantics
1063/// remain deterministic.
1064#[allow(clippy::too_many_arguments)]
1065fn execute_parallel_tier(
1066    steps: &[&Step],
1067    vars: &mut HashMap<String, String>,
1068    user_prompt: Option<&str>,
1069    step_outputs: &mut HashMap<String, String>,
1070    workflow_name: &str,
1071    pending_next: &mut Option<String>,
1072    tier_index: usize,
1073    session: Option<&Arc<SessionWriter>>,
1074    roles: &HashMap<String, Role>,
1075    resources: &ResourceCollector,
1076    workflow_dir: &Path,
1077    workflow_provider: Option<&str>,
1078    workflow_model: Option<&str>,
1079) -> Result<(), ZigError> {
1080    // Evaluate conditions and render prompts up front, so threads receive
1081    // the same variable snapshot they would have under sequential execution.
1082    let mut active: Vec<&Step> = Vec::new();
1083    let mut prompts: HashMap<String, String> = HashMap::new();
1084    let mut rendered_sps: HashMap<String, String> = HashMap::new();
1085    for step in steps {
1086        if let Some(condition) = &step.condition {
1087            if !evaluate_condition(condition, vars)? {
1088                eprintln!(
1089                    "  skipping '{}' (condition not met: {condition})",
1090                    step.name
1091                );
1092                if let Some(w) = session {
1093                    let _ = w.step_skipped(&step.name, &format!("condition not met: {condition}"));
1094                }
1095                continue;
1096            }
1097        }
1098        let prompt = render_step_prompt(step, vars, user_prompt, step_outputs);
1099        prompts.insert(step.name.clone(), prompt);
1100        if let Some(sp) = resolve_role_system_prompt(step, roles, resources, vars, workflow_dir)? {
1101            rendered_sps.insert(step.name.clone(), sp);
1102        }
1103        active.push(*step);
1104    }
1105
1106    if active.is_empty() {
1107        return Ok(());
1108    }
1109
1110    eprintln!("  running {} steps in parallel...", active.len());
1111
1112    let mut start_times: HashMap<String, Instant> = HashMap::new();
1113    let mut handles: Vec<thread::JoinHandle<(String, Result<String, ZigError>)>> = Vec::new();
1114    for step in &active {
1115        let step_clone: Step = (*step).clone();
1116        let prompt = prompts.remove(&step.name).unwrap_or_default();
1117        let rendered_sp = rendered_sps.remove(&step.name);
1118        let workflow_name_owned = workflow_name.to_string();
1119        let name = step.name.clone();
1120        eprintln!("  starting '{name}'...");
1121        if let Some(w) = session {
1122            let zag_session_id = format!("zig-{workflow_name}-{name}");
1123            let _ = w.step_started(
1124                &name,
1125                tier_index,
1126                &zag_session_id,
1127                zag_command_name(&step.command),
1128                step.model.as_deref(),
1129                &prompt_preview(&prompt),
1130            );
1131        }
1132        start_times.insert(name.clone(), Instant::now());
1133        let session_clone = session.cloned();
1134        let wf_provider = workflow_provider.map(String::from);
1135        let wf_model = workflow_model.map(String::from);
1136        let handle = thread::spawn(move || {
1137            let res = run_step_attempts(
1138                &step_clone,
1139                &prompt,
1140                &workflow_name_owned,
1141                Some(&name),
1142                session_clone.as_ref(),
1143                rendered_sp.as_deref(),
1144                wf_provider.as_deref(),
1145                wf_model.as_deref(),
1146            );
1147            (name, res)
1148        });
1149        handles.push(handle);
1150    }
1151
1152    let mut results: HashMap<String, Result<String, ZigError>> = HashMap::new();
1153    for handle in handles {
1154        match handle.join() {
1155            Ok((name, res)) => {
1156                results.insert(name, res);
1157            }
1158            Err(_) => {
1159                return Err(ZigError::Execution("parallel step thread panicked".into()));
1160            }
1161        }
1162    }
1163
1164    // Process results in tier-declaration order so `next` is deterministic.
1165    let mut errors: Vec<String> = Vec::new();
1166    for step in &active {
1167        let Some(res) = results.remove(&step.name) else {
1168            continue;
1169        };
1170        let elapsed = start_times
1171            .remove(&step.name)
1172            .map(|t| t.elapsed().as_millis() as u64)
1173            .unwrap_or(0);
1174        match res {
1175            Ok(output) => {
1176                let mut saved_keys: Vec<String> = Vec::new();
1177                if !step.saves.is_empty() {
1178                    let saved = extract_saves(&output, &step.saves)?;
1179                    for (k, v) in &saved {
1180                        eprintln!("    saved {k} = {v}");
1181                        saved_keys.push(k.clone());
1182                    }
1183                    vars.extend(saved);
1184                }
1185                step_outputs.insert(step.name.clone(), output);
1186                eprintln!("  completed '{}'", step.name);
1187                if let Some(w) = session {
1188                    let _ = w.step_completed(&step.name, 0, elapsed, saved_keys);
1189                }
1190                if step.next.is_some() && pending_next.is_none() {
1191                    *pending_next = step.next.clone();
1192                }
1193            }
1194            Err(e) => match step.on_failure.as_ref().unwrap_or(&FailurePolicy::Fail) {
1195                FailurePolicy::Continue => {
1196                    eprintln!("  step '{}' failed (continuing): {e}", step.name);
1197                }
1198                FailurePolicy::Fail | FailurePolicy::Retry => {
1199                    errors.push(format!("'{}': {e}", step.name));
1200                }
1201            },
1202        }
1203    }
1204
1205    if !errors.is_empty() {
1206        return Err(ZigError::Execution(format!(
1207            "parallel step(s) failed: {}",
1208            errors.join("; ")
1209        )));
1210    }
1211
1212    Ok(())
1213}
1214
1215/// Initialize the variable map from workflow variable definitions.
1216/// Variables with defaults are set to their default value; others are empty.
1217fn init_vars(workflow: &Workflow) -> HashMap<String, String> {
1218    let mut vars = HashMap::new();
1219    for (name, var) in &workflow.vars {
1220        let value = match &var.default {
1221            Some(toml::Value::String(s)) => s.clone(),
1222            Some(toml::Value::Integer(n)) => n.to_string(),
1223            Some(toml::Value::Float(f)) => f.to_string(),
1224            Some(toml::Value::Boolean(b)) => b.to_string(),
1225            Some(other) => other.to_string(),
1226            None => String::new(),
1227        };
1228        vars.insert(name.clone(), value);
1229    }
1230    vars
1231}
1232
1233/// Main execution loop for a validated workflow.
1234fn execute(
1235    workflow: &Workflow,
1236    workflow_path: &std::path::Path,
1237    user_prompt: Option<&str>,
1238    workflow_dir: &Path,
1239    disable_resources: bool,
1240) -> Result<(), ZigError> {
1241    let mut vars = init_vars(workflow);
1242
1243    let resource_collector = ResourceCollector::from_env(
1244        &workflow.workflow.name,
1245        &workflow.workflow.resources,
1246        workflow_dir,
1247        disable_resources,
1248    );
1249
1250    // Load file-backed variable defaults before prompt binding.
1251    load_file_defaults(&mut vars, &workflow.vars, workflow_dir)?;
1252
1253    // Bind user prompt to the variable with `from = "prompt"`, if any.
1254    let prompt_var = workflow
1255        .vars
1256        .iter()
1257        .find(|(_, v)| v.from.as_deref() == Some("prompt"))
1258        .map(|(name, _)| name.clone());
1259
1260    if let Some(ref var_name) = prompt_var {
1261        if let Some(prompt) = user_prompt {
1262            vars.insert(var_name.clone(), prompt.to_string());
1263        }
1264    }
1265
1266    // Validate variable values against constraints before executing.
1267    if let Err(errors) = validate::validate_var_values(&vars, &workflow.vars) {
1268        let msgs: Vec<String> = errors.iter().map(|e| e.to_string()).collect();
1269        return Err(ZigError::Validation(msgs.join("; ")));
1270    }
1271
1272    // When prompt is bound to a variable, don't also prepend "User context:".
1273    let effective_user_prompt = if prompt_var.is_some() {
1274        None
1275    } else {
1276        user_prompt
1277    };
1278
1279    let mut step_outputs: HashMap<String, String> = HashMap::new();
1280
1281    let wf_provider = workflow.workflow.provider.as_deref();
1282    let wf_model = workflow.workflow.model.as_deref();
1283
1284    let tiers = topological_sort(&workflow.steps)?;
1285
1286    eprintln!(
1287        "running workflow '{}' ({} steps in {} tiers)",
1288        workflow.workflow.name,
1289        workflow.steps.len(),
1290        tiers.len()
1291    );
1292
1293    // Open a zig session log for this run. Failure to open the log is
1294    // non-fatal — `zig run` should still execute even if `~/.zig` is
1295    // unwritable. The session writer is `Option<Arc<...>>` everywhere
1296    // downstream so the writer-less path stays intact for tests.
1297    let coordinator = match SessionWriter::create(
1298        &workflow.workflow.name,
1299        &workflow_path.to_string_lossy(),
1300        user_prompt,
1301        tiers.len(),
1302    ) {
1303        Ok(writer) => {
1304            eprintln!("zig session: {}", writer.session_id());
1305            Some(SessionCoordinator::start(writer))
1306        }
1307        Err(e) => {
1308            eprintln!("warning: failed to open zig session log: {e}");
1309            None
1310        }
1311    };
1312    let session_writer: Option<Arc<SessionWriter>> = coordinator.as_ref().map(|c| c.writer());
1313    let session_ref = session_writer.as_ref();
1314
1315    let mut iteration = 0;
1316    let mut pending_next: Option<String> = None;
1317
1318    loop {
1319        let tiers_to_run = if let Some(ref next_step) = pending_next {
1320            // Re-run from the target step onward
1321            let remaining: Vec<Vec<&Step>> = tiers
1322                .iter()
1323                .map(|tier| {
1324                    tier.iter()
1325                        .filter(|s| s.name == *next_step)
1326                        .copied()
1327                        .collect::<Vec<_>>()
1328                })
1329                .filter(|tier| !tier.is_empty())
1330                .collect();
1331            pending_next = None;
1332            remaining
1333        } else if iteration == 0 {
1334            tiers.clone()
1335        } else {
1336            break;
1337        };
1338
1339        for (tier_index, tier) in tiers_to_run.iter().enumerate() {
1340            let (non_race, race_groups) = partition_tier(tier);
1341
1342            if let Some(w) = session_ref {
1343                let names: Vec<String> = tier.iter().map(|s| s.name.clone()).collect();
1344                let _ = w.tier_started(tier_index, names);
1345            }
1346
1347            // Independent steps in the same tier run concurrently. A single
1348            // step takes the sequential path so its output streams without
1349            // a name prefix; multiple steps go through the parallel path.
1350            if non_race.len() <= 1 {
1351                for step in &non_race {
1352                    execute_sequential_step(
1353                        step,
1354                        &mut vars,
1355                        effective_user_prompt,
1356                        &mut step_outputs,
1357                        &workflow.workflow.name,
1358                        &mut pending_next,
1359                        tier_index,
1360                        session_ref,
1361                        &workflow.roles,
1362                        &resource_collector,
1363                        workflow_dir,
1364                        wf_provider,
1365                        wf_model,
1366                    )?;
1367                }
1368            } else {
1369                execute_parallel_tier(
1370                    &non_race,
1371                    &mut vars,
1372                    effective_user_prompt,
1373                    &mut step_outputs,
1374                    &workflow.workflow.name,
1375                    &mut pending_next,
1376                    tier_index,
1377                    session_ref,
1378                    &workflow.roles,
1379                    &resource_collector,
1380                    workflow_dir,
1381                    wf_provider,
1382                    wf_model,
1383                )?;
1384            }
1385
1386            // Run race groups in parallel
1387            for (group_name, race_steps) in &race_groups {
1388                eprintln!("  starting race group '{group_name}'...");
1389
1390                // Build prompts for all racers (conditions evaluated here)
1391                let mut prompts = HashMap::new();
1392                let mut race_sps: HashMap<String, String> = HashMap::new();
1393                let mut active_steps: Vec<&Step> = Vec::new();
1394                for step in race_steps {
1395                    if let Some(condition) = &step.condition {
1396                        if !evaluate_condition(condition, &vars)? {
1397                            eprintln!(
1398                                "  skipping '{}' (condition not met: {condition})",
1399                                step.name
1400                            );
1401                            continue;
1402                        }
1403                    }
1404                    let prompt =
1405                        render_step_prompt(step, &vars, effective_user_prompt, &step_outputs);
1406                    prompts.insert(step.name.clone(), prompt);
1407                    if let Some(sp) = resolve_role_system_prompt(
1408                        step,
1409                        &workflow.roles,
1410                        &resource_collector,
1411                        &vars,
1412                        workflow_dir,
1413                    )? {
1414                        race_sps.insert(step.name.clone(), sp);
1415                    }
1416                    active_steps.push(step);
1417                }
1418
1419                if active_steps.is_empty() {
1420                    continue;
1421                }
1422
1423                match execute_race_group(
1424                    &active_steps,
1425                    &prompts,
1426                    &race_sps,
1427                    &workflow.workflow.name,
1428                    tier_index,
1429                    session_ref,
1430                    wf_provider,
1431                    wf_model,
1432                ) {
1433                    Ok((winner_name, output)) => {
1434                        // Find the winning step to process saves/next
1435                        if let Some(winner) = active_steps.iter().find(|s| s.name == winner_name) {
1436                            if !winner.saves.is_empty() {
1437                                let saved = extract_saves(&output, &winner.saves)?;
1438                                for (k, v) in &saved {
1439                                    eprintln!("    saved {k} = {v}");
1440                                }
1441                                vars.extend(saved);
1442                            }
1443                            if winner.next.is_some() {
1444                                pending_next = winner.next.clone();
1445                            }
1446                        }
1447                        step_outputs.insert(winner_name.clone(), output);
1448                        eprintln!(
1449                            "  completed race group '{group_name}' (winner: '{winner_name}')"
1450                        );
1451                    }
1452                    Err(e) => return Err(e),
1453                }
1454            }
1455        }
1456
1457        iteration += 1;
1458        if pending_next.is_none() || iteration >= MAX_LOOP_ITERATIONS {
1459            if iteration >= MAX_LOOP_ITERATIONS {
1460                eprintln!("warning: reached maximum loop iterations ({MAX_LOOP_ITERATIONS})");
1461            }
1462            break;
1463        }
1464    }
1465
1466    eprintln!("workflow '{}' completed", workflow.workflow.name);
1467    if let Some(c) = coordinator {
1468        let _ = c.finish(SessionStatus::Success);
1469    }
1470    Ok(())
1471}
1472
1473/// Short label for the zag subcommand a step will invoke. Used in
1474/// `step_started` events so listeners can distinguish run/review/plan/etc.
1475fn zag_command_name(cmd: &Option<StepCommand>) -> &'static str {
1476    match cmd {
1477        None => "run",
1478        Some(StepCommand::Review) => "review",
1479        Some(StepCommand::Plan) => "plan",
1480        Some(StepCommand::Pipe) => "pipe",
1481        Some(StepCommand::Collect) => "collect",
1482        Some(StepCommand::Summary) => "summary",
1483    }
1484}
1485
/// Truncated single-line preview of a rendered prompt for the session log.
///
/// Line breaks (`\n` and `\r`) become spaces, so CRLF prompts also collapse
/// onto one line. At most `MAX` (200) characters are kept, counted in
/// `char`s so multi-byte text is never split mid-character. `…` is appended
/// when anything was cut. Only the first `MAX + 1` characters are examined,
/// so previews of very large prompts stay cheap.
fn prompt_preview(prompt: &str) -> String {
    const MAX: usize = 200;
    let mut chars = prompt
        .chars()
        .map(|c| if c == '\n' || c == '\r' { ' ' } else { c });
    let mut preview: String = chars.by_ref().take(MAX).collect();
    // Anything left after MAX chars means the prompt was truncated.
    if chars.next().is_some() {
        preview.push('…');
    }
    preview
}
1500
1501#[cfg(test)]
1502#[path = "run_tests.rs"]
1503mod tests;