Skip to main content

zig_core/
run.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use serde::Serialize;
7use tokio::task::JoinSet;
8use zag_agent::builder::AgentBuilder;
9use zag_agent::session_log::LogEventKind;
10use zag_agent::{plan as agent_plan, review as agent_review};
11use zag_orch::collect as orch_collect;
12use zag_orch::summary as orch_summary;
13
14use crate::config::ZigConfig;
15use crate::dry_run::{DryRunContext, DryRunFormat};
16use crate::error::ZigError;
17use crate::memory::{MemoryCollector, render_memory_block};
18use crate::paths::expand_path;
19use crate::resources::{ResourceCollector, render_system_block};
20use crate::session::{OutputStream, SessionCoordinator, SessionStatus, SessionWriter};
21use crate::storage::{FilesystemBackend, StorageManager};
22use crate::workflow::model::{FailurePolicy, MemoryMode, Role, Step, StepCommand, Workflow};
23use crate::workflow::{parser, validate};
24
/// Hard cap on loop iterations, so that a workflow whose `next` fields form
/// a loop cannot run forever.
const MAX_LOOP_ITERATIONS: usize = 100;
27
28/// Execute a workflow file (`.zwf` or `.zwfz`).
29///
30/// Parses the workflow, validates it, resolves the step DAG, and executes
31/// each step via the embedded [`zag_agent::builder::AgentBuilder`] (and
32/// [`zag_orch`] for pipe/collect/summary commands). The optional
33/// `user_prompt` is injected as additional context into every step's prompt.
34///
35/// Resource advertisement (the `<resources>` block prepended to each step's
36/// system prompt) is enabled by default; pass `disable_resources = true` to
37/// opt out, e.g. via `zig run --no-resources`.
38///
39/// Memory injection (the `<memory>` block) is similarly enabled by default;
40/// pass `disable_memory = true` to opt out via `zig run --no-memory`.
41///
42/// Storage injection (the `<storage>` block) is similarly enabled by default;
43/// pass `disable_storage = true` to opt out via `zig run --no-storage`.
44///
45/// When `dry_run = true`, the workflow is parsed, validated, and its plan is
46/// printed in the requested `format` — no agent invocation, session log,
47/// storage creation, or memory write occurs. The three `disable_*` flags are
48/// respected and surface in the plan output.
49#[allow(clippy::too_many_arguments)]
50pub async fn run_workflow(
51    workflow_path: &str,
52    user_prompt: Option<&str>,
53    disable_resources: bool,
54    disable_memory: bool,
55    disable_storage: bool,
56    dry_run: bool,
57    dry_run_format: DryRunFormat,
58) -> Result<(), ZigError> {
59    let path = resolve_workflow_path(workflow_path)?;
60    let (workflow, source) = parser::parse_workflow(&path)?;
61
62    if let Err(errors) = validate::validate(&workflow) {
63        let msgs: Vec<String> = errors.iter().map(|e| e.to_string()).collect();
64        return Err(ZigError::Validation(msgs.join("; ")));
65    }
66
67    execute(
68        &workflow,
69        &path,
70        user_prompt,
71        source.dir(),
72        disable_resources,
73        disable_memory,
74        disable_storage,
75        dry_run,
76        dry_run_format,
77    )
78    .await
79}
80
81/// Resolve a workflow argument to an actual file path.
82///
83/// Tries in order:
84/// 1. Literal path as given
85/// 2. With `.zwf` extension appended
86/// 3. With `.zwfz` extension appended
87/// 4. Under local project `.zig/workflows/` directory
88/// 5. Under local project `.zig/workflows/` with `.zwf` appended
89/// 6. Under local project `.zig/workflows/` with `.zwfz` appended
90/// 7. Under global `~/.zig/workflows/` directory
91/// 8. Under global `~/.zig/workflows/` with `.zwf` appended
92/// 9. Under global `~/.zig/workflows/` with `.zwfz` appended
93pub fn resolve_workflow_path(workflow: &str) -> Result<PathBuf, ZigError> {
94    let mut candidates = vec![
95        PathBuf::from(workflow),
96        PathBuf::from(format!("{workflow}.zwf")),
97        PathBuf::from(format!("{workflow}.zwfz")),
98    ];
99
100    if let Some(local_dir) = crate::paths::cwd_workflows_dir() {
101        candidates.push(local_dir.join(workflow));
102        candidates.push(local_dir.join(format!("{workflow}.zwf")));
103        candidates.push(local_dir.join(format!("{workflow}.zwfz")));
104    }
105
106    if let Some(global_dir) = crate::paths::global_workflows_dir() {
107        candidates.push(global_dir.join(workflow));
108        candidates.push(global_dir.join(format!("{workflow}.zwf")));
109        candidates.push(global_dir.join(format!("{workflow}.zwfz")));
110    }
111
112    for candidate in &candidates {
113        if candidate.exists() {
114            return Ok(candidate.clone());
115        }
116    }
117
118    Err(ZigError::Io(format!(
119        "workflow not found: '{workflow}' (tried: {})",
120        candidates
121            .iter()
122            .map(|p| p.display().to_string())
123            .collect::<Vec<_>>()
124            .join(", ")
125    )))
126}
127
128/// Compute a topological ordering of steps grouped into tiers.
129///
130/// Each tier contains steps whose dependencies are all in earlier tiers,
131/// meaning steps within a tier can (in principle) run in parallel.
132/// Uses Kahn's algorithm.
133pub(crate) fn topological_sort(steps: &[Step]) -> Result<Vec<Vec<&Step>>, ZigError> {
134    let step_index: HashMap<&str, usize> = steps
135        .iter()
136        .enumerate()
137        .map(|(i, s)| (s.name.as_str(), i))
138        .collect();
139
140    let mut in_degree = vec![0usize; steps.len()];
141    for (i, step) in steps.iter().enumerate() {
142        for dep in &step.depends_on {
143            if step_index.contains_key(dep.as_str()) {
144                in_degree[i] += 1;
145            }
146        }
147    }
148
149    let mut tiers = Vec::new();
150    let mut remaining = in_degree.clone();
151    let mut completed: Vec<bool> = vec![false; steps.len()];
152
153    loop {
154        let tier: Vec<usize> = (0..steps.len())
155            .filter(|&i| !completed[i] && remaining[i] == 0)
156            .collect();
157
158        if tier.is_empty() {
159            break;
160        }
161
162        for &i in &tier {
163            completed[i] = true;
164        }
165
166        // Decrement in-degrees for dependents of this tier
167        for &i in &tier {
168            for (j, step) in steps.iter().enumerate() {
169                if !completed[j] && step.depends_on.contains(&steps[i].name) {
170                    remaining[j] -= 1;
171                }
172            }
173        }
174
175        tiers.push(tier.iter().map(|&i| &steps[i]).collect());
176    }
177
178    let completed_count: usize = completed.iter().filter(|&&c| c).count();
179    if completed_count != steps.len() {
180        return Err(ZigError::Execution(
181            "could not resolve all steps — possible undetected cycle".into(),
182        ));
183    }
184
185    Ok(tiers)
186}
187
188/// Replace `${var_name}` references in a template with values from the variable map.
189///
190/// Supports dotted paths like `${result.score}` — the root variable name is
191/// looked up, and if its value is valid JSON, the nested path is traversed.
192/// Unknown variables are left as-is.
193pub(crate) fn substitute_vars(template: &str, vars: &HashMap<String, String>) -> String {
194    let mut result = String::with_capacity(template.len());
195    let mut rest = template;
196
197    while let Some(start) = rest.find("${") {
198        result.push_str(&rest[..start]);
199        let after_start = &rest[start + 2..];
200
201        if let Some(end) = after_start.find('}') {
202            let var_expr = &after_start[..end];
203            let mut parts = var_expr.splitn(2, '.');
204            let root = parts.next().unwrap_or(var_expr);
205
206            if let Some(value) = vars.get(root) {
207                if let Some(path) = parts.next() {
208                    // Try to navigate a JSON path
209                    if let Ok(json) = serde_json::from_str::<serde_json::Value>(value) {
210                        let resolved = json_path_lookup(&json, path);
211                        result.push_str(&resolved);
212                    } else {
213                        result.push_str(value);
214                    }
215                } else {
216                    result.push_str(value);
217                }
218            } else {
219                // Unknown variable — leave as-is
220                result.push_str(&rest[start..start + 2 + end + 1]);
221            }
222
223            rest = &after_start[end + 1..];
224        } else {
225            result.push_str(&rest[start..]);
226            rest = "";
227        }
228    }
229
230    result.push_str(rest);
231    result
232}
233
234/// Look up a dotted path in a JSON value (e.g., "nested.field").
235fn json_path_lookup(value: &serde_json::Value, path: &str) -> String {
236    let mut current = value;
237    for key in path.split('.') {
238        match current.get(key) {
239            Some(v) => current = v,
240            None => return format!("${{?.{path}}}"),
241        }
242    }
243    match current {
244        serde_json::Value::String(s) => s.clone(),
245        other => other.to_string(),
246    }
247}
248
249/// Resolve the effective system prompt for a step, with any advertised
250/// resources prepended as a `<resources>` block.
251///
252/// Resolution order:
253/// 1. If `step.system_prompt` is set, use it (with variable substitution).
254/// 2. If `step.role` is set, resolve the role name (may contain `${var}`),
255///    look it up in the roles table, and use the role's system prompt
256///    (loaded from file if `system_prompt_file` is set).
257/// 3. Otherwise the base prompt is empty.
258///
259/// Resources from all configured tiers (global shared, global per-workflow,
260/// project cwd, inline workflow, inline step) are then collected by the
261/// supplied [`ResourceCollector`] and rendered into a `<resources>` block
262/// that is prepended to the base prompt. If both the resources set and the
263/// base prompt are empty, returns `None` — keeping the current behavior when
264/// nothing is configured.
265#[allow(clippy::too_many_arguments)]
266pub(crate) fn resolve_role_system_prompt(
267    step: &Step,
268    roles: &HashMap<String, Role>,
269    resources: &ResourceCollector<'_>,
270    memory: &MemoryCollector,
271    storage: &StorageManager,
272    vars: &HashMap<String, String>,
273    workflow_dir: &Path,
274    workflow_name: &str,
275) -> Result<Option<String>, ZigError> {
276    // Resolve the base system prompt (may be empty if neither is set).
277    let base_prompt: Option<String> = if let Some(ref sp) = step.system_prompt {
278        Some(substitute_vars(sp, vars))
279    } else if let Some(ref role_ref) = step.role {
280        let resolved_name = substitute_vars(role_ref, vars);
281        let role = roles.get(&resolved_name).ok_or_else(|| {
282            ZigError::Execution(format!(
283                "step '{}' references role '{}' which does not exist",
284                step.name, resolved_name
285            ))
286        })?;
287
288        let raw_prompt = if let Some(ref file_path) = role.system_prompt_file {
289            let full_path = workflow_dir.join(expand_path(file_path));
290            Some(std::fs::read_to_string(&full_path).map_err(|e| {
291                ZigError::Execution(format!(
292                    "failed to read system_prompt_file '{}' for role '{}': {e}",
293                    full_path.display(),
294                    resolved_name
295                ))
296            })?)
297        } else {
298            role.system_prompt.clone()
299        };
300
301        raw_prompt.map(|p| substitute_vars(&p, vars))
302    } else {
303        None
304    };
305
306    // Collect and render resources.
307    let set = resources.collect_for_step(&step.resources)?;
308    let resource_block = render_system_block(&set);
309
310    // Collect and render memory.
311    let memory_entries = memory.collect_for_step(step.memory.as_deref())?;
312    let memory_block = render_memory_block(&memory_entries, workflow_name, Some(&step.name));
313
314    // Render storage block for this step's scope.
315    let storage_block = match storage.render_block(step.storage.as_deref())? {
316        Some(mut s) => {
317            s.push('\n');
318            s
319        }
320        None => String::new(),
321    };
322
323    let prefix = format!("{resource_block}{memory_block}{storage_block}");
324
325    match (prefix.is_empty(), base_prompt) {
326        (true, None) => Ok(None),
327        (true, Some(p)) => Ok(Some(p)),
328        (false, None) => Ok(Some(prefix.trim_end().to_string())),
329        (false, Some(p)) => Ok(Some(format!("{prefix}{p}"))),
330    }
331}
332
333/// Load file-backed default values for variables.
334///
335/// For each variable with `default_file` set (and no `default`), reads the
336/// file contents relative to `workflow_dir` and inserts them into the vars map.
337fn load_file_defaults(
338    vars: &mut HashMap<String, String>,
339    declarations: &HashMap<String, crate::workflow::model::Variable>,
340    workflow_dir: &Path,
341) -> Result<(), ZigError> {
342    for (name, decl) in declarations {
343        if decl.default.is_none() {
344            if let Some(ref file_path) = decl.default_file {
345                let full_path = workflow_dir.join(expand_path(file_path));
346                let content = std::fs::read_to_string(&full_path).map_err(|e| {
347                    ZigError::Execution(format!(
348                        "failed to read default_file '{}' for variable '{name}': {e}",
349                        full_path.display()
350                    ))
351                })?;
352                vars.insert(name.clone(), content);
353            }
354        }
355    }
356    Ok(())
357}
358
359/// Evaluate a simple condition expression against the current variable state.
360///
361/// Supports:
362/// - Numeric comparisons: `score < 8`, `retries <= max_retries`
363/// - String equality: `status == "done"`, `status != "pending"`
364/// - Truthy checks: `approved` (true if value is "true" or non-empty and non-zero)
365pub(crate) fn evaluate_condition(
366    condition: &str,
367    vars: &HashMap<String, String>,
368) -> Result<bool, ZigError> {
369    let condition = condition.trim();
370
371    // Try comparison operators (ordered by length to match `<=` before `<`)
372    let operators = ["<=", ">=", "!=", "==", "<", ">"];
373    for op in &operators {
374        if let Some(pos) = condition.find(op) {
375            let lhs = resolve_operand(condition[..pos].trim(), vars);
376            let rhs = resolve_operand(condition[pos + op.len()..].trim(), vars);
377            return Ok(compare(&lhs, &rhs, op));
378        }
379    }
380
381    // Truthy check: single variable name
382    let value = vars.get(condition).map(|s| s.as_str()).unwrap_or("");
383    Ok(is_truthy(value))
384}
385
/// Resolve a condition operand to its string value.
///
/// - A token wrapped in matching quotes (`"done"` or `'done'`) is a string
///   literal and yields its contents (`done`).
/// - Otherwise, a token naming a variable yields that variable's value.
/// - Anything else (numeric literals, unknown names) is returned as-is.
///
/// A lone quote character is not a complete literal. It falls through to the
/// lookup / as-is cases instead of being sliced.
fn resolve_operand(token: &str, vars: &HashMap<String, String>) -> String {
    // `strip_prefix` + `strip_suffix` require two distinct quote characters,
    // so a single `"` cannot produce an out-of-range slice.
    let literal = ['"', '\'']
        .iter()
        .find_map(|&q| token.strip_prefix(q)?.strip_suffix(q));
    if let Some(inner) = literal {
        return inner.to_string();
    }

    vars.get(token)
        .cloned()
        .unwrap_or_else(|| token.to_string())
}
404
/// Compare two string operands with the given operator.
///
/// When both operands parse as *finite* `f64` values, the comparison is
/// numeric, and equality uses an absolute `f64::EPSILON` tolerance. Otherwise
/// the operands are compared as strings (byte-wise lexicographic). Non-finite
/// spellings such as `"inf"` or `"nan"` are therefore compared as text, so
/// `"nan" == "nan"` holds. Unknown operators evaluate to `false`.
fn compare(lhs: &str, rhs: &str, op: &str) -> bool {
    // `str::parse::<f64>` accepts "inf"/"nan"; arithmetic on those makes
    // equality always false, so only finite values take the numeric path.
    let numeric = match (lhs.parse::<f64>(), rhs.parse::<f64>()) {
        (Ok(l), Ok(r)) if l.is_finite() && r.is_finite() => Some((l, r)),
        _ => None,
    };

    if let Some((l, r)) = numeric {
        return match op {
            "==" => (l - r).abs() < f64::EPSILON,
            "!=" => (l - r).abs() >= f64::EPSILON,
            "<" => l < r,
            ">" => l > r,
            "<=" => l <= r,
            ">=" => l >= r,
            _ => false,
        };
    }

    match op {
        "==" => lhs == rhs,
        "!=" => lhs != rhs,
        "<" => lhs < rhs,
        ">" => lhs > rhs,
        "<=" => lhs <= rhs,
        ">=" => lhs >= rhs,
        _ => false,
    }
}
429
/// Report whether a variable value counts as "true" in a condition.
///
/// Everything is truthy except the empty string, `"false"`, and `"0"`
/// (exact, case-sensitive matches).
fn is_truthy(value: &str) -> bool {
    !matches!(value, "" | "false" | "0")
}
434
435/// Build the final prompt for a step, incorporating variable substitution,
436/// dependency outputs, and the user's context prompt.
437pub(crate) fn render_step_prompt(
438    step: &Step,
439    vars: &HashMap<String, String>,
440    user_prompt: Option<&str>,
441    dependency_outputs: &HashMap<String, String>,
442) -> String {
443    let mut prompt = String::new();
444
445    // Prepend user context if provided
446    if let Some(ctx) = user_prompt {
447        prompt.push_str(&format!("User context: {ctx}\n\n"));
448    }
449
450    // Inject dependency outputs if requested
451    if step.inject_context {
452        for dep in &step.depends_on {
453            if let Some(output) = dependency_outputs.get(dep) {
454                prompt.push_str(&format!("--- Output from '{dep}' ---\n{output}\n\n"));
455            }
456        }
457    }
458
459    // Append the step's prompt with variable substitution
460    prompt.push_str(&substitute_vars(&step.prompt, vars));
461
462    prompt
463}
464
/// Serializable snapshot of everything a step contributes to an agent
/// invocation.
///
/// Produced by [`build_agent_config`] and consumed by both the executor,
/// which forwards the agent-level fields to an [`AgentBuilder`] in
/// [`apply_agent_config`], and `--dry-run`, which serializes it so the author
/// can see what each step will ask of the agent.
///
/// Command-specific parameters (review/plan/pipe/collect/summary) live in
/// the optional `command_params` bag rather than as top-level fields.
///
/// NOTE: serde serializes fields in declaration order, so reordering the
/// fields below changes the `--dry-run` output.
#[derive(Debug, Clone, Serialize, Default)]
pub struct AgentConfig {
    /// Subcommand label — `"run"`, `"review"`, `"plan"`, `"pipe"`,
    /// `"collect"`, or `"summary"`.
    pub command: String,

    // Agent-level knobs. `build_agent_config` leaves these at their defaults
    // when `accepts_agent_args` is false.
    /// Provider: the step's own value, else the workflow default.
    pub provider: Option<String>,
    /// Model: `model_override`, else the step's model, else the workflow
    /// default.
    pub model: Option<String>,
    /// Rendered system prompt. Interactive steps get the self-terminate
    /// instruction appended.
    pub system_prompt: Option<String>,
    /// The step's `root` after `expand_path`.
    pub root: Option<String>,
    /// The step's `add_dirs` (after `expand_path`) followed by any extra
    /// directories supplied by the caller.
    pub add_dirs: Vec<String>,
    /// Environment pairs; serialized as `KEY=VALUE` strings.
    #[serde(serialize_with = "serialize_env_pairs")]
    pub env: Vec<(String, String)>,
    /// The step's `files` after `expand_path`, each passed to
    /// `AgentBuilder::file`.
    pub files: Vec<String>,
    /// Forwarded as `AgentBuilder::auto_approve(true)` when set.
    pub auto_approve: bool,
    /// `None` → no worktree, `Some(None)` → generated name, `Some(Some(n))` → explicit.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub worktree: Option<Option<String>>,
    /// Sandbox value forwarded to `AgentBuilder::sandbox`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub sandbox: Option<String>,

    // Output shaping.
    /// Plain JSON output; only set when the step has no explicit `output`.
    pub json_mode: bool,
    /// Raw JSON schema text; applied only if it parses as JSON.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub json_schema: Option<String>,
    /// Explicit output format; takes precedence over the step's `json` flag.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub output_format: Option<String>,

    // Turn / timeout / MCP.
    /// Maximum agent turns, if the step limits them.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_turns: Option<u32>,
    /// Timeout string such as `"5m"`. It is set even for commands that don't
    /// accept agent args, and unparsable values are ignored when applied.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub timeout: Option<String>,
    /// The step's `mcp_config` after `expand_path`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub mcp_config: Option<String>,

    /// Session name — always set, as `zig-<workflow>-<step>`.
    pub session_name: String,
    /// Step description; `None` when the description is empty.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    /// `"zig-workflow"` followed by the step's own tags.
    pub tags: Vec<String>,

    /// The effective prompt after dependency/context/plan prepending.
    pub prompt: String,

    /// Whether agent-level flags apply to this command: true for
    /// run/review/plan/pipe, false for collect/summary.
    pub accepts_agent_args: bool,

    /// Extra params for the non-plain commands. `None` for `run`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub command_params: Option<CommandParams>,

    /// Interactive flag — steps with `interactive = true` run through
    /// `AgentBuilder::run` instead of `exec`.
    pub interactive: bool,
}
531
532fn serialize_env_pairs<S>(pairs: &[(String, String)], s: S) -> Result<S::Ok, S::Error>
533where
534    S: serde::Serializer,
535{
536    use serde::ser::SerializeSeq;
537    let mut seq = s.serialize_seq(Some(pairs.len()))?;
538    for (k, v) in pairs {
539        seq.serialize_element(&format!("{k}={v}"))?;
540    }
541    seq.end()
542}
543
/// Command-specific parameter bag. Only populated for non-`run` commands.
///
/// Serialized with an internal `"kind"` tag holding the snake_case variant
/// name (`"review"`, `"plan"`, `"pipe"`, `"collect"`, `"summary"`) next to
/// the variant's own fields.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum CommandParams {
    /// `review` steps: copied from the step's `uncommitted`, `base`,
    /// `commit`, and `title` fields.
    Review {
        uncommitted: bool,
        base: Option<String>,
        commit: Option<String>,
        title: Option<String>,
    },
    /// `plan` steps: `output` is the step's `plan_output` after
    /// `expand_path`; `instructions` is copied from the step.
    Plan {
        output: Option<String>,
        instructions: Option<String>,
    },
    /// `pipe` steps: session names (`zig-<workflow>-<dep>`) of the step's
    /// dependencies, in `depends_on` order.
    Pipe {
        session_ids: Vec<String>,
    },
    /// `collect` steps: session names (`zig-<workflow>-<dep>`) of the
    /// step's dependencies, in `depends_on` order.
    Collect {
        session_ids: Vec<String>,
    },
    /// `summary` steps: session names (`zig-<workflow>-<dep>`) of the
    /// step's dependencies, in `depends_on` order.
    Summary {
        session_ids: Vec<String>,
    },
}
568
569/// Build an [`AgentConfig`] snapshot for a step. Replaces the old argv
570/// builder (`build_zag_args`) — the returned config is applied to an
571/// [`AgentBuilder`] at execution time by [`apply_agent_config`].
572#[allow(clippy::too_many_arguments)]
573pub(crate) fn build_agent_config(
574    step: &Step,
575    prompt: &str,
576    workflow_name: &str,
577    model_override: Option<&str>,
578    rendered_system_prompt: Option<&str>,
579    workflow_provider: Option<&str>,
580    workflow_model: Option<&str>,
581    extra_add_dirs: &[std::path::PathBuf],
582) -> AgentConfig {
583    let session_name = |dep: &str| format!("zig-{workflow_name}-{dep}");
584
585    let (command_label, accepts_agent_args, command_params) = match &step.command {
586        None => ("run".to_string(), true, None),
587        Some(StepCommand::Review) => (
588            "review".to_string(),
589            true,
590            Some(CommandParams::Review {
591                uncommitted: step.uncommitted,
592                base: step.base.clone(),
593                commit: step.commit.clone(),
594                title: step.title.clone(),
595            }),
596        ),
597        Some(StepCommand::Plan) => (
598            "plan".to_string(),
599            true,
600            Some(CommandParams::Plan {
601                output: step.plan_output.as_deref().map(expand_path),
602                instructions: step.instructions.clone(),
603            }),
604        ),
605        Some(StepCommand::Pipe) => {
606            let session_ids: Vec<String> =
607                step.depends_on.iter().map(|d| session_name(d)).collect();
608            (
609                "pipe".to_string(),
610                true,
611                Some(CommandParams::Pipe { session_ids }),
612            )
613        }
614        Some(StepCommand::Collect) => {
615            let session_ids: Vec<String> =
616                step.depends_on.iter().map(|d| session_name(d)).collect();
617            (
618                "collect".to_string(),
619                false,
620                Some(CommandParams::Collect { session_ids }),
621            )
622        }
623        Some(StepCommand::Summary) => {
624            let session_ids: Vec<String> =
625                step.depends_on.iter().map(|d| session_name(d)).collect();
626            (
627                "summary".to_string(),
628                false,
629                Some(CommandParams::Summary { session_ids }),
630            )
631        }
632    };
633
634    // Build the agent config. Agent-level knobs only apply when the
635    // command accepts them (matching the old `accepts_agent_args` gate).
636    let mut cfg = AgentConfig {
637        command: command_label,
638        session_name: session_name(&step.name),
639        description: if step.description.is_empty() {
640            None
641        } else {
642            Some(step.description.clone())
643        },
644        tags: {
645            let mut t = vec!["zig-workflow".to_string()];
646            t.extend(step.tags.iter().cloned());
647            t
648        },
649        timeout: step.timeout.clone(),
650        prompt: prompt.to_string(),
651        accepts_agent_args,
652        command_params,
653        interactive: step.interactive,
654        ..Default::default()
655    };
656
657    if !accepts_agent_args {
658        return cfg;
659    }
660
661    cfg.provider = step
662        .provider
663        .clone()
664        .or_else(|| workflow_provider.map(String::from));
665    cfg.model = model_override
666        .map(String::from)
667        .or_else(|| step.model.clone())
668        .or_else(|| workflow_model.map(String::from));
669    cfg.system_prompt = rendered_system_prompt.map(String::from);
670    cfg.max_turns = step.max_turns;
671
672    // Output format: explicit `output` overrides the `json` bool; the
673    // two map onto AgentBuilder::output_format and AgentBuilder::json.
674    if let Some(output) = &step.output {
675        cfg.output_format = Some(output.clone());
676    } else if step.json {
677        cfg.json_mode = true;
678    }
679    cfg.json_schema = step.json_schema.clone();
680    cfg.mcp_config = step.mcp_config.as_deref().map(expand_path);
681
682    cfg.auto_approve = step.auto_approve;
683    cfg.root = step.root.as_deref().map(expand_path);
684    cfg.add_dirs = step
685        .add_dirs
686        .iter()
687        .map(|d| expand_path(d))
688        .chain(extra_add_dirs.iter().map(|p| p.display().to_string()))
689        .collect();
690    cfg.env = step
691        .env
692        .iter()
693        .map(|(k, v)| (k.clone(), v.clone()))
694        .collect();
695    cfg.files = step.files.iter().map(|f| expand_path(f)).collect();
696
697    // Isolation
698    if step.worktree {
699        cfg.worktree = Some(None);
700    }
701    cfg.sandbox = step.sandbox.clone();
702
703    // Interactive agents get a self-terminate instruction appended so
704    // the session exits when the agent is done instead of waiting on
705    // the user to close it.
706    if cfg.interactive {
707        let base = cfg.system_prompt.take().unwrap_or_default();
708        cfg.system_prompt = Some(format!(
709            "{base}{}",
710            crate::self_cmd::INTERACTIVE_SELF_TERMINATE_INSTRUCTION
711        ));
712    }
713
714    cfg
715}
716
717/// Apply an [`AgentConfig`] to an [`AgentBuilder`]. The prompt itself
718/// is NOT set here — callers pass it to `exec`/`run` directly.
719pub(crate) fn apply_agent_config(mut builder: AgentBuilder, cfg: &AgentConfig) -> AgentBuilder {
720    if let Some(ref p) = cfg.provider {
721        builder = builder.provider(p);
722    }
723    if let Some(ref m) = cfg.model {
724        builder = builder.model(m);
725    }
726    if let Some(ref sp) = cfg.system_prompt {
727        builder = builder.system_prompt(sp);
728    }
729    if let Some(ref r) = cfg.root {
730        builder = builder.root(r);
731    }
732    if cfg.auto_approve {
733        builder = builder.auto_approve(true);
734    }
735    for dir in &cfg.add_dirs {
736        builder = builder.add_dir(dir);
737    }
738    for (k, v) in &cfg.env {
739        builder = builder.env(k, v);
740    }
741    for f in &cfg.files {
742        builder = builder.file(f);
743    }
744    if let Some(ref wt) = cfg.worktree {
745        builder = builder.worktree(wt.as_deref());
746    }
747    if let Some(ref sb) = cfg.sandbox {
748        builder = builder.sandbox(Some(sb));
749    }
750    if let Some(ref fmt) = cfg.output_format {
751        builder = builder.output_format(fmt);
752    }
753    if cfg.json_mode {
754        if let Some(ref schema) = cfg.json_schema {
755            if let Ok(v) = serde_json::from_str::<serde_json::Value>(schema) {
756                builder = builder.json_schema(v);
757            } else {
758                builder = builder.json();
759            }
760        } else {
761            builder = builder.json();
762        }
763    } else if let Some(ref schema) = cfg.json_schema {
764        if let Ok(v) = serde_json::from_str::<serde_json::Value>(schema) {
765            builder = builder.json_schema(v);
766        }
767    }
768    if let Some(turns) = cfg.max_turns {
769        builder = builder.max_turns(turns);
770    }
771    if let Some(ref t) = cfg.timeout {
772        if let Some(dur) = parse_timeout_string(t) {
773            builder = builder.timeout(dur);
774        }
775    }
776    if let Some(ref mcp) = cfg.mcp_config {
777        builder = builder.mcp_config(mcp);
778    }
779    builder = builder.name(&cfg.session_name);
780    if let Some(ref d) = cfg.description {
781        builder = builder.description(d);
782    }
783    for tag in &cfg.tags {
784        builder = builder.tag(tag);
785    }
786    builder
787}
788
/// Parse a duration string like `"5m"`, `"30s"`, `"1h30m"` into a
/// [`std::time::Duration`].
///
/// Accepted forms, after trimming surrounding whitespace:
/// - bare whole seconds (`"60"`);
/// - one or more `<number><unit>` segments with units `ms`, `s`, `m`, `h`.
///   Numbers may be fractional, and `ms` values are truncated to whole
///   milliseconds.
///
/// Returns `None` if the format is unrecognised or the total does not fit in
/// a [`Duration`]. Oversized values are rejected rather than panicking.
fn parse_timeout_string(s: &str) -> Option<Duration> {
    // Mirrors zag_orch::duration::parse_duration — allows `1h30m`, `5m`,
    // `30s`, `500ms`, bare seconds (`60`). Keep the dependency inline so
    // we don't have to thread its error type through ZigError.
    let s = s.trim();
    if s.is_empty() {
        return None;
    }
    if let Ok(secs) = s.parse::<u64>() {
        return Some(Duration::from_secs(secs));
    }

    let mut total = Duration::ZERO;
    let mut number = String::new();
    let mut chars = s.chars().peekable();
    while let Some(c) = chars.next() {
        if c.is_ascii_digit() || c == '.' {
            number.push(c);
            continue;
        }
        // `m` immediately followed by `s` is the two-letter `ms` unit.
        let millis = c == 'm' && chars.next_if_eq(&'s').is_some();
        let value: f64 = number.parse().ok()?;
        number.clear();

        // `try_from_secs_f64` rejects values that overflow `Duration`
        // (e.g. "99999999999999999999h"), where `from_secs_f64` would panic.
        let piece = match (c, millis) {
            (_, true) => Duration::from_millis(value as u64),
            ('s', false) => Duration::try_from_secs_f64(value).ok()?,
            ('m', false) => Duration::try_from_secs_f64(value * 60.0).ok()?,
            ('h', false) => Duration::try_from_secs_f64(value * 3600.0).ok()?,
            _ => return None,
        };
        total = total.checked_add(piece)?;
    }

    // A trailing number without a unit (e.g. "5m3") is malformed.
    if !number.is_empty() {
        return None;
    }
    Some(total)
}
830
/// Dispatch a built [`AgentConfig`] through the appropriate `zag-agent`
/// / `zag-orch` entry point and return the captured result text used for
/// `saves` and dependency injection.
///
/// Every non-interactive path that drives an agent attaches a live
/// `.on_log_event` hook via [`install_live_streaming`], so per-turn activity
/// (assistant messages, tool calls, reasoning) streams to stderr and the zig
/// session log as it happens. This restores the old `zag run` streaming UX
/// after the subprocess-drop refactor. The two agent paths without a hook
/// are interactive `run` (the provider TUI owns the terminal) and codex
/// `review` (delegated to zag-agent's native review flow).
///
/// - `run` → [`AgentBuilder::exec`] (or [`AgentBuilder::run`] for `interactive`).
/// - `review` (non-codex) → prompt built inline, then `AgentBuilder::exec`.
/// - `review` (codex) → [`zag_agent::review::run_review`] (no live stream; codex has a native review flow).
/// - `plan` → prompt built via [`zag_agent::plan::build_plan_prompt`], then `AgentBuilder::exec`, result optionally written to a file.
/// - `pipe` → context built inline via [`zag_orch::collect::extract_last_assistant_message`], then `AgentBuilder::exec`.
/// - `collect` → [`zag_orch::collect::collect_results`] (serialized as JSON; no agent).
/// - `summary` → [`zag_orch::summary::summarize_sessions`] (serialized as JSON; no agent).
///
/// # Parameters
/// - `cfg`: the resolved agent configuration. `cfg.command` selects the branch.
/// - `step_name`: used in error messages and as the step key for output
///   mirrored to the session writer.
/// - `workflow_name`: combined with `step_name` into the
///   `zig-{workflow_name}-{step_name}` session id registered for interactive runs.
/// - `session`, `prefix`: forwarded to the live-streaming / captured-output
///   helpers (`prefix` tags stderr lines as `[prefix] ...`).
///
/// # Returns
/// The agent's result text (empty if the agent reported none), the plan text
/// for `plan`, the JSON string for `collect` / `summary`, and an empty string
/// for interactive `run`.
///
/// # Errors
/// - `ZigError::Zag` when a zag-agent / zag-orch call fails.
/// - `ZigError::Io` when the plan output directory or file cannot be written.
/// - `ZigError::Execution` on JSON serialization failure, on an unknown
///   command, or when [`build_pipe_context`] finds no upstream results.
async fn dispatch_agent(
    cfg: &AgentConfig,
    step_name: &str,
    workflow_name: &str,
    session: Option<&Arc<SessionWriter>>,
    prefix: Option<&str>,
) -> Result<String, ZigError> {
    match cfg.command.as_str() {
        "run" => {
            if cfg.interactive {
                // Interactive sessions inherit stdio — the provider TUI
                // takes over the terminal and renders events directly, so
                // no log-event hook is wired here.
                //
                // Register with zag's process store + inject `ZAG_PROCESS_ID`
                // / `ZAG_SESSION_ID` etc. into the agent subprocess so the
                // `INTERACTIVE_SELF_TERMINATE_INSTRUCTION` system-prompt
                // tail's `zig self terminate` actually resolves to this
                // step's agent process. Without this, the agent would dutifully
                // run the command per the instruction, but `zag_orch::ps::
                // request_kill("self")` would fail with "ZAG_PROCESS_ID is not
                // set" — see `zig-core/src/self_cmd.rs::terminate`.
                let mut builder = apply_agent_config(AgentBuilder::new(), cfg);
                builder = builder.register_process(
                    zag_agent::process_registration::RegisterOptionsOwned {
                        provider: cfg.provider.clone().unwrap_or_else(|| "claude".to_string()),
                        model: cfg.model.clone().unwrap_or_default(),
                        command: "run".to_string(),
                        prompt_preview: Some(prompt_preview(&cfg.prompt)),
                        session_id: Some(format!("zig-{workflow_name}-{step_name}")),
                        session_name: None,
                        root: cfg.root.clone(),
                    },
                );
                builder.run(Some(&cfg.prompt)).await.map_err(|e| {
                    ZigError::Zag(format!("agent run failed for step '{step_name}': {e}"))
                })?;
                // Nothing is captured from an interactive session, so there
                // is no result text for `saves` / dependency injection.
                Ok(String::new())
            } else {
                let mut builder = apply_agent_config(AgentBuilder::new(), cfg);
                builder = install_live_streaming(builder, step_name, session, prefix);
                let output = builder.exec(&cfg.prompt).await.map_err(|e| {
                    ZigError::Zag(format!("agent exec failed for step '{step_name}': {e}"))
                })?;
                Ok(output.result.unwrap_or_default())
            }
        }
        "review" => {
            // Provider defaults to "claude" when the config leaves it unset.
            let provider = cfg.provider.clone().unwrap_or_else(|| "claude".to_string());
            // Missing or mismatched params fall back to "no uncommitted
            // flag, no base/commit/title".
            let (uncommitted, base, commit, title) = match &cfg.command_params {
                Some(CommandParams::Review {
                    uncommitted,
                    base,
                    commit,
                    title,
                }) => (*uncommitted, base.clone(), commit.clone(), title.clone()),
                _ => (false, None, None, None),
            };

            // Codex has a native review flow inside zag-agent that we
            // don't want to reimplement — fall back to the library call.
            // This branch gets no live stream (no log-event hook is attached).
            if provider == "codex" {
                let params = agent_review::ReviewParams {
                    provider,
                    uncommitted,
                    base,
                    commit,
                    title,
                    prompt: if cfg.prompt.is_empty() {
                        None
                    } else {
                        Some(cfg.prompt.clone())
                    },
                    system_prompt: cfg.system_prompt.clone(),
                    model: cfg.model.clone(),
                    root: cfg.root.clone(),
                    auto_approve: cfg.auto_approve,
                    add_dirs: cfg.add_dirs.clone(),
                    progress: Box::new(zag_agent::progress::SilentProgress),
                };
                let output = agent_review::run_review(params).await.map_err(|e| {
                    ZigError::Zag(format!("review failed for step '{step_name}': {e}"))
                })?;
                return Ok(output.and_then(|o| o.result).unwrap_or_default());
            }

            // Non-codex review: build the diff and prompt in-process, then
            // drive AgentBuilder directly so we can attach the live hook.
            let diff = agent_review::gather_diff(
                uncommitted,
                base.as_deref(),
                commit.as_deref(),
                cfg.root.as_deref(),
            )
            .map_err(|e| {
                ZigError::Zag(format!(
                    "review gather_diff failed for step '{step_name}': {e}"
                ))
            })?;
            // An empty step prompt means "no extra reviewer instructions".
            let user_prompt = if cfg.prompt.is_empty() {
                None
            } else {
                Some(cfg.prompt.as_str())
            };
            let review_prompt =
                agent_review::build_review_prompt(&diff, title.as_deref(), user_prompt);

            let mut builder = apply_agent_config(AgentBuilder::new(), cfg);
            builder = install_live_streaming(builder, step_name, session, prefix);
            let output = builder.exec(&review_prompt).await.map_err(|e| {
                ZigError::Zag(format!("review exec failed for step '{step_name}': {e}"))
            })?;
            Ok(output.result.unwrap_or_default())
        }
        "plan" => {
            let (plan_output_path, instructions) = match &cfg.command_params {
                Some(CommandParams::Plan {
                    output,
                    instructions,
                }) => (output.clone(), instructions.clone()),
                _ => (None, None),
            };

            let plan_prompt = agent_plan::build_plan_prompt(&cfg.prompt, instructions.as_deref());

            let mut builder = apply_agent_config(AgentBuilder::new(), cfg);
            builder = install_live_streaming(builder, step_name, session, prefix);
            let output = builder.exec(&plan_prompt).await.map_err(|e| {
                ZigError::Zag(format!("plan exec failed for step '{step_name}': {e}"))
            })?;
            let text = output.result.unwrap_or_default();

            // Optionally persist the plan. The returned text is the same
            // either way.
            // NOTE(review): these are blocking `std::fs` calls inside an
            // async fn; fine for one small write, but `tokio::fs` would
            // avoid blocking the executor thread.
            if let Some(path_str) = plan_output_path {
                let target = resolve_plan_output_path(&path_str);
                if let Some(parent) = target.parent()
                    && !parent.as_os_str().is_empty()
                {
                    std::fs::create_dir_all(parent).map_err(|e| {
                        ZigError::Io(format!(
                            "failed to create plan output directory {}: {e}",
                            parent.display()
                        ))
                    })?;
                }
                std::fs::write(&target, &text).map_err(|e| {
                    ZigError::Io(format!(
                        "failed to write plan output to {}: {e}",
                        target.display()
                    ))
                })?;
                eprintln!("plan written to {}", target.display());
            }

            Ok(text)
        }
        "pipe" => {
            let session_ids = match &cfg.command_params {
                Some(CommandParams::Pipe { session_ids }) => session_ids.as_slice(),
                _ => &[] as &[String],
            };
            // Prepend the upstream sessions' final results to the step prompt.
            // Errors if none of the sessions yielded a result.
            let context = build_pipe_context(session_ids, cfg.root.as_deref())?;
            let combined = format!(
                "Here are results from previous agent sessions:\n\n{context}\n\n{}",
                cfg.prompt
            );

            let mut builder = apply_agent_config(AgentBuilder::new(), cfg);
            builder = install_live_streaming(builder, step_name, session, prefix);
            let output = builder.exec(&combined).await.map_err(|e| {
                ZigError::Zag(format!("pipe exec failed for step '{step_name}': {e}"))
            })?;
            Ok(output.result.unwrap_or_default())
        }
        "collect" => {
            let session_ids = match &cfg.command_params {
                Some(CommandParams::Collect { session_ids }) => session_ids.clone(),
                _ => Vec::new(),
            };
            let params = orch_collect::CollectParams {
                session_ids,
                tag: None,
                json: true,
                root: cfg.root.clone(),
            };
            let results = orch_collect::collect_results(&params).map_err(|e| {
                ZigError::Zag(format!("collect failed for step '{step_name}': {e}"))
            })?;
            let json = serde_json::to_string(&results)
                .map_err(|e| ZigError::Execution(format!("collect serialization failed: {e}")))?;
            // No agent ran, so there was no live stream; echo the JSON result
            // through the same stderr/session surface instead.
            emit_captured(&json, step_name, session, prefix);
            Ok(json)
        }
        "summary" => {
            let session_ids = match &cfg.command_params {
                Some(CommandParams::Summary { session_ids }) => session_ids.clone(),
                _ => Vec::new(),
            };
            let params = orch_summary::SummaryParams {
                session_ids,
                tag: None,
                stats: false,
                json: true,
                root: cfg.root.clone(),
            };
            let results = orch_summary::summarize_sessions(&params).map_err(|e| {
                ZigError::Zag(format!("summary failed for step '{step_name}': {e}"))
            })?;
            let json = serde_json::to_string(&results)
                .map_err(|e| ZigError::Execution(format!("summary serialization failed: {e}")))?;
            // Same as `collect`: no agent, so echo the captured JSON.
            emit_captured(&json, step_name, session, prefix);
            Ok(json)
        }
        other => Err(ZigError::Execution(format!(
            "unknown command '{other}' for step '{step_name}'"
        ))),
    }
}
1066
1067/// Attach a live event stream to `builder`. Every [`AgentLogEvent`] that
1068/// reaches the session log fires the closure: rendered via
1069/// [`zag_agent::listen::format_event_text`] and routed to both stderr
1070/// (with optional `[prefix]` tagging for parallel tiers) and the zig
1071/// [`SessionWriter`] as `StepOutput` events — the same surface the old
1072/// `run_zag_streaming` helper produced before we dropped the subprocess.
1073fn install_live_streaming(
1074    builder: AgentBuilder,
1075    step_name: &str,
1076    session: Option<&Arc<SessionWriter>>,
1077    prefix: Option<&str>,
1078) -> AgentBuilder {
1079    let step_name_owned = step_name.to_string();
1080    let prefix_owned = prefix.map(String::from);
1081    let session_owned = session.cloned();
1082    let last_activity = Arc::new(std::sync::Mutex::new(Instant::now()));
1083    builder.on_log_event(move |evt| {
1084        if matches!(evt.kind, LogEventKind::Heartbeat { .. }) {
1085            let elapsed = last_activity.lock().unwrap().elapsed().as_secs();
1086            emit_heartbeat_line(elapsed, prefix_owned.as_deref());
1087            return;
1088        }
1089        *last_activity.lock().unwrap() = Instant::now();
1090        let Some(text) = zag_agent::listen::format_event_text(evt, false) else {
1091            return;
1092        };
1093        emit_live_line(
1094            &text,
1095            &step_name_owned,
1096            session_owned.as_ref(),
1097            prefix_owned.as_deref(),
1098        );
1099    })
1100}
1101
1102/// Write one or more rendered lines to stderr (with optional prefix) and
1103/// mirror each line to the zig session writer.
1104fn emit_live_line(
1105    text: &str,
1106    step_name: &str,
1107    session: Option<&Arc<SessionWriter>>,
1108    prefix: Option<&str>,
1109) {
1110    use std::io::Write;
1111    if text.is_empty() {
1112        return;
1113    }
1114    let stderr = std::io::stderr();
1115    for line in text.lines() {
1116        if let Some(w) = session {
1117            let _ = w.step_output(step_name, OutputStream::Stdout, line);
1118        }
1119        let mut h = stderr.lock();
1120        let _ = match prefix {
1121            Some(p) => writeln!(h, "[{p}] {line}"),
1122            None => writeln!(h, "{line}"),
1123        };
1124    }
1125}
1126
/// Print a "still alive" pulse to stderr so a quiet step is visibly running.
///
/// A step can be quiet for a while, e.g. while the model reasons before its
/// first tool call. `elapsed_secs` is rendered as `waiting (Ns)`. The line
/// is tagged `[prefix]` when a prefix is supplied.
///
/// The pulse goes to stderr only, never to the session writer, so the
/// persisted event log holds only real activity. Write errors are ignored.
fn emit_heartbeat_line(elapsed_secs: u64, prefix: Option<&str>) {
    use std::io::Write;
    let mut out = std::io::stderr().lock();
    let line = format!("  \u{00b7} waiting ({elapsed_secs}s)");
    let _ = if let Some(p) = prefix {
        writeln!(out, "[{p}] {line}")
    } else {
        writeln!(out, "{line}")
    };
}
1141
1142/// Emit captured non-agent output (collect / summary JSON) line-by-line
1143/// to stderr and the session writer. Kept for the two paths that don't
1144/// run an agent and therefore can't use [`install_live_streaming`].
1145fn emit_captured(
1146    text: &str,
1147    step_name: &str,
1148    session: Option<&Arc<SessionWriter>>,
1149    prefix: Option<&str>,
1150) {
1151    emit_live_line(text, step_name, session, prefix);
1152}
1153
1154/// Build the `<session-result>` context block from upstream session IDs.
1155///
1156/// Mirrors `zag_orch::pipe::build_context` (`zag-orch/src/pipe.rs:86-118`)
1157/// byte-for-byte so the combined prompt zig feeds the agent matches what
1158/// the `zag pipe` CLI would have produced.
1159fn build_pipe_context(session_ids: &[String], root: Option<&str>) -> Result<String, ZigError> {
1160    let mut parts = Vec::new();
1161    for (i, id) in session_ids.iter().enumerate() {
1162        let Some(text) = orch_collect::extract_last_assistant_message(id, root) else {
1163            eprintln!("warning: no result found for upstream session {id}");
1164            continue;
1165        };
1166        let short = &id[..id.len().min(8)];
1167        let block = if session_ids.len() == 1 {
1168            format!("<session-result session=\"{short}\">\n{text}\n</session-result>")
1169        } else {
1170            format!(
1171                "<session-result index=\"{}\" session=\"{short}\">\n{text}\n</session-result>",
1172                i + 1
1173            )
1174        };
1175        parts.push(block);
1176    }
1177
1178    if parts.is_empty() {
1179        return Err(ZigError::Execution(
1180            "pipe: no results available from the specified sessions".into(),
1181        ));
1182    }
1183    Ok(parts.join("\n\n"))
1184}
1185
1186/// Resolve a `plan_output` path. If the caller specified a bare directory
1187/// name (no extension), append a timestamped `plan-YYYYMMDD-HHMMSS.md`
1188/// inside it — matching the behavior documented on
1189/// [`zag_agent::plan::PlanParams::output`].
1190fn resolve_plan_output_path(path_str: &str) -> std::path::PathBuf {
1191    let expanded = expand_path(path_str);
1192    let path = std::path::PathBuf::from(&expanded);
1193    if path.extension().is_some() {
1194        return path;
1195    }
1196    let stamp = chrono::Utc::now().format("%Y%m%d-%H%M%S");
1197    path.join(format!("plan-{stamp}.md"))
1198}
1199
1200/// Execute a single step through the agent-builder dispatch. Returns the
1201/// captured result text used for `saves` and dependency injection. The
1202/// optional `model_override` is used during retries to escalate to a
1203/// different model.
1204#[allow(clippy::too_many_arguments)]
1205async fn execute_step(
1206    step: &Step,
1207    prompt: &str,
1208    workflow_name: &str,
1209    model_override: Option<&str>,
1210    prefix: Option<&str>,
1211    session: Option<&Arc<SessionWriter>>,
1212    rendered_system_prompt: Option<&str>,
1213    workflow_provider: Option<&str>,
1214    workflow_model: Option<&str>,
1215    extra_add_dirs: &[std::path::PathBuf],
1216) -> Result<String, ZigError> {
1217    let cfg = build_agent_config(
1218        step,
1219        prompt,
1220        workflow_name,
1221        model_override,
1222        rendered_system_prompt,
1223        workflow_provider,
1224        workflow_model,
1225        extra_add_dirs,
1226    );
1227    dispatch_agent(&cfg, &step.name, workflow_name, session, prefix).await
1228}
1229
1230/// Run a step with retry logic, returning its captured output on success.
1231///
1232/// Extracted so both sequential and parallel execution paths share the
1233/// same retry / model-escalation behavior.
1234#[allow(clippy::too_many_arguments)]
1235async fn run_step_attempts(
1236    step: &Step,
1237    prompt: &str,
1238    workflow_name: &str,
1239    prefix: Option<&str>,
1240    session: Option<&Arc<SessionWriter>>,
1241    rendered_system_prompt: Option<&str>,
1242    workflow_provider: Option<&str>,
1243    workflow_model: Option<&str>,
1244    extra_add_dirs: &[std::path::PathBuf],
1245) -> Result<String, ZigError> {
1246    let mut attempts = 0;
1247    let max_attempts = if step.on_failure.as_ref() == Some(&FailurePolicy::Retry) {
1248        step.max_retries.unwrap_or(1) + 1
1249    } else {
1250        1
1251    };
1252
1253    loop {
1254        attempts += 1;
1255        let model_override = if attempts > 1 {
1256            step.retry_model.as_deref()
1257        } else {
1258            None
1259        };
1260        match execute_step(
1261            step,
1262            prompt,
1263            workflow_name,
1264            model_override,
1265            prefix,
1266            session,
1267            rendered_system_prompt,
1268            workflow_provider,
1269            workflow_model,
1270            extra_add_dirs,
1271        )
1272        .await
1273        {
1274            Ok(output) => return Ok(output),
1275            Err(e) => {
1276                if let Some(w) = session {
1277                    let _ = w.step_failed(&step.name, None, attempts, &e.to_string());
1278                }
1279                if attempts < max_attempts {
1280                    eprintln!(
1281                        "    retry {}/{} for step '{}'",
1282                        attempts,
1283                        max_attempts - 1,
1284                        step.name
1285                    );
1286                    continue;
1287                }
1288                return Err(e);
1289            }
1290        }
1291    }
1292}
1293
1294/// Extract variable values from step output using `saves` selectors.
1295///
1296/// Selectors:
1297/// - `"$"` — the entire output
1298/// - `"$.field"` — a top-level JSON field
1299/// - `"$.nested.field"` — a nested JSON field
1300fn extract_saves(
1301    output: &str,
1302    saves: &HashMap<String, String>,
1303) -> Result<HashMap<String, String>, ZigError> {
1304    let mut extracted = HashMap::new();
1305
1306    for (var_name, selector) in saves {
1307        let value = if selector == "$" {
1308            output.trim().to_string()
1309        } else if let Some(path) = selector.strip_prefix("$.") {
1310            let json: serde_json::Value = serde_json::from_str(output.trim()).map_err(|e| {
1311                ZigError::Execution(format!(
1312                    "saves selector '{selector}' requires JSON output, but got parse error: {e}"
1313                ))
1314            })?;
1315            json_path_lookup(&json, path)
1316        } else {
1317            output.trim().to_string()
1318        };
1319
1320        extracted.insert(var_name.clone(), value);
1321    }
1322
1323    Ok(extracted)
1324}
1325
1326/// Partition a tier of steps into sequential steps and race groups.
1327///
1328/// Steps without a `race_group` are returned as sequential. Steps sharing
1329/// the same `race_group` value are grouped together for parallel execution.
1330fn partition_tier<'a>(tier: &[&'a Step]) -> (Vec<&'a Step>, HashMap<String, Vec<&'a Step>>) {
1331    let mut sequential = Vec::new();
1332    let mut race_groups: HashMap<String, Vec<&'a Step>> = HashMap::new();
1333
1334    for step in tier {
1335        if let Some(group) = &step.race_group {
1336            race_groups.entry(group.clone()).or_default().push(step);
1337        } else {
1338            sequential.push(*step);
1339        }
1340    }
1341
1342    (sequential, race_groups)
1343}
1344
1345/// Execute a race group: run all steps concurrently via a [`JoinSet`] and
1346/// return the first winner. Once one step succeeds, the remaining tasks
1347/// are aborted (the underlying `tokio::process::Child` is dropped, which
1348/// kills the provider subprocess if `kill_on_drop` is set — zag-agent
1349/// sets this on its internal commands).
1350#[allow(clippy::too_many_arguments)]
1351async fn execute_race_group(
1352    steps: &[&Step],
1353    prompts: &HashMap<String, String>,
1354    system_prompts: &HashMap<String, String>,
1355    workflow_name: &str,
1356    tier_index: usize,
1357    session: Option<&Arc<SessionWriter>>,
1358    workflow_provider: Option<&str>,
1359    workflow_model: Option<&str>,
1360    storage_dirs: &HashMap<String, Vec<std::path::PathBuf>>,
1361) -> Result<(String, String), ZigError> {
1362    if let Some(w) = session {
1363        for step in steps {
1364            let zag_session_id = format!("zig-{workflow_name}-{}", step.name);
1365            let preview = prompts
1366                .get(&step.name)
1367                .map(|p| prompt_preview(p))
1368                .unwrap_or_default();
1369            let _ = w.step_started(
1370                &step.name,
1371                tier_index,
1372                &zag_session_id,
1373                zag_command_name(&step.command),
1374                step.model.as_deref(),
1375                &preview,
1376            );
1377        }
1378    }
1379
1380    let race_started = Instant::now();
1381    let mut set: JoinSet<(String, Result<String, ZigError>)> = JoinSet::new();
1382
1383    for step in steps {
1384        let prompt = prompts
1385            .get(&step.name)
1386            .ok_or_else(|| ZigError::Execution(format!("missing prompt for step '{}'", step.name)))?
1387            .clone();
1388        eprintln!("  racing step '{}'...", step.name);
1389        let rendered_sp = system_prompts.get(&step.name).cloned();
1390        let empty: Vec<std::path::PathBuf> = Vec::new();
1391        let extra_dirs = storage_dirs.get(&step.name).unwrap_or(&empty).clone();
1392        let step_clone: Step = (*step).clone();
1393        let wf_name = workflow_name.to_string();
1394        let wf_provider = workflow_provider.map(String::from);
1395        let wf_model = workflow_model.map(String::from);
1396        let session_clone = session.cloned();
1397        let name = step.name.clone();
1398        set.spawn(async move {
1399            let res = execute_step(
1400                &step_clone,
1401                &prompt,
1402                &wf_name,
1403                None,
1404                None,
1405                session_clone.as_ref(),
1406                rendered_sp.as_deref(),
1407                wf_provider.as_deref(),
1408                wf_model.as_deref(),
1409                &extra_dirs,
1410            )
1411            .await;
1412            (name, res)
1413        });
1414    }
1415
1416    // Wait for the first winner — drop (abort) the rest.
1417    while let Some(joined) = set.join_next().await {
1418        let (winner_name, result) = match joined {
1419            Ok(pair) => pair,
1420            Err(e) if e.is_cancelled() => continue,
1421            Err(e) => return Err(ZigError::Execution(format!("race task panicked: {e}"))),
1422        };
1423        match result {
1424            Ok(stdout) => {
1425                // Abort losers.
1426                set.abort_all();
1427                while let Some(r) = set.join_next().await {
1428                    if let Ok((name, _)) = r {
1429                        eprintln!("  cancelling step '{name}' (race lost)");
1430                    }
1431                }
1432                let elapsed = race_started.elapsed().as_millis() as u64;
1433                eprintln!("  race won by '{winner_name}'");
1434                if let Some(w) = session {
1435                    let _ = w.step_completed(&winner_name, 0, elapsed, Vec::new());
1436                }
1437                return Ok((winner_name, stdout));
1438            }
1439            Err(e) => {
1440                if let Some(w) = session {
1441                    let _ = w.step_failed(&winner_name, None, 1, &e.to_string());
1442                }
1443                // Keep racing remaining tasks; this one lost.
1444                continue;
1445            }
1446        }
1447    }
1448
1449    Err(ZigError::Execution(
1450        "all racers failed without a winner".into(),
1451    ))
1452}
1453
1454/// Execute a single sequential step with retry logic, saves, and next-jump handling.
1455#[allow(clippy::too_many_arguments)]
1456async fn execute_sequential_step(
1457    step: &Step,
1458    vars: &mut HashMap<String, String>,
1459    user_prompt: Option<&str>,
1460    step_outputs: &mut HashMap<String, String>,
1461    workflow_name: &str,
1462    pending_next: &mut Option<String>,
1463    tier_index: usize,
1464    session: Option<&Arc<SessionWriter>>,
1465    roles: &HashMap<String, Role>,
1466    resources: &ResourceCollector<'_>,
1467    memory: &MemoryCollector,
1468    storage: &StorageManager,
1469    workflow_dir: &Path,
1470    workflow_provider: Option<&str>,
1471    workflow_model: Option<&str>,
1472) -> Result<(), ZigError> {
1473    if let Some(condition) = &step.condition {
1474        if !evaluate_condition(condition, vars)? {
1475            eprintln!(
1476                "  skipping '{}' (condition not met: {condition})",
1477                step.name
1478            );
1479            if let Some(w) = session {
1480                let _ = w.step_skipped(&step.name, &format!("condition not met: {condition}"));
1481            }
1482            return Ok(());
1483        }
1484    }
1485
1486    eprintln!("  running step '{}'...", step.name);
1487
1488    let prompt = render_step_prompt(step, vars, user_prompt, step_outputs);
1489    let rendered_sp = resolve_role_system_prompt(
1490        step,
1491        roles,
1492        resources,
1493        memory,
1494        storage,
1495        vars,
1496        workflow_dir,
1497        workflow_name,
1498    )?;
1499    let storage_dirs = storage.add_dirs_for_step(step.storage.as_deref());
1500    if let Some(w) = session {
1501        let zag_session_id = format!("zig-{workflow_name}-{}", step.name);
1502        let _ = w.step_started(
1503            &step.name,
1504            tier_index,
1505            &zag_session_id,
1506            zag_command_name(&step.command),
1507            step.model.as_deref(),
1508            &prompt_preview(&prompt),
1509        );
1510    }
1511    let started = Instant::now();
1512    let result = run_step_attempts(
1513        step,
1514        &prompt,
1515        workflow_name,
1516        None,
1517        session,
1518        rendered_sp.as_deref(),
1519        workflow_provider,
1520        workflow_model,
1521        &storage_dirs,
1522    )
1523    .await;
1524
1525    match result {
1526        Ok(output) => {
1527            let mut saved_keys: Vec<String> = Vec::new();
1528            if !step.saves.is_empty() {
1529                let saved = extract_saves(&output, &step.saves)?;
1530                for (k, v) in &saved {
1531                    eprintln!("    saved {k} = {v}");
1532                    saved_keys.push(k.clone());
1533                }
1534                vars.extend(saved);
1535            }
1536
1537            step_outputs.insert(step.name.clone(), output);
1538            eprintln!("  completed '{}'", step.name);
1539            if let Some(w) = session {
1540                let _ = w.step_completed(
1541                    &step.name,
1542                    0,
1543                    started.elapsed().as_millis() as u64,
1544                    saved_keys,
1545                );
1546            }
1547
1548            if step.next.is_some() {
1549                *pending_next = step.next.clone();
1550            }
1551        }
1552        Err(e) => match step.on_failure.as_ref().unwrap_or(&FailurePolicy::Fail) {
1553            FailurePolicy::Fail => return Err(e),
1554            FailurePolicy::Continue => {
1555                eprintln!("  step '{}' failed (continuing): {e}", step.name);
1556            }
1557            FailurePolicy::Retry => {
1558                return Err(e);
1559            }
1560        },
1561    }
1562
1563    Ok(())
1564}
1565
1566/// Run multiple independent steps in a tier concurrently.
1567///
1568/// All non-skipped steps are spawned as tokio tasks via [`JoinSet`] and
1569/// we wait for every one to finish (unlike race groups, which abort
1570/// losers). Captured output lines are written back to stderr after each
1571/// task completes, prefixed with the step name to disambiguate. Results
1572/// are processed in tier-declaration order so `saves`, `next`, and
1573/// `on_failure` semantics remain deterministic.
1574#[allow(clippy::too_many_arguments)]
1575async fn execute_parallel_tier(
1576    steps: &[&Step],
1577    vars: &mut HashMap<String, String>,
1578    user_prompt: Option<&str>,
1579    step_outputs: &mut HashMap<String, String>,
1580    workflow_name: &str,
1581    pending_next: &mut Option<String>,
1582    tier_index: usize,
1583    session: Option<&Arc<SessionWriter>>,
1584    roles: &HashMap<String, Role>,
1585    resources: &ResourceCollector<'_>,
1586    memory: &MemoryCollector,
1587    storage: &StorageManager,
1588    workflow_dir: &Path,
1589    workflow_provider: Option<&str>,
1590    workflow_model: Option<&str>,
1591) -> Result<(), ZigError> {
1592    // Evaluate conditions and render prompts up front, so threads receive
1593    // the same variable snapshot they would have under sequential execution.
1594    let mut active: Vec<&Step> = Vec::new();
1595    let mut prompts: HashMap<String, String> = HashMap::new();
1596    let mut rendered_sps: HashMap<String, String> = HashMap::new();
1597    let mut storage_dirs_map: HashMap<String, Vec<std::path::PathBuf>> = HashMap::new();
1598    for step in steps {
1599        if let Some(condition) = &step.condition {
1600            if !evaluate_condition(condition, vars)? {
1601                eprintln!(
1602                    "  skipping '{}' (condition not met: {condition})",
1603                    step.name
1604                );
1605                if let Some(w) = session {
1606                    let _ = w.step_skipped(&step.name, &format!("condition not met: {condition}"));
1607                }
1608                continue;
1609            }
1610        }
1611        let prompt = render_step_prompt(step, vars, user_prompt, step_outputs);
1612        prompts.insert(step.name.clone(), prompt);
1613        if let Some(sp) = resolve_role_system_prompt(
1614            step,
1615            roles,
1616            resources,
1617            memory,
1618            storage,
1619            vars,
1620            workflow_dir,
1621            workflow_name,
1622        )? {
1623            rendered_sps.insert(step.name.clone(), sp);
1624        }
1625        storage_dirs_map.insert(
1626            step.name.clone(),
1627            storage.add_dirs_for_step(step.storage.as_deref()),
1628        );
1629        active.push(*step);
1630    }
1631
1632    if active.is_empty() {
1633        return Ok(());
1634    }
1635
1636    eprintln!("  running {} steps in parallel...", active.len());
1637
1638    let mut start_times: HashMap<String, Instant> = HashMap::new();
1639    let mut set: JoinSet<(String, Result<String, ZigError>)> = JoinSet::new();
1640    for step in &active {
1641        let step_clone: Step = (*step).clone();
1642        let prompt = prompts.remove(&step.name).unwrap_or_default();
1643        let rendered_sp = rendered_sps.remove(&step.name);
1644        let workflow_name_owned = workflow_name.to_string();
1645        let name = step.name.clone();
1646        eprintln!("  starting '{name}'...");
1647        if let Some(w) = session {
1648            let zag_session_id = format!("zig-{workflow_name}-{name}");
1649            let _ = w.step_started(
1650                &name,
1651                tier_index,
1652                &zag_session_id,
1653                zag_command_name(&step.command),
1654                step.model.as_deref(),
1655                &prompt_preview(&prompt),
1656            );
1657        }
1658        start_times.insert(name.clone(), Instant::now());
1659        let session_clone = session.cloned();
1660        let wf_provider = workflow_provider.map(String::from);
1661        let wf_model = workflow_model.map(String::from);
1662        let storage_dirs = storage_dirs_map.remove(&step.name).unwrap_or_default();
1663        set.spawn(async move {
1664            let res = run_step_attempts(
1665                &step_clone,
1666                &prompt,
1667                &workflow_name_owned,
1668                Some(&name),
1669                session_clone.as_ref(),
1670                rendered_sp.as_deref(),
1671                wf_provider.as_deref(),
1672                wf_model.as_deref(),
1673                &storage_dirs,
1674            )
1675            .await;
1676            (name, res)
1677        });
1678    }
1679
1680    let mut results: HashMap<String, Result<String, ZigError>> = HashMap::new();
1681    while let Some(joined) = set.join_next().await {
1682        match joined {
1683            Ok((name, res)) => {
1684                results.insert(name, res);
1685            }
1686            Err(e) => {
1687                return Err(ZigError::Execution(format!(
1688                    "parallel step task panicked: {e}"
1689                )));
1690            }
1691        }
1692    }
1693
1694    // Process results in tier-declaration order so `next` is deterministic.
1695    let mut errors: Vec<String> = Vec::new();
1696    for step in &active {
1697        let Some(res) = results.remove(&step.name) else {
1698            continue;
1699        };
1700        let elapsed = start_times
1701            .remove(&step.name)
1702            .map(|t| t.elapsed().as_millis() as u64)
1703            .unwrap_or(0);
1704        match res {
1705            Ok(output) => {
1706                let mut saved_keys: Vec<String> = Vec::new();
1707                if !step.saves.is_empty() {
1708                    let saved = extract_saves(&output, &step.saves)?;
1709                    for (k, v) in &saved {
1710                        eprintln!("    saved {k} = {v}");
1711                        saved_keys.push(k.clone());
1712                    }
1713                    vars.extend(saved);
1714                }
1715                step_outputs.insert(step.name.clone(), output);
1716                eprintln!("  completed '{}'", step.name);
1717                if let Some(w) = session {
1718                    let _ = w.step_completed(&step.name, 0, elapsed, saved_keys);
1719                }
1720                if step.next.is_some() && pending_next.is_none() {
1721                    *pending_next = step.next.clone();
1722                }
1723            }
1724            Err(e) => match step.on_failure.as_ref().unwrap_or(&FailurePolicy::Fail) {
1725                FailurePolicy::Continue => {
1726                    eprintln!("  step '{}' failed (continuing): {e}", step.name);
1727                }
1728                FailurePolicy::Fail | FailurePolicy::Retry => {
1729                    errors.push(format!("'{}': {e}", step.name));
1730                }
1731            },
1732        }
1733    }
1734
1735    if !errors.is_empty() {
1736        return Err(ZigError::Execution(format!(
1737            "parallel step(s) failed: {}",
1738            errors.join("; ")
1739        )));
1740    }
1741
1742    Ok(())
1743}
1744
1745/// Initialize the variable map from workflow variable definitions.
1746/// Variables with defaults are set to their default value; others are empty.
1747fn init_vars(workflow: &Workflow) -> HashMap<String, String> {
1748    let mut vars = HashMap::new();
1749    for (name, var) in &workflow.vars {
1750        let value = match &var.default {
1751            Some(toml::Value::String(s)) => s.clone(),
1752            Some(toml::Value::Integer(n)) => n.to_string(),
1753            Some(toml::Value::Float(f)) => f.to_string(),
1754            Some(toml::Value::Boolean(b)) => b.to_string(),
1755            Some(other) => other.to_string(),
1756            None => String::new(),
1757        };
1758        vars.insert(name.clone(), value);
1759    }
1760    vars
1761}
1762
1763/// Main execution loop for a validated workflow.
1764#[allow(clippy::too_many_arguments)]
1765async fn execute(
1766    workflow: &Workflow,
1767    workflow_path: &std::path::Path,
1768    user_prompt: Option<&str>,
1769    workflow_dir: &Path,
1770    disable_resources: bool,
1771    disable_memory: bool,
1772    disable_storage: bool,
1773    dry_run: bool,
1774    dry_run_format: DryRunFormat,
1775) -> Result<(), ZigError> {
1776    let mut vars = init_vars(workflow);
1777
1778    let resource_collector = ResourceCollector::from_env(
1779        &workflow.workflow.name,
1780        &workflow.workflow.resources,
1781        workflow_dir,
1782        disable_resources,
1783    );
1784
1785    let config = ZigConfig::load();
1786    let workflow_memory_mode = MemoryMode::from_str_opt(workflow.workflow.memory.as_deref());
1787    let memory_collector = MemoryCollector::from_env(
1788        &workflow.workflow.name,
1789        workflow_memory_mode,
1790        &config,
1791        disable_memory,
1792    );
1793
1794    // Build storage manager for this run. Paths resolve against <cwd>/.zig/;
1795    // absolute paths pass through. `ensure` is called on every declared item
1796    // so step agents can trust the paths exist before they run.
1797    // When `--no-storage` is passed, skip building entirely so storage dirs
1798    // are not created and the `<storage>` block is omitted from prompts.
1799    // When `--dry-run` is passed, build the manager without `ensure` so the
1800    // block still renders with correct paths but nothing touches disk.
1801    let storage_manager = if disable_storage || workflow.storage.is_empty() {
1802        StorageManager::empty()
1803    } else if dry_run {
1804        let backend = FilesystemBackend::from_cwd()?;
1805        StorageManager::build_dry(&workflow.storage, backend)
1806    } else {
1807        let backend = FilesystemBackend::from_cwd()?;
1808        StorageManager::build(&workflow.storage, backend)?
1809    };
1810
1811    // Load file-backed variable defaults before prompt binding.
1812    load_file_defaults(&mut vars, &workflow.vars, workflow_dir)?;
1813
1814    // Bind user prompt to the variable with `from = "prompt"`, if any.
1815    let prompt_var = workflow
1816        .vars
1817        .iter()
1818        .find(|(_, v)| v.from.as_deref() == Some("prompt"))
1819        .map(|(name, _)| name.clone());
1820
1821    if let Some(ref var_name) = prompt_var {
1822        if let Some(prompt) = user_prompt {
1823            vars.insert(var_name.clone(), prompt.to_string());
1824        }
1825    }
1826
1827    // Validate variable values against constraints before executing.
1828    if let Err(errors) = validate::validate_var_values(&vars, &workflow.vars) {
1829        let msgs: Vec<String> = errors.iter().map(|e| e.to_string()).collect();
1830        return Err(ZigError::Validation(msgs.join("; ")));
1831    }
1832
1833    // When prompt is bound to a variable, don't also prepend "User context:".
1834    let effective_user_prompt = if prompt_var.is_some() {
1835        None
1836    } else {
1837        user_prompt
1838    };
1839
1840    let mut step_outputs: HashMap<String, String> = HashMap::new();
1841
1842    let wf_provider = workflow.workflow.provider.as_deref();
1843    let wf_model = workflow.workflow.model.as_deref();
1844
1845    let tiers = topological_sort(&workflow.steps)?;
1846
1847    if dry_run {
1848        let ctx = DryRunContext {
1849            workflow,
1850            workflow_path,
1851            workflow_dir,
1852            vars: &vars,
1853            user_prompt: effective_user_prompt,
1854            roles: &workflow.roles,
1855            resources: &resource_collector,
1856            memory: &memory_collector,
1857            storage: &storage_manager,
1858            wf_provider,
1859            wf_model,
1860            disable_resources,
1861            disable_memory,
1862            disable_storage,
1863        };
1864        return crate::dry_run::print_plan(&ctx, &tiers, dry_run_format);
1865    }
1866
1867    eprintln!(
1868        "running workflow '{}' ({} steps in {} tiers)",
1869        workflow.workflow.name,
1870        workflow.steps.len(),
1871        tiers.len()
1872    );
1873
1874    // Open a zig session log for this run. Failure to open the log is
1875    // non-fatal — `zig run` should still execute even if `~/.zig` is
1876    // unwritable. The session writer is `Option<Arc<...>>` everywhere
1877    // downstream so the writer-less path stays intact for tests.
1878    let coordinator = match SessionWriter::create(
1879        &workflow.workflow.name,
1880        &workflow_path.to_string_lossy(),
1881        user_prompt,
1882        tiers.len(),
1883    ) {
1884        Ok(writer) => {
1885            eprintln!("zig session: {}", writer.session_id());
1886            Some(SessionCoordinator::start(writer))
1887        }
1888        Err(e) => {
1889            eprintln!("warning: failed to open zig session log: {e}");
1890            None
1891        }
1892    };
1893    let session_writer: Option<Arc<SessionWriter>> = coordinator.as_ref().map(|c| c.writer());
1894    let session_ref = session_writer.as_ref();
1895
1896    let mut iteration = 0;
1897    let mut pending_next: Option<String> = None;
1898
1899    loop {
1900        let tiers_to_run = if let Some(ref next_step) = pending_next {
1901            // Re-run from the target step onward
1902            let remaining: Vec<Vec<&Step>> = tiers
1903                .iter()
1904                .map(|tier| {
1905                    tier.iter()
1906                        .filter(|s| s.name == *next_step)
1907                        .copied()
1908                        .collect::<Vec<_>>()
1909                })
1910                .filter(|tier| !tier.is_empty())
1911                .collect();
1912            pending_next = None;
1913            remaining
1914        } else if iteration == 0 {
1915            tiers.clone()
1916        } else {
1917            break;
1918        };
1919
1920        for (tier_index, tier) in tiers_to_run.iter().enumerate() {
1921            let (non_race, race_groups) = partition_tier(tier);
1922
1923            if let Some(w) = session_ref {
1924                let names: Vec<String> = tier.iter().map(|s| s.name.clone()).collect();
1925                let _ = w.tier_started(tier_index, names);
1926            }
1927
1928            // Independent steps in the same tier run concurrently. A single
1929            // step takes the sequential path so its output streams without
1930            // a name prefix; multiple steps go through the parallel path.
1931            if non_race.len() <= 1 {
1932                for step in &non_race {
1933                    execute_sequential_step(
1934                        step,
1935                        &mut vars,
1936                        effective_user_prompt,
1937                        &mut step_outputs,
1938                        &workflow.workflow.name,
1939                        &mut pending_next,
1940                        tier_index,
1941                        session_ref,
1942                        &workflow.roles,
1943                        &resource_collector,
1944                        &memory_collector,
1945                        &storage_manager,
1946                        workflow_dir,
1947                        wf_provider,
1948                        wf_model,
1949                    )
1950                    .await?;
1951                }
1952            } else {
1953                execute_parallel_tier(
1954                    &non_race,
1955                    &mut vars,
1956                    effective_user_prompt,
1957                    &mut step_outputs,
1958                    &workflow.workflow.name,
1959                    &mut pending_next,
1960                    tier_index,
1961                    session_ref,
1962                    &workflow.roles,
1963                    &resource_collector,
1964                    &memory_collector,
1965                    &storage_manager,
1966                    workflow_dir,
1967                    wf_provider,
1968                    wf_model,
1969                )
1970                .await?;
1971            }
1972
1973            // Run race groups in parallel
1974            for (group_name, race_steps) in &race_groups {
1975                eprintln!("  starting race group '{group_name}'...");
1976
1977                // Build prompts for all racers (conditions evaluated here)
1978                let mut prompts = HashMap::new();
1979                let mut race_sps: HashMap<String, String> = HashMap::new();
1980                let mut race_storage_dirs: HashMap<String, Vec<std::path::PathBuf>> =
1981                    HashMap::new();
1982                let mut active_steps: Vec<&Step> = Vec::new();
1983                for step in race_steps {
1984                    if let Some(condition) = &step.condition {
1985                        if !evaluate_condition(condition, &vars)? {
1986                            eprintln!(
1987                                "  skipping '{}' (condition not met: {condition})",
1988                                step.name
1989                            );
1990                            continue;
1991                        }
1992                    }
1993                    let prompt =
1994                        render_step_prompt(step, &vars, effective_user_prompt, &step_outputs);
1995                    prompts.insert(step.name.clone(), prompt);
1996                    if let Some(sp) = resolve_role_system_prompt(
1997                        step,
1998                        &workflow.roles,
1999                        &resource_collector,
2000                        &memory_collector,
2001                        &storage_manager,
2002                        &vars,
2003                        workflow_dir,
2004                        &workflow.workflow.name,
2005                    )? {
2006                        race_sps.insert(step.name.clone(), sp);
2007                    }
2008                    race_storage_dirs.insert(
2009                        step.name.clone(),
2010                        storage_manager.add_dirs_for_step(step.storage.as_deref()),
2011                    );
2012                    active_steps.push(step);
2013                }
2014
2015                if active_steps.is_empty() {
2016                    continue;
2017                }
2018
2019                match execute_race_group(
2020                    &active_steps,
2021                    &prompts,
2022                    &race_sps,
2023                    &workflow.workflow.name,
2024                    tier_index,
2025                    session_ref,
2026                    wf_provider,
2027                    wf_model,
2028                    &race_storage_dirs,
2029                )
2030                .await
2031                {
2032                    Ok((winner_name, output)) => {
2033                        // Find the winning step to process saves/next
2034                        if let Some(winner) = active_steps.iter().find(|s| s.name == winner_name) {
2035                            if !winner.saves.is_empty() {
2036                                let saved = extract_saves(&output, &winner.saves)?;
2037                                for (k, v) in &saved {
2038                                    eprintln!("    saved {k} = {v}");
2039                                }
2040                                vars.extend(saved);
2041                            }
2042                            if winner.next.is_some() {
2043                                pending_next = winner.next.clone();
2044                            }
2045                        }
2046                        step_outputs.insert(winner_name.clone(), output);
2047                        eprintln!(
2048                            "  completed race group '{group_name}' (winner: '{winner_name}')"
2049                        );
2050                    }
2051                    Err(e) => return Err(e),
2052                }
2053            }
2054        }
2055
2056        iteration += 1;
2057        if pending_next.is_none() || iteration >= MAX_LOOP_ITERATIONS {
2058            if iteration >= MAX_LOOP_ITERATIONS {
2059                eprintln!("warning: reached maximum loop iterations ({MAX_LOOP_ITERATIONS})");
2060            }
2061            break;
2062        }
2063    }
2064
2065    eprintln!("workflow '{}' completed", workflow.workflow.name);
2066    if let Some(c) = coordinator {
2067        let _ = c.finish(SessionStatus::Success);
2068    }
2069    Ok(())
2070}
2071
2072/// Short label for the zag subcommand a step will invoke. Used in
2073/// `step_started` events so listeners can distinguish run/review/plan/etc.
2074fn zag_command_name(cmd: &Option<StepCommand>) -> &'static str {
2075    match cmd {
2076        None => "run",
2077        Some(StepCommand::Review) => "review",
2078        Some(StepCommand::Plan) => "plan",
2079        Some(StepCommand::Pipe) => "pipe",
2080        Some(StepCommand::Collect) => "collect",
2081        Some(StepCommand::Summary) => "summary",
2082    }
2083}
2084
/// Truncated single-line preview of a rendered prompt for the session log.
///
/// Line breaks (`\n`, `\r\n`, and lone `\r`) each become a single space, so
/// the preview stays on one line regardless of the prompt's line-ending style.
/// The result keeps at most 200 characters (Unicode scalar values, not bytes).
/// When the prompt was longer, `…` is appended.
fn prompt_preview(prompt: &str) -> String {
    const MAX: usize = 200;
    // Fold CRLF first so Windows line endings collapse to one space, not two.
    let normalized = prompt.replace("\r\n", "\n");
    let mut chars = normalized
        .chars()
        .map(|c| if matches!(c, '\n' | '\r') { ' ' } else { c });
    // Single pass: take up to MAX chars, then peek whether anything remains.
    let mut preview: String = chars.by_ref().take(MAX).collect();
    if chars.next().is_some() {
        preview.push('…');
    }
    preview
}
2099
// Unit tests for this module live in the sibling file `run_tests.rs`. They are
// wired in via `#[path]` and compiled only for test builds (`#[cfg(test)]`).
#[cfg(test)]
#[path = "run_tests.rs"]
mod tests;