Skip to main content

zig_core/
run.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use serde::Serialize;
7use tokio::task::JoinSet;
8use zag_agent::builder::AgentBuilder;
9use zag_agent::session_log::LogEventKind;
10use zag_agent::{plan as agent_plan, review as agent_review};
11use zag_orch::collect as orch_collect;
12use zag_orch::summary as orch_summary;
13
14use crate::config::ZigConfig;
15use crate::dry_run::{DryRunContext, DryRunFormat};
16use crate::error::ZigError;
17use crate::memory::{MemoryCollector, render_memory_block};
18use crate::paths::expand_path;
19use crate::resources::{ResourceCollector, render_system_block};
20use crate::session::{OutputStream, SessionCoordinator, SessionStatus, SessionWriter};
21use crate::storage::{FilesystemBackend, StorageManager};
22use crate::workflow::model::{FailurePolicy, MemoryMode, Role, Step, StepCommand, Workflow};
23use crate::workflow::{parser, validate};
24
25/// Maximum number of loop iterations to prevent infinite loops from `next` fields.
26const MAX_LOOP_ITERATIONS: usize = 100;
27
28/// Execute a workflow file (`.zwf` or `.zwfz`).
29///
30/// Parses the workflow, validates it, resolves the step DAG, and executes
31/// each step via the embedded [`zag_agent::builder::AgentBuilder`] (and
32/// [`zag_orch`] for pipe/collect/summary commands). The optional
33/// `user_prompt` is injected as additional context into every step's prompt.
34///
35/// Resource advertisement (the `<resources>` block prepended to each step's
36/// system prompt) is enabled by default; pass `disable_resources = true` to
37/// opt out, e.g. via `zig run --no-resources`.
38///
39/// Memory injection (the `<memory>` block) is similarly enabled by default;
40/// pass `disable_memory = true` to opt out via `zig run --no-memory`.
41///
42/// Storage injection (the `<storage>` block) is similarly enabled by default;
43/// pass `disable_storage = true` to opt out via `zig run --no-storage`.
44///
45/// When `dry_run = true`, the workflow is parsed, validated, and its plan is
46/// printed in the requested `format` — no agent invocation, session log,
47/// storage creation, or memory write occurs. The three `disable_*` flags are
48/// respected and surface in the plan output.
49#[allow(clippy::too_many_arguments)]
50pub async fn run_workflow(
51    workflow_path: &str,
52    user_prompt: Option<&str>,
53    disable_resources: bool,
54    disable_memory: bool,
55    disable_storage: bool,
56    dry_run: bool,
57    dry_run_format: DryRunFormat,
58) -> Result<(), ZigError> {
59    let path = resolve_workflow_path(workflow_path)?;
60    let (workflow, source) = parser::parse_workflow(&path)?;
61
62    if let Err(errors) = validate::validate(&workflow) {
63        let msgs: Vec<String> = errors.iter().map(|e| e.to_string()).collect();
64        return Err(ZigError::Validation(msgs.join("; ")));
65    }
66
67    execute(
68        &workflow,
69        &path,
70        user_prompt,
71        source.dir(),
72        disable_resources,
73        disable_memory,
74        disable_storage,
75        dry_run,
76        dry_run_format,
77    )
78    .await
79}
80
81/// Resolve a workflow argument to an actual file path.
82///
83/// Tries in order:
84/// 1. Literal path as given
85/// 2. With `.zwf` extension appended
86/// 3. With `.zwfz` extension appended
87/// 4. Under local project `.zig/workflows/` directory
88/// 5. Under local project `.zig/workflows/` with `.zwf` appended
89/// 6. Under local project `.zig/workflows/` with `.zwfz` appended
90/// 7. Under global `~/.zig/workflows/` directory
91/// 8. Under global `~/.zig/workflows/` with `.zwf` appended
92/// 9. Under global `~/.zig/workflows/` with `.zwfz` appended
93pub fn resolve_workflow_path(workflow: &str) -> Result<PathBuf, ZigError> {
94    let mut candidates = vec![
95        PathBuf::from(workflow),
96        PathBuf::from(format!("{workflow}.zwf")),
97        PathBuf::from(format!("{workflow}.zwfz")),
98    ];
99
100    if let Some(local_dir) = crate::paths::cwd_workflows_dir() {
101        candidates.push(local_dir.join(workflow));
102        candidates.push(local_dir.join(format!("{workflow}.zwf")));
103        candidates.push(local_dir.join(format!("{workflow}.zwfz")));
104    }
105
106    if let Some(global_dir) = crate::paths::global_workflows_dir() {
107        candidates.push(global_dir.join(workflow));
108        candidates.push(global_dir.join(format!("{workflow}.zwf")));
109        candidates.push(global_dir.join(format!("{workflow}.zwfz")));
110    }
111
112    for candidate in &candidates {
113        if candidate.exists() {
114            return Ok(candidate.clone());
115        }
116    }
117
118    Err(ZigError::Io(format!(
119        "workflow not found: '{workflow}' (tried: {})",
120        candidates
121            .iter()
122            .map(|p| p.display().to_string())
123            .collect::<Vec<_>>()
124            .join(", ")
125    )))
126}
127
128/// Compute a topological ordering of steps grouped into tiers.
129///
130/// Each tier contains steps whose dependencies are all in earlier tiers,
131/// meaning steps within a tier can (in principle) run in parallel.
132/// Uses Kahn's algorithm.
133pub(crate) fn topological_sort(steps: &[Step]) -> Result<Vec<Vec<&Step>>, ZigError> {
134    let step_index: HashMap<&str, usize> = steps
135        .iter()
136        .enumerate()
137        .map(|(i, s)| (s.name.as_str(), i))
138        .collect();
139
140    let mut in_degree = vec![0usize; steps.len()];
141    for (i, step) in steps.iter().enumerate() {
142        for dep in &step.depends_on {
143            if step_index.contains_key(dep.as_str()) {
144                in_degree[i] += 1;
145            }
146        }
147    }
148
149    let mut tiers = Vec::new();
150    let mut remaining = in_degree.clone();
151    let mut completed: Vec<bool> = vec![false; steps.len()];
152
153    loop {
154        let tier: Vec<usize> = (0..steps.len())
155            .filter(|&i| !completed[i] && remaining[i] == 0)
156            .collect();
157
158        if tier.is_empty() {
159            break;
160        }
161
162        for &i in &tier {
163            completed[i] = true;
164        }
165
166        // Decrement in-degrees for dependents of this tier
167        for &i in &tier {
168            for (j, step) in steps.iter().enumerate() {
169                if !completed[j] && step.depends_on.contains(&steps[i].name) {
170                    remaining[j] -= 1;
171                }
172            }
173        }
174
175        tiers.push(tier.iter().map(|&i| &steps[i]).collect());
176    }
177
178    let completed_count: usize = completed.iter().filter(|&&c| c).count();
179    if completed_count != steps.len() {
180        return Err(ZigError::Execution(
181            "could not resolve all steps — possible undetected cycle".into(),
182        ));
183    }
184
185    Ok(tiers)
186}
187
188/// Replace `${var_name}` references in a template with values from the variable map.
189///
190/// Supports dotted paths like `${result.score}` — the root variable name is
191/// looked up, and if its value is valid JSON, the nested path is traversed.
192/// Unknown variables are left as-is.
193pub(crate) fn substitute_vars(template: &str, vars: &HashMap<String, String>) -> String {
194    let mut result = String::with_capacity(template.len());
195    let mut rest = template;
196
197    while let Some(start) = rest.find("${") {
198        result.push_str(&rest[..start]);
199        let after_start = &rest[start + 2..];
200
201        if let Some(end) = after_start.find('}') {
202            let var_expr = &after_start[..end];
203            let mut parts = var_expr.splitn(2, '.');
204            let root = parts.next().unwrap_or(var_expr);
205
206            if let Some(value) = vars.get(root) {
207                if let Some(path) = parts.next() {
208                    // Try to navigate a JSON path
209                    if let Ok(json) = serde_json::from_str::<serde_json::Value>(value) {
210                        let resolved = json_path_lookup(&json, path);
211                        result.push_str(&resolved);
212                    } else {
213                        result.push_str(value);
214                    }
215                } else {
216                    result.push_str(value);
217                }
218            } else {
219                // Unknown variable — leave as-is
220                result.push_str(&rest[start..start + 2 + end + 1]);
221            }
222
223            rest = &after_start[end + 1..];
224        } else {
225            result.push_str(&rest[start..]);
226            rest = "";
227        }
228    }
229
230    result.push_str(rest);
231    result
232}
233
234/// Look up a dotted path in a JSON value (e.g., "nested.field").
235fn json_path_lookup(value: &serde_json::Value, path: &str) -> String {
236    let mut current = value;
237    for key in path.split('.') {
238        match current.get(key) {
239            Some(v) => current = v,
240            None => return format!("${{?.{path}}}"),
241        }
242    }
243    match current {
244        serde_json::Value::String(s) => s.clone(),
245        other => other.to_string(),
246    }
247}
248
249/// Resolve the effective system prompt for a step, with any advertised
250/// resources prepended as a `<resources>` block.
251///
252/// Resolution order:
253/// 1. If `step.system_prompt` is set, use it (with variable substitution).
254/// 2. If `step.role` is set, resolve the role name (may contain `${var}`),
255///    look it up in the roles table, and use the role's system prompt
256///    (loaded from file if `system_prompt_file` is set).
257/// 3. Otherwise the base prompt is empty.
258///
259/// Resources from all configured tiers (global shared, global per-workflow,
260/// project cwd, inline workflow, inline step) are then collected by the
261/// supplied [`ResourceCollector`] and rendered into a `<resources>` block
262/// that is prepended to the base prompt. If both the resources set and the
263/// base prompt are empty, returns `None` — keeping the current behavior when
264/// nothing is configured.
265#[allow(clippy::too_many_arguments)]
266pub(crate) fn resolve_role_system_prompt(
267    step: &Step,
268    roles: &HashMap<String, Role>,
269    resources: &ResourceCollector<'_>,
270    memory: &MemoryCollector,
271    storage: &StorageManager,
272    vars: &HashMap<String, String>,
273    workflow_dir: &Path,
274    workflow_name: &str,
275) -> Result<Option<String>, ZigError> {
276    // Resolve the base system prompt (may be empty if neither is set).
277    let base_prompt: Option<String> = if let Some(ref sp) = step.system_prompt {
278        Some(substitute_vars(sp, vars))
279    } else if let Some(ref role_ref) = step.role {
280        let resolved_name = substitute_vars(role_ref, vars);
281        let role = roles.get(&resolved_name).ok_or_else(|| {
282            ZigError::Execution(format!(
283                "step '{}' references role '{}' which does not exist",
284                step.name, resolved_name
285            ))
286        })?;
287
288        let raw_prompt = if let Some(ref file_path) = role.system_prompt_file {
289            let full_path = workflow_dir.join(expand_path(file_path));
290            Some(std::fs::read_to_string(&full_path).map_err(|e| {
291                ZigError::Execution(format!(
292                    "failed to read system_prompt_file '{}' for role '{}': {e}",
293                    full_path.display(),
294                    resolved_name
295                ))
296            })?)
297        } else {
298            role.system_prompt.clone()
299        };
300
301        raw_prompt.map(|p| substitute_vars(&p, vars))
302    } else {
303        None
304    };
305
306    // Collect and render resources.
307    let set = resources.collect_for_step(&step.resources)?;
308    let resource_block = render_system_block(&set);
309
310    // Collect and render memory.
311    let memory_entries = memory.collect_for_step(step.memory.as_deref())?;
312    let memory_block = render_memory_block(&memory_entries, workflow_name, Some(&step.name));
313
314    // Render storage block for this step's scope.
315    let storage_block = match storage.render_block(step.storage.as_deref())? {
316        Some(mut s) => {
317            s.push('\n');
318            s
319        }
320        None => String::new(),
321    };
322
323    let prefix = format!("{resource_block}{memory_block}{storage_block}");
324
325    match (prefix.is_empty(), base_prompt) {
326        (true, None) => Ok(None),
327        (true, Some(p)) => Ok(Some(p)),
328        (false, None) => Ok(Some(prefix.trim_end().to_string())),
329        (false, Some(p)) => Ok(Some(format!("{prefix}{p}"))),
330    }
331}
332
333/// Load file-backed default values for variables.
334///
335/// For each variable with `default_file` set (and no `default`), reads the
336/// file contents relative to `workflow_dir` and inserts them into the vars map.
337fn load_file_defaults(
338    vars: &mut HashMap<String, String>,
339    declarations: &HashMap<String, crate::workflow::model::Variable>,
340    workflow_dir: &Path,
341) -> Result<(), ZigError> {
342    for (name, decl) in declarations {
343        if decl.default.is_none() {
344            if let Some(ref file_path) = decl.default_file {
345                let full_path = workflow_dir.join(expand_path(file_path));
346                let content = std::fs::read_to_string(&full_path).map_err(|e| {
347                    ZigError::Execution(format!(
348                        "failed to read default_file '{}' for variable '{name}': {e}",
349                        full_path.display()
350                    ))
351                })?;
352                vars.insert(name.clone(), content);
353            }
354        }
355    }
356    Ok(())
357}
358
359/// Evaluate a simple condition expression against the current variable state.
360///
361/// Supports:
362/// - Numeric comparisons: `score < 8`, `retries <= max_retries`
363/// - String equality: `status == "done"`, `status != "pending"`
364/// - Truthy checks: `approved` (true if value is "true" or non-empty and non-zero)
365pub(crate) fn evaluate_condition(
366    condition: &str,
367    vars: &HashMap<String, String>,
368) -> Result<bool, ZigError> {
369    let condition = condition.trim();
370
371    // Try comparison operators (ordered by length to match `<=` before `<`)
372    let operators = ["<=", ">=", "!=", "==", "<", ">"];
373    for op in &operators {
374        if let Some(pos) = condition.find(op) {
375            let lhs = resolve_operand(condition[..pos].trim(), vars);
376            let rhs = resolve_operand(condition[pos + op.len()..].trim(), vars);
377            return Ok(compare(&lhs, &rhs, op));
378        }
379    }
380
381    // Truthy check: single variable name
382    let value = vars.get(condition).map(|s| s.as_str()).unwrap_or("");
383    Ok(is_truthy(value))
384}
385
386/// Resolve a condition operand to its string value.
387/// - String literals ("done") → done
388/// - Variable names → looked up in vars
389/// - Numeric literals → left as-is
390fn resolve_operand(token: &str, vars: &HashMap<String, String>) -> String {
391    // Strip surrounding quotes for string literals
392    if (token.starts_with('"') && token.ends_with('"'))
393        || (token.starts_with('\'') && token.ends_with('\''))
394    {
395        return token[1..token.len() - 1].to_string();
396    }
397    // Try variable lookup
398    if let Some(val) = vars.get(token) {
399        return val.clone();
400    }
401    // Return as-is (numeric literal or unknown)
402    token.to_string()
403}
404
405/// Compare two string operands with the given operator.
406/// Attempts numeric comparison first, falls back to lexicographic.
407fn compare(lhs: &str, rhs: &str, op: &str) -> bool {
408    if let (Ok(l), Ok(r)) = (lhs.parse::<f64>(), rhs.parse::<f64>()) {
409        return match op {
410            "==" => (l - r).abs() < f64::EPSILON,
411            "!=" => (l - r).abs() >= f64::EPSILON,
412            "<" => l < r,
413            ">" => l > r,
414            "<=" => l <= r,
415            ">=" => l >= r,
416            _ => false,
417        };
418    }
419    match op {
420        "==" => lhs == rhs,
421        "!=" => lhs != rhs,
422        "<" => lhs < rhs,
423        ">" => lhs > rhs,
424        "<=" => lhs <= rhs,
425        ">=" => lhs >= rhs,
426        _ => false,
427    }
428}
429
430/// Check if a string value is truthy.
431fn is_truthy(value: &str) -> bool {
432    !value.is_empty() && value != "false" && value != "0"
433}
434
435/// Build the final prompt for a step, incorporating variable substitution,
436/// dependency outputs, and the user's context prompt.
437pub(crate) fn render_step_prompt(
438    step: &Step,
439    vars: &HashMap<String, String>,
440    user_prompt: Option<&str>,
441    dependency_outputs: &HashMap<String, String>,
442) -> String {
443    let mut prompt = String::new();
444
445    // Prepend user context if provided
446    if let Some(ctx) = user_prompt {
447        prompt.push_str(&format!("User context: {ctx}\n\n"));
448    }
449
450    // Inject dependency outputs if requested
451    if step.inject_context {
452        for dep in &step.depends_on {
453            if let Some(output) = dependency_outputs.get(dep) {
454                prompt.push_str(&format!("--- Output from '{dep}' ---\n{output}\n\n"));
455            }
456        }
457    }
458
459    // Append the step's prompt with variable substitution
460    prompt.push_str(&substitute_vars(&step.prompt, vars));
461
462    prompt
463}
464
465/// Serializable snapshot of everything a step contributes to an agent
466/// invocation. Produced by [`build_agent_config`] and used by both the
467/// executor (to configure a [`AgentBuilder`]) and `--dry-run` (to show
468/// the author what each step will ask of the agent).
469///
470/// The shape mirrors [`AgentBuilder`] field-for-field; additional
471/// command-specific parameters (review/plan/pipe/collect/summary) are
472/// stored in the optional `command_params` bag.
473#[derive(Debug, Clone, Serialize, Default)]
474pub struct AgentConfig {
475    /// Subcommand label — `"run"`, `"review"`, `"plan"`, `"pipe"`,
476    /// `"collect"`, or `"summary"`.
477    pub command: String,
478
479    /// Agent-level knobs.
480    pub provider: Option<String>,
481    pub model: Option<String>,
482    pub system_prompt: Option<String>,
483    pub root: Option<String>,
484    pub add_dirs: Vec<String>,
485    #[serde(serialize_with = "serialize_env_pairs")]
486    pub env: Vec<(String, String)>,
487    pub files: Vec<String>,
488    pub auto_approve: bool,
489    /// `None` → no worktree, `Some(None)` → generated name, `Some(Some(n))` → explicit.
490    #[serde(skip_serializing_if = "Option::is_none")]
491    pub worktree: Option<Option<String>>,
492    #[serde(skip_serializing_if = "Option::is_none")]
493    pub sandbox: Option<String>,
494
495    /// Output shaping.
496    pub json_mode: bool,
497    #[serde(skip_serializing_if = "Option::is_none")]
498    pub json_schema: Option<String>,
499    #[serde(skip_serializing_if = "Option::is_none")]
500    pub output_format: Option<String>,
501
502    /// Turn / timeout / MCP.
503    #[serde(skip_serializing_if = "Option::is_none")]
504    pub max_turns: Option<u32>,
505    #[serde(skip_serializing_if = "Option::is_none")]
506    pub timeout: Option<String>,
507    #[serde(skip_serializing_if = "Option::is_none")]
508    pub mcp_config: Option<String>,
509
510    /// Session metadata — always set (session name derives from workflow/step).
511    pub session_name: String,
512    #[serde(skip_serializing_if = "Option::is_none")]
513    pub description: Option<String>,
514    pub tags: Vec<String>,
515
516    /// The effective prompt after dependency/context/plan prepending.
517    pub prompt: String,
518
519    /// Whether `accepts_agent_args` was true for this command — pipe/run/
520    /// review/plan/exec respect agent-level flags; collect/summary don't.
521    pub accepts_agent_args: bool,
522
523    /// Extra params for the non-plain commands. `None` for `run`.
524    #[serde(skip_serializing_if = "Option::is_none")]
525    pub command_params: Option<CommandParams>,
526
527    /// Interactive flag — steps with `interactive = true` run through
528    /// `AgentBuilder::run` instead of `exec`.
529    pub interactive: bool,
530}
531
532fn serialize_env_pairs<S>(pairs: &[(String, String)], s: S) -> Result<S::Ok, S::Error>
533where
534    S: serde::Serializer,
535{
536    use serde::ser::SerializeSeq;
537    let mut seq = s.serialize_seq(Some(pairs.len()))?;
538    for (k, v) in pairs {
539        seq.serialize_element(&format!("{k}={v}"))?;
540    }
541    seq.end()
542}
543
544/// Command-specific parameter bag. Only populated for non-`run` commands.
545#[derive(Debug, Clone, Serialize)]
546#[serde(tag = "kind", rename_all = "snake_case")]
547pub enum CommandParams {
548    Review {
549        uncommitted: bool,
550        base: Option<String>,
551        commit: Option<String>,
552        title: Option<String>,
553    },
554    Plan {
555        output: Option<String>,
556        instructions: Option<String>,
557    },
558    Pipe {
559        session_ids: Vec<String>,
560    },
561    Collect {
562        session_ids: Vec<String>,
563    },
564    Summary {
565        session_ids: Vec<String>,
566    },
567}
568
569/// Build an [`AgentConfig`] snapshot for a step. Replaces the old argv
570/// builder (`build_zag_args`) — the returned config is applied to an
571/// [`AgentBuilder`] at execution time by [`apply_agent_config`].
572#[allow(clippy::too_many_arguments)]
573pub(crate) fn build_agent_config(
574    step: &Step,
575    prompt: &str,
576    workflow_name: &str,
577    model_override: Option<&str>,
578    rendered_system_prompt: Option<&str>,
579    workflow_provider: Option<&str>,
580    workflow_model: Option<&str>,
581    extra_add_dirs: &[std::path::PathBuf],
582) -> AgentConfig {
583    let session_name = |dep: &str| format!("zig-{workflow_name}-{dep}");
584
585    let (command_label, accepts_agent_args, command_params) = match &step.command {
586        None => ("run".to_string(), true, None),
587        Some(StepCommand::Review) => (
588            "review".to_string(),
589            true,
590            Some(CommandParams::Review {
591                uncommitted: step.uncommitted,
592                base: step.base.clone(),
593                commit: step.commit.clone(),
594                title: step.title.clone(),
595            }),
596        ),
597        Some(StepCommand::Plan) => (
598            "plan".to_string(),
599            true,
600            Some(CommandParams::Plan {
601                output: step.plan_output.as_deref().map(expand_path),
602                instructions: step.instructions.clone(),
603            }),
604        ),
605        Some(StepCommand::Pipe) => {
606            let session_ids: Vec<String> =
607                step.depends_on.iter().map(|d| session_name(d)).collect();
608            (
609                "pipe".to_string(),
610                true,
611                Some(CommandParams::Pipe { session_ids }),
612            )
613        }
614        Some(StepCommand::Collect) => {
615            let session_ids: Vec<String> =
616                step.depends_on.iter().map(|d| session_name(d)).collect();
617            (
618                "collect".to_string(),
619                false,
620                Some(CommandParams::Collect { session_ids }),
621            )
622        }
623        Some(StepCommand::Summary) => {
624            let session_ids: Vec<String> =
625                step.depends_on.iter().map(|d| session_name(d)).collect();
626            (
627                "summary".to_string(),
628                false,
629                Some(CommandParams::Summary { session_ids }),
630            )
631        }
632    };
633
634    // Build the agent config. Agent-level knobs only apply when the
635    // command accepts them (matching the old `accepts_agent_args` gate).
636    let mut cfg = AgentConfig {
637        command: command_label,
638        session_name: session_name(&step.name),
639        description: if step.description.is_empty() {
640            None
641        } else {
642            Some(step.description.clone())
643        },
644        tags: {
645            let mut t = vec!["zig-workflow".to_string()];
646            t.extend(step.tags.iter().cloned());
647            t
648        },
649        timeout: step.timeout.clone(),
650        prompt: prompt.to_string(),
651        accepts_agent_args,
652        command_params,
653        interactive: step.interactive,
654        ..Default::default()
655    };
656
657    if !accepts_agent_args {
658        return cfg;
659    }
660
661    cfg.provider = step
662        .provider
663        .clone()
664        .or_else(|| workflow_provider.map(String::from));
665    cfg.model = model_override
666        .map(String::from)
667        .or_else(|| step.model.clone())
668        .or_else(|| workflow_model.map(String::from));
669    cfg.system_prompt = rendered_system_prompt.map(String::from);
670    cfg.max_turns = step.max_turns;
671
672    // Output format: explicit `output` overrides the `json` bool; the
673    // two map onto AgentBuilder::output_format and AgentBuilder::json.
674    if let Some(output) = &step.output {
675        cfg.output_format = Some(output.clone());
676    } else if step.json {
677        cfg.json_mode = true;
678    }
679    cfg.json_schema = step.json_schema.clone();
680    cfg.mcp_config = step.mcp_config.as_deref().map(expand_path);
681
682    cfg.auto_approve = step.auto_approve;
683    cfg.root = step.root.as_deref().map(expand_path);
684    cfg.add_dirs = step
685        .add_dirs
686        .iter()
687        .map(|d| expand_path(d))
688        .chain(extra_add_dirs.iter().map(|p| p.display().to_string()))
689        .collect();
690    cfg.env = step
691        .env
692        .iter()
693        .map(|(k, v)| (k.clone(), v.clone()))
694        .collect();
695    cfg.files = step.files.iter().map(|f| expand_path(f)).collect();
696
697    // Isolation
698    if step.worktree {
699        cfg.worktree = Some(None);
700    }
701    cfg.sandbox = step.sandbox.clone();
702
703    // Interactive agents get a self-terminate instruction appended so
704    // the session exits when the agent is done instead of waiting on
705    // the user to close it.
706    if cfg.interactive {
707        let base = cfg.system_prompt.take().unwrap_or_default();
708        cfg.system_prompt = Some(format!(
709            "{base}{}",
710            crate::self_cmd::INTERACTIVE_SELF_TERMINATE_INSTRUCTION
711        ));
712    }
713
714    cfg
715}
716
717/// Apply an [`AgentConfig`] to an [`AgentBuilder`]. The prompt itself
718/// is NOT set here — callers pass it to `exec`/`run` directly.
719pub(crate) fn apply_agent_config(mut builder: AgentBuilder, cfg: &AgentConfig) -> AgentBuilder {
720    if let Some(ref p) = cfg.provider {
721        builder = builder.provider(p);
722    }
723    if let Some(ref m) = cfg.model {
724        builder = builder.model(m);
725    }
726    if let Some(ref sp) = cfg.system_prompt {
727        builder = builder.system_prompt(sp);
728    }
729    if let Some(ref r) = cfg.root {
730        builder = builder.root(r);
731    }
732    if cfg.auto_approve {
733        builder = builder.auto_approve(true);
734    }
735    for dir in &cfg.add_dirs {
736        builder = builder.add_dir(dir);
737    }
738    for (k, v) in &cfg.env {
739        builder = builder.env(k, v);
740    }
741    for f in &cfg.files {
742        builder = builder.file(f);
743    }
744    if let Some(ref wt) = cfg.worktree {
745        builder = builder.worktree(wt.as_deref());
746    }
747    if let Some(ref sb) = cfg.sandbox {
748        builder = builder.sandbox(Some(sb));
749    }
750    if let Some(ref fmt) = cfg.output_format {
751        builder = builder.output_format(fmt);
752    }
753    if cfg.json_mode {
754        if let Some(ref schema) = cfg.json_schema {
755            if let Ok(v) = serde_json::from_str::<serde_json::Value>(schema) {
756                builder = builder.json_schema(v);
757            } else {
758                builder = builder.json();
759            }
760        } else {
761            builder = builder.json();
762        }
763    } else if let Some(ref schema) = cfg.json_schema {
764        if let Ok(v) = serde_json::from_str::<serde_json::Value>(schema) {
765            builder = builder.json_schema(v);
766        }
767    }
768    if let Some(turns) = cfg.max_turns {
769        builder = builder.max_turns(turns);
770    }
771    if let Some(ref t) = cfg.timeout {
772        if let Some(dur) = parse_timeout_string(t) {
773            builder = builder.timeout(dur);
774        }
775    }
776    if let Some(ref mcp) = cfg.mcp_config {
777        builder = builder.mcp_config(mcp);
778    }
779    builder = builder.name(&cfg.session_name);
780    if let Some(ref d) = cfg.description {
781        builder = builder.description(d);
782    }
783    for tag in &cfg.tags {
784        builder = builder.tag(tag);
785    }
786    builder
787}
788
789/// Parse a duration string like `"5m"`, `"30s"`, `"1h30m"` into a
790/// [`std::time::Duration`]. Returns `None` if the format is unrecognised.
791fn parse_timeout_string(s: &str) -> Option<Duration> {
792    // Mirrors zag_orch::duration::parse_duration — allows `1h30m`, `5m`,
793    // `30s`, `500ms`, bare seconds (`60`). Keep the dependency inline so
794    // we don't have to thread its error type through ZigError.
795    let s = s.trim();
796    if s.is_empty() {
797        return None;
798    }
799    if let Ok(secs) = s.parse::<u64>() {
800        return Some(Duration::from_secs(secs));
801    }
802    let mut total = Duration::ZERO;
803    let mut current = String::new();
804    let mut chars = s.chars().peekable();
805    while let Some(c) = chars.next() {
806        if c.is_ascii_digit() || c == '.' {
807            current.push(c);
808            continue;
809        }
810        let mut unit = String::from(c);
811        if c == 'm' && chars.peek() == Some(&'s') {
812            unit.push(chars.next().unwrap());
813        }
814        let value: f64 = current.parse().ok()?;
815        current.clear();
816        let piece = match unit.as_str() {
817            "ms" => Duration::from_millis(value as u64),
818            "s" => Duration::from_secs_f64(value),
819            "m" => Duration::from_secs_f64(value * 60.0),
820            "h" => Duration::from_secs_f64(value * 3600.0),
821            _ => return None,
822        };
823        total += piece;
824    }
825    if !current.is_empty() {
826        return None;
827    }
828    Some(total)
829}
830
831/// Dispatch a built [`AgentConfig`] through the appropriate `zag-agent`
832/// / `zag-orch` entry point and return the captured result text used for
833/// `saves` and dependency injection.
834///
835/// Every path that drives an agent attaches a live `.on_log_event` hook
836/// via [`install_live_streaming`] so per-turn activity (assistant
837/// messages, tool calls, reasoning) streams to stderr and the zig session
838/// log as it happens — restoring the old `zag run` streaming UX after the
839/// subprocess-drop refactor.
840///
841/// - `run` → [`AgentBuilder::exec`] (or [`AgentBuilder::run`] for `interactive`).
842/// - `review` (non-codex) → prompt built inline, then `AgentBuilder::exec`.
843/// - `review` (codex) → [`zag_agent::review::run_review`] (no live stream; codex has a native review flow).
844/// - `plan` → prompt built via [`zag_agent::plan::build_plan_prompt`], then `AgentBuilder::exec`, result optionally written to a file.
845/// - `pipe` → context built inline via [`zag_orch::collect::extract_last_assistant_message`], then `AgentBuilder::exec`.
846/// - `collect` → [`zag_orch::collect::collect_results`] (serialized as JSON; no agent).
847/// - `summary` → [`zag_orch::summary::summarize_sessions`] (serialized as JSON; no agent).
848async fn dispatch_agent(
849    cfg: &AgentConfig,
850    step_name: &str,
851    session: Option<&Arc<SessionWriter>>,
852    prefix: Option<&str>,
853) -> Result<String, ZigError> {
854    match cfg.command.as_str() {
855        "run" => {
856            if cfg.interactive {
857                // Interactive sessions inherit stdio — the provider TUI
858                // takes over the terminal and renders events directly, so
859                // no log-event hook is wired here.
860                let builder = apply_agent_config(AgentBuilder::new(), cfg);
861                builder.run(Some(&cfg.prompt)).await.map_err(|e| {
862                    ZigError::Zag(format!("agent run failed for step '{step_name}': {e}"))
863                })?;
864                Ok(String::new())
865            } else {
866                let mut builder = apply_agent_config(AgentBuilder::new(), cfg);
867                builder = install_live_streaming(builder, step_name, session, prefix);
868                let output = builder.exec(&cfg.prompt).await.map_err(|e| {
869                    ZigError::Zag(format!("agent exec failed for step '{step_name}': {e}"))
870                })?;
871                Ok(output.result.unwrap_or_default())
872            }
873        }
874        "review" => {
875            let provider = cfg.provider.clone().unwrap_or_else(|| "claude".to_string());
876            let (uncommitted, base, commit, title) = match &cfg.command_params {
877                Some(CommandParams::Review {
878                    uncommitted,
879                    base,
880                    commit,
881                    title,
882                }) => (*uncommitted, base.clone(), commit.clone(), title.clone()),
883                _ => (false, None, None, None),
884            };
885
886            // Codex has a native review flow inside zag-agent that we
887            // don't want to reimplement — fall back to the library call.
888            // No live stream in that branch; same as before this fix.
889            if provider == "codex" {
890                let params = agent_review::ReviewParams {
891                    provider,
892                    uncommitted,
893                    base,
894                    commit,
895                    title,
896                    prompt: if cfg.prompt.is_empty() {
897                        None
898                    } else {
899                        Some(cfg.prompt.clone())
900                    },
901                    system_prompt: cfg.system_prompt.clone(),
902                    model: cfg.model.clone(),
903                    root: cfg.root.clone(),
904                    auto_approve: cfg.auto_approve,
905                    add_dirs: cfg.add_dirs.clone(),
906                    progress: Box::new(zag_agent::progress::SilentProgress),
907                };
908                let output = agent_review::run_review(params).await.map_err(|e| {
909                    ZigError::Zag(format!("review failed for step '{step_name}': {e}"))
910                })?;
911                return Ok(output.and_then(|o| o.result).unwrap_or_default());
912            }
913
914            // Non-codex review: build the diff and prompt in-process, then
915            // drive AgentBuilder directly so we can attach the live hook.
916            let diff = agent_review::gather_diff(
917                uncommitted,
918                base.as_deref(),
919                commit.as_deref(),
920                cfg.root.as_deref(),
921            )
922            .map_err(|e| {
923                ZigError::Zag(format!(
924                    "review gather_diff failed for step '{step_name}': {e}"
925                ))
926            })?;
927            let user_prompt = if cfg.prompt.is_empty() {
928                None
929            } else {
930                Some(cfg.prompt.as_str())
931            };
932            let review_prompt =
933                agent_review::build_review_prompt(&diff, title.as_deref(), user_prompt);
934
935            let mut builder = apply_agent_config(AgentBuilder::new(), cfg);
936            builder = install_live_streaming(builder, step_name, session, prefix);
937            let output = builder.exec(&review_prompt).await.map_err(|e| {
938                ZigError::Zag(format!("review exec failed for step '{step_name}': {e}"))
939            })?;
940            Ok(output.result.unwrap_or_default())
941        }
942        "plan" => {
943            let (plan_output_path, instructions) = match &cfg.command_params {
944                Some(CommandParams::Plan {
945                    output,
946                    instructions,
947                }) => (output.clone(), instructions.clone()),
948                _ => (None, None),
949            };
950
951            let plan_prompt = agent_plan::build_plan_prompt(&cfg.prompt, instructions.as_deref());
952
953            let mut builder = apply_agent_config(AgentBuilder::new(), cfg);
954            builder = install_live_streaming(builder, step_name, session, prefix);
955            let output = builder.exec(&plan_prompt).await.map_err(|e| {
956                ZigError::Zag(format!("plan exec failed for step '{step_name}': {e}"))
957            })?;
958            let text = output.result.unwrap_or_default();
959
960            if let Some(path_str) = plan_output_path {
961                let target = resolve_plan_output_path(&path_str);
962                if let Some(parent) = target.parent()
963                    && !parent.as_os_str().is_empty()
964                {
965                    std::fs::create_dir_all(parent).map_err(|e| {
966                        ZigError::Io(format!(
967                            "failed to create plan output directory {}: {e}",
968                            parent.display()
969                        ))
970                    })?;
971                }
972                std::fs::write(&target, &text).map_err(|e| {
973                    ZigError::Io(format!(
974                        "failed to write plan output to {}: {e}",
975                        target.display()
976                    ))
977                })?;
978                eprintln!("plan written to {}", target.display());
979            }
980
981            Ok(text)
982        }
983        "pipe" => {
984            let session_ids = match &cfg.command_params {
985                Some(CommandParams::Pipe { session_ids }) => session_ids.as_slice(),
986                _ => &[] as &[String],
987            };
988            let context = build_pipe_context(session_ids, cfg.root.as_deref())?;
989            let combined = format!(
990                "Here are results from previous agent sessions:\n\n{context}\n\n{}",
991                cfg.prompt
992            );
993
994            let mut builder = apply_agent_config(AgentBuilder::new(), cfg);
995            builder = install_live_streaming(builder, step_name, session, prefix);
996            let output = builder.exec(&combined).await.map_err(|e| {
997                ZigError::Zag(format!("pipe exec failed for step '{step_name}': {e}"))
998            })?;
999            Ok(output.result.unwrap_or_default())
1000        }
1001        "collect" => {
1002            let session_ids = match &cfg.command_params {
1003                Some(CommandParams::Collect { session_ids }) => session_ids.clone(),
1004                _ => Vec::new(),
1005            };
1006            let params = orch_collect::CollectParams {
1007                session_ids,
1008                tag: None,
1009                json: true,
1010                root: cfg.root.clone(),
1011            };
1012            let results = orch_collect::collect_results(&params).map_err(|e| {
1013                ZigError::Zag(format!("collect failed for step '{step_name}': {e}"))
1014            })?;
1015            let json = serde_json::to_string(&results)
1016                .map_err(|e| ZigError::Execution(format!("collect serialization failed: {e}")))?;
1017            emit_captured(&json, step_name, session, prefix);
1018            Ok(json)
1019        }
1020        "summary" => {
1021            let session_ids = match &cfg.command_params {
1022                Some(CommandParams::Summary { session_ids }) => session_ids.clone(),
1023                _ => Vec::new(),
1024            };
1025            let params = orch_summary::SummaryParams {
1026                session_ids,
1027                tag: None,
1028                stats: false,
1029                json: true,
1030                root: cfg.root.clone(),
1031            };
1032            let results = orch_summary::summarize_sessions(&params).map_err(|e| {
1033                ZigError::Zag(format!("summary failed for step '{step_name}': {e}"))
1034            })?;
1035            let json = serde_json::to_string(&results)
1036                .map_err(|e| ZigError::Execution(format!("summary serialization failed: {e}")))?;
1037            emit_captured(&json, step_name, session, prefix);
1038            Ok(json)
1039        }
1040        other => Err(ZigError::Execution(format!(
1041            "unknown command '{other}' for step '{step_name}'"
1042        ))),
1043    }
1044}
1045
1046/// Attach a live event stream to `builder`. Every [`AgentLogEvent`] that
1047/// reaches the session log fires the closure: rendered via
1048/// [`zag_agent::listen::format_event_text`] and routed to both stderr
1049/// (with optional `[prefix]` tagging for parallel tiers) and the zig
1050/// [`SessionWriter`] as `StepOutput` events — the same surface the old
1051/// `run_zag_streaming` helper produced before we dropped the subprocess.
1052fn install_live_streaming(
1053    builder: AgentBuilder,
1054    step_name: &str,
1055    session: Option<&Arc<SessionWriter>>,
1056    prefix: Option<&str>,
1057) -> AgentBuilder {
1058    let step_name_owned = step_name.to_string();
1059    let prefix_owned = prefix.map(String::from);
1060    let session_owned = session.cloned();
1061    let last_activity = Arc::new(std::sync::Mutex::new(Instant::now()));
1062    builder.on_log_event(move |evt| {
1063        if matches!(evt.kind, LogEventKind::Heartbeat { .. }) {
1064            let elapsed = last_activity.lock().unwrap().elapsed().as_secs();
1065            emit_heartbeat_line(elapsed, prefix_owned.as_deref());
1066            return;
1067        }
1068        *last_activity.lock().unwrap() = Instant::now();
1069        let Some(text) = zag_agent::listen::format_event_text(evt, false) else {
1070            return;
1071        };
1072        emit_live_line(
1073            &text,
1074            &step_name_owned,
1075            session_owned.as_ref(),
1076            prefix_owned.as_deref(),
1077        );
1078    })
1079}
1080
1081/// Write one or more rendered lines to stderr (with optional prefix) and
1082/// mirror each line to the zig session writer.
1083fn emit_live_line(
1084    text: &str,
1085    step_name: &str,
1086    session: Option<&Arc<SessionWriter>>,
1087    prefix: Option<&str>,
1088) {
1089    use std::io::Write;
1090    if text.is_empty() {
1091        return;
1092    }
1093    let stderr = std::io::stderr();
1094    for line in text.lines() {
1095        if let Some(w) = session {
1096            let _ = w.step_output(step_name, OutputStream::Stdout, line);
1097        }
1098        let mut h = stderr.lock();
1099        let _ = match prefix {
1100            Some(p) => writeln!(h, "[{p}] {line}"),
1101            None => writeln!(h, "{line}"),
1102        };
1103    }
1104}
1105
1106/// Emit a heartbeat pulse to stderr so users can see the step is still
1107/// alive during silent stretches (e.g., the model is reasoning before
1108/// its first tool call). Skips the session writer to keep the persisted
1109/// event log focused on real activity.
1110fn emit_heartbeat_line(elapsed_secs: u64, prefix: Option<&str>) {
1111    use std::io::Write;
1112    let line = format!("  \u{00b7} waiting ({elapsed_secs}s)");
1113    let stderr = std::io::stderr();
1114    let mut h = stderr.lock();
1115    let _ = match prefix {
1116        Some(p) => writeln!(h, "[{p}] {line}"),
1117        None => writeln!(h, "{line}"),
1118    };
1119}
1120
1121/// Emit captured non-agent output (collect / summary JSON) line-by-line
1122/// to stderr and the session writer. Kept for the two paths that don't
1123/// run an agent and therefore can't use [`install_live_streaming`].
1124fn emit_captured(
1125    text: &str,
1126    step_name: &str,
1127    session: Option<&Arc<SessionWriter>>,
1128    prefix: Option<&str>,
1129) {
1130    emit_live_line(text, step_name, session, prefix);
1131}
1132
1133/// Build the `<session-result>` context block from upstream session IDs.
1134///
1135/// Mirrors `zag_orch::pipe::build_context` (`zag-orch/src/pipe.rs:86-118`)
1136/// byte-for-byte so the combined prompt zig feeds the agent matches what
1137/// the `zag pipe` CLI would have produced.
1138fn build_pipe_context(session_ids: &[String], root: Option<&str>) -> Result<String, ZigError> {
1139    let mut parts = Vec::new();
1140    for (i, id) in session_ids.iter().enumerate() {
1141        let Some(text) = orch_collect::extract_last_assistant_message(id, root) else {
1142            eprintln!("warning: no result found for upstream session {id}");
1143            continue;
1144        };
1145        let short = &id[..id.len().min(8)];
1146        let block = if session_ids.len() == 1 {
1147            format!("<session-result session=\"{short}\">\n{text}\n</session-result>")
1148        } else {
1149            format!(
1150                "<session-result index=\"{}\" session=\"{short}\">\n{text}\n</session-result>",
1151                i + 1
1152            )
1153        };
1154        parts.push(block);
1155    }
1156
1157    if parts.is_empty() {
1158        return Err(ZigError::Execution(
1159            "pipe: no results available from the specified sessions".into(),
1160        ));
1161    }
1162    Ok(parts.join("\n\n"))
1163}
1164
1165/// Resolve a `plan_output` path. If the caller specified a bare directory
1166/// name (no extension), append a timestamped `plan-YYYYMMDD-HHMMSS.md`
1167/// inside it — matching the behavior documented on
1168/// [`zag_agent::plan::PlanParams::output`].
1169fn resolve_plan_output_path(path_str: &str) -> std::path::PathBuf {
1170    let expanded = expand_path(path_str);
1171    let path = std::path::PathBuf::from(&expanded);
1172    if path.extension().is_some() {
1173        return path;
1174    }
1175    let stamp = chrono::Utc::now().format("%Y%m%d-%H%M%S");
1176    path.join(format!("plan-{stamp}.md"))
1177}
1178
1179/// Execute a single step through the agent-builder dispatch. Returns the
1180/// captured result text used for `saves` and dependency injection. The
1181/// optional `model_override` is used during retries to escalate to a
1182/// different model.
1183#[allow(clippy::too_many_arguments)]
1184async fn execute_step(
1185    step: &Step,
1186    prompt: &str,
1187    workflow_name: &str,
1188    model_override: Option<&str>,
1189    prefix: Option<&str>,
1190    session: Option<&Arc<SessionWriter>>,
1191    rendered_system_prompt: Option<&str>,
1192    workflow_provider: Option<&str>,
1193    workflow_model: Option<&str>,
1194    extra_add_dirs: &[std::path::PathBuf],
1195) -> Result<String, ZigError> {
1196    let cfg = build_agent_config(
1197        step,
1198        prompt,
1199        workflow_name,
1200        model_override,
1201        rendered_system_prompt,
1202        workflow_provider,
1203        workflow_model,
1204        extra_add_dirs,
1205    );
1206    dispatch_agent(&cfg, &step.name, session, prefix).await
1207}
1208
1209/// Run a step with retry logic, returning its captured output on success.
1210///
1211/// Extracted so both sequential and parallel execution paths share the
1212/// same retry / model-escalation behavior.
1213#[allow(clippy::too_many_arguments)]
1214async fn run_step_attempts(
1215    step: &Step,
1216    prompt: &str,
1217    workflow_name: &str,
1218    prefix: Option<&str>,
1219    session: Option<&Arc<SessionWriter>>,
1220    rendered_system_prompt: Option<&str>,
1221    workflow_provider: Option<&str>,
1222    workflow_model: Option<&str>,
1223    extra_add_dirs: &[std::path::PathBuf],
1224) -> Result<String, ZigError> {
1225    let mut attempts = 0;
1226    let max_attempts = if step.on_failure.as_ref() == Some(&FailurePolicy::Retry) {
1227        step.max_retries.unwrap_or(1) + 1
1228    } else {
1229        1
1230    };
1231
1232    loop {
1233        attempts += 1;
1234        let model_override = if attempts > 1 {
1235            step.retry_model.as_deref()
1236        } else {
1237            None
1238        };
1239        match execute_step(
1240            step,
1241            prompt,
1242            workflow_name,
1243            model_override,
1244            prefix,
1245            session,
1246            rendered_system_prompt,
1247            workflow_provider,
1248            workflow_model,
1249            extra_add_dirs,
1250        )
1251        .await
1252        {
1253            Ok(output) => return Ok(output),
1254            Err(e) => {
1255                if let Some(w) = session {
1256                    let _ = w.step_failed(&step.name, None, attempts, &e.to_string());
1257                }
1258                if attempts < max_attempts {
1259                    eprintln!(
1260                        "    retry {}/{} for step '{}'",
1261                        attempts,
1262                        max_attempts - 1,
1263                        step.name
1264                    );
1265                    continue;
1266                }
1267                return Err(e);
1268            }
1269        }
1270    }
1271}
1272
1273/// Extract variable values from step output using `saves` selectors.
1274///
1275/// Selectors:
1276/// - `"$"` — the entire output
1277/// - `"$.field"` — a top-level JSON field
1278/// - `"$.nested.field"` — a nested JSON field
1279fn extract_saves(
1280    output: &str,
1281    saves: &HashMap<String, String>,
1282) -> Result<HashMap<String, String>, ZigError> {
1283    let mut extracted = HashMap::new();
1284
1285    for (var_name, selector) in saves {
1286        let value = if selector == "$" {
1287            output.trim().to_string()
1288        } else if let Some(path) = selector.strip_prefix("$.") {
1289            let json: serde_json::Value = serde_json::from_str(output.trim()).map_err(|e| {
1290                ZigError::Execution(format!(
1291                    "saves selector '{selector}' requires JSON output, but got parse error: {e}"
1292                ))
1293            })?;
1294            json_path_lookup(&json, path)
1295        } else {
1296            output.trim().to_string()
1297        };
1298
1299        extracted.insert(var_name.clone(), value);
1300    }
1301
1302    Ok(extracted)
1303}
1304
1305/// Partition a tier of steps into sequential steps and race groups.
1306///
1307/// Steps without a `race_group` are returned as sequential. Steps sharing
1308/// the same `race_group` value are grouped together for parallel execution.
1309fn partition_tier<'a>(tier: &[&'a Step]) -> (Vec<&'a Step>, HashMap<String, Vec<&'a Step>>) {
1310    let mut sequential = Vec::new();
1311    let mut race_groups: HashMap<String, Vec<&'a Step>> = HashMap::new();
1312
1313    for step in tier {
1314        if let Some(group) = &step.race_group {
1315            race_groups.entry(group.clone()).or_default().push(step);
1316        } else {
1317            sequential.push(*step);
1318        }
1319    }
1320
1321    (sequential, race_groups)
1322}
1323
1324/// Execute a race group: run all steps concurrently via a [`JoinSet`] and
1325/// return the first winner. Once one step succeeds, the remaining tasks
1326/// are aborted (the underlying `tokio::process::Child` is dropped, which
1327/// kills the provider subprocess if `kill_on_drop` is set — zag-agent
1328/// sets this on its internal commands).
1329#[allow(clippy::too_many_arguments)]
1330async fn execute_race_group(
1331    steps: &[&Step],
1332    prompts: &HashMap<String, String>,
1333    system_prompts: &HashMap<String, String>,
1334    workflow_name: &str,
1335    tier_index: usize,
1336    session: Option<&Arc<SessionWriter>>,
1337    workflow_provider: Option<&str>,
1338    workflow_model: Option<&str>,
1339    storage_dirs: &HashMap<String, Vec<std::path::PathBuf>>,
1340) -> Result<(String, String), ZigError> {
1341    if let Some(w) = session {
1342        for step in steps {
1343            let zag_session_id = format!("zig-{workflow_name}-{}", step.name);
1344            let preview = prompts
1345                .get(&step.name)
1346                .map(|p| prompt_preview(p))
1347                .unwrap_or_default();
1348            let _ = w.step_started(
1349                &step.name,
1350                tier_index,
1351                &zag_session_id,
1352                zag_command_name(&step.command),
1353                step.model.as_deref(),
1354                &preview,
1355            );
1356        }
1357    }
1358
1359    let race_started = Instant::now();
1360    let mut set: JoinSet<(String, Result<String, ZigError>)> = JoinSet::new();
1361
1362    for step in steps {
1363        let prompt = prompts
1364            .get(&step.name)
1365            .ok_or_else(|| ZigError::Execution(format!("missing prompt for step '{}'", step.name)))?
1366            .clone();
1367        eprintln!("  racing step '{}'...", step.name);
1368        let rendered_sp = system_prompts.get(&step.name).cloned();
1369        let empty: Vec<std::path::PathBuf> = Vec::new();
1370        let extra_dirs = storage_dirs.get(&step.name).unwrap_or(&empty).clone();
1371        let step_clone: Step = (*step).clone();
1372        let wf_name = workflow_name.to_string();
1373        let wf_provider = workflow_provider.map(String::from);
1374        let wf_model = workflow_model.map(String::from);
1375        let session_clone = session.cloned();
1376        let name = step.name.clone();
1377        set.spawn(async move {
1378            let res = execute_step(
1379                &step_clone,
1380                &prompt,
1381                &wf_name,
1382                None,
1383                None,
1384                session_clone.as_ref(),
1385                rendered_sp.as_deref(),
1386                wf_provider.as_deref(),
1387                wf_model.as_deref(),
1388                &extra_dirs,
1389            )
1390            .await;
1391            (name, res)
1392        });
1393    }
1394
1395    // Wait for the first winner — drop (abort) the rest.
1396    while let Some(joined) = set.join_next().await {
1397        let (winner_name, result) = match joined {
1398            Ok(pair) => pair,
1399            Err(e) if e.is_cancelled() => continue,
1400            Err(e) => return Err(ZigError::Execution(format!("race task panicked: {e}"))),
1401        };
1402        match result {
1403            Ok(stdout) => {
1404                // Abort losers.
1405                set.abort_all();
1406                while let Some(r) = set.join_next().await {
1407                    if let Ok((name, _)) = r {
1408                        eprintln!("  cancelling step '{name}' (race lost)");
1409                    }
1410                }
1411                let elapsed = race_started.elapsed().as_millis() as u64;
1412                eprintln!("  race won by '{winner_name}'");
1413                if let Some(w) = session {
1414                    let _ = w.step_completed(&winner_name, 0, elapsed, Vec::new());
1415                }
1416                return Ok((winner_name, stdout));
1417            }
1418            Err(e) => {
1419                if let Some(w) = session {
1420                    let _ = w.step_failed(&winner_name, None, 1, &e.to_string());
1421                }
1422                // Keep racing remaining tasks; this one lost.
1423                continue;
1424            }
1425        }
1426    }
1427
1428    Err(ZigError::Execution(
1429        "all racers failed without a winner".into(),
1430    ))
1431}
1432
1433/// Execute a single sequential step with retry logic, saves, and next-jump handling.
1434#[allow(clippy::too_many_arguments)]
1435async fn execute_sequential_step(
1436    step: &Step,
1437    vars: &mut HashMap<String, String>,
1438    user_prompt: Option<&str>,
1439    step_outputs: &mut HashMap<String, String>,
1440    workflow_name: &str,
1441    pending_next: &mut Option<String>,
1442    tier_index: usize,
1443    session: Option<&Arc<SessionWriter>>,
1444    roles: &HashMap<String, Role>,
1445    resources: &ResourceCollector<'_>,
1446    memory: &MemoryCollector,
1447    storage: &StorageManager,
1448    workflow_dir: &Path,
1449    workflow_provider: Option<&str>,
1450    workflow_model: Option<&str>,
1451) -> Result<(), ZigError> {
1452    if let Some(condition) = &step.condition {
1453        if !evaluate_condition(condition, vars)? {
1454            eprintln!(
1455                "  skipping '{}' (condition not met: {condition})",
1456                step.name
1457            );
1458            if let Some(w) = session {
1459                let _ = w.step_skipped(&step.name, &format!("condition not met: {condition}"));
1460            }
1461            return Ok(());
1462        }
1463    }
1464
1465    eprintln!("  running step '{}'...", step.name);
1466
1467    let prompt = render_step_prompt(step, vars, user_prompt, step_outputs);
1468    let rendered_sp = resolve_role_system_prompt(
1469        step,
1470        roles,
1471        resources,
1472        memory,
1473        storage,
1474        vars,
1475        workflow_dir,
1476        workflow_name,
1477    )?;
1478    let storage_dirs = storage.add_dirs_for_step(step.storage.as_deref());
1479    if let Some(w) = session {
1480        let zag_session_id = format!("zig-{workflow_name}-{}", step.name);
1481        let _ = w.step_started(
1482            &step.name,
1483            tier_index,
1484            &zag_session_id,
1485            zag_command_name(&step.command),
1486            step.model.as_deref(),
1487            &prompt_preview(&prompt),
1488        );
1489    }
1490    let started = Instant::now();
1491    let result = run_step_attempts(
1492        step,
1493        &prompt,
1494        workflow_name,
1495        None,
1496        session,
1497        rendered_sp.as_deref(),
1498        workflow_provider,
1499        workflow_model,
1500        &storage_dirs,
1501    )
1502    .await;
1503
1504    match result {
1505        Ok(output) => {
1506            let mut saved_keys: Vec<String> = Vec::new();
1507            if !step.saves.is_empty() {
1508                let saved = extract_saves(&output, &step.saves)?;
1509                for (k, v) in &saved {
1510                    eprintln!("    saved {k} = {v}");
1511                    saved_keys.push(k.clone());
1512                }
1513                vars.extend(saved);
1514            }
1515
1516            step_outputs.insert(step.name.clone(), output);
1517            eprintln!("  completed '{}'", step.name);
1518            if let Some(w) = session {
1519                let _ = w.step_completed(
1520                    &step.name,
1521                    0,
1522                    started.elapsed().as_millis() as u64,
1523                    saved_keys,
1524                );
1525            }
1526
1527            if step.next.is_some() {
1528                *pending_next = step.next.clone();
1529            }
1530        }
1531        Err(e) => match step.on_failure.as_ref().unwrap_or(&FailurePolicy::Fail) {
1532            FailurePolicy::Fail => return Err(e),
1533            FailurePolicy::Continue => {
1534                eprintln!("  step '{}' failed (continuing): {e}", step.name);
1535            }
1536            FailurePolicy::Retry => {
1537                return Err(e);
1538            }
1539        },
1540    }
1541
1542    Ok(())
1543}
1544
1545/// Run multiple independent steps in a tier concurrently.
1546///
1547/// All non-skipped steps are spawned as tokio tasks via [`JoinSet`] and
1548/// we wait for every one to finish (unlike race groups, which abort
1549/// losers). Captured output lines are written back to stderr after each
1550/// task completes, prefixed with the step name to disambiguate. Results
1551/// are processed in tier-declaration order so `saves`, `next`, and
1552/// `on_failure` semantics remain deterministic.
1553#[allow(clippy::too_many_arguments)]
1554async fn execute_parallel_tier(
1555    steps: &[&Step],
1556    vars: &mut HashMap<String, String>,
1557    user_prompt: Option<&str>,
1558    step_outputs: &mut HashMap<String, String>,
1559    workflow_name: &str,
1560    pending_next: &mut Option<String>,
1561    tier_index: usize,
1562    session: Option<&Arc<SessionWriter>>,
1563    roles: &HashMap<String, Role>,
1564    resources: &ResourceCollector<'_>,
1565    memory: &MemoryCollector,
1566    storage: &StorageManager,
1567    workflow_dir: &Path,
1568    workflow_provider: Option<&str>,
1569    workflow_model: Option<&str>,
1570) -> Result<(), ZigError> {
1571    // Evaluate conditions and render prompts up front, so threads receive
1572    // the same variable snapshot they would have under sequential execution.
1573    let mut active: Vec<&Step> = Vec::new();
1574    let mut prompts: HashMap<String, String> = HashMap::new();
1575    let mut rendered_sps: HashMap<String, String> = HashMap::new();
1576    let mut storage_dirs_map: HashMap<String, Vec<std::path::PathBuf>> = HashMap::new();
1577    for step in steps {
1578        if let Some(condition) = &step.condition {
1579            if !evaluate_condition(condition, vars)? {
1580                eprintln!(
1581                    "  skipping '{}' (condition not met: {condition})",
1582                    step.name
1583                );
1584                if let Some(w) = session {
1585                    let _ = w.step_skipped(&step.name, &format!("condition not met: {condition}"));
1586                }
1587                continue;
1588            }
1589        }
1590        let prompt = render_step_prompt(step, vars, user_prompt, step_outputs);
1591        prompts.insert(step.name.clone(), prompt);
1592        if let Some(sp) = resolve_role_system_prompt(
1593            step,
1594            roles,
1595            resources,
1596            memory,
1597            storage,
1598            vars,
1599            workflow_dir,
1600            workflow_name,
1601        )? {
1602            rendered_sps.insert(step.name.clone(), sp);
1603        }
1604        storage_dirs_map.insert(
1605            step.name.clone(),
1606            storage.add_dirs_for_step(step.storage.as_deref()),
1607        );
1608        active.push(*step);
1609    }
1610
1611    if active.is_empty() {
1612        return Ok(());
1613    }
1614
1615    eprintln!("  running {} steps in parallel...", active.len());
1616
1617    let mut start_times: HashMap<String, Instant> = HashMap::new();
1618    let mut set: JoinSet<(String, Result<String, ZigError>)> = JoinSet::new();
1619    for step in &active {
1620        let step_clone: Step = (*step).clone();
1621        let prompt = prompts.remove(&step.name).unwrap_or_default();
1622        let rendered_sp = rendered_sps.remove(&step.name);
1623        let workflow_name_owned = workflow_name.to_string();
1624        let name = step.name.clone();
1625        eprintln!("  starting '{name}'...");
1626        if let Some(w) = session {
1627            let zag_session_id = format!("zig-{workflow_name}-{name}");
1628            let _ = w.step_started(
1629                &name,
1630                tier_index,
1631                &zag_session_id,
1632                zag_command_name(&step.command),
1633                step.model.as_deref(),
1634                &prompt_preview(&prompt),
1635            );
1636        }
1637        start_times.insert(name.clone(), Instant::now());
1638        let session_clone = session.cloned();
1639        let wf_provider = workflow_provider.map(String::from);
1640        let wf_model = workflow_model.map(String::from);
1641        let storage_dirs = storage_dirs_map.remove(&step.name).unwrap_or_default();
1642        set.spawn(async move {
1643            let res = run_step_attempts(
1644                &step_clone,
1645                &prompt,
1646                &workflow_name_owned,
1647                Some(&name),
1648                session_clone.as_ref(),
1649                rendered_sp.as_deref(),
1650                wf_provider.as_deref(),
1651                wf_model.as_deref(),
1652                &storage_dirs,
1653            )
1654            .await;
1655            (name, res)
1656        });
1657    }
1658
1659    let mut results: HashMap<String, Result<String, ZigError>> = HashMap::new();
1660    while let Some(joined) = set.join_next().await {
1661        match joined {
1662            Ok((name, res)) => {
1663                results.insert(name, res);
1664            }
1665            Err(e) => {
1666                return Err(ZigError::Execution(format!(
1667                    "parallel step task panicked: {e}"
1668                )));
1669            }
1670        }
1671    }
1672
1673    // Process results in tier-declaration order so `next` is deterministic.
1674    let mut errors: Vec<String> = Vec::new();
1675    for step in &active {
1676        let Some(res) = results.remove(&step.name) else {
1677            continue;
1678        };
1679        let elapsed = start_times
1680            .remove(&step.name)
1681            .map(|t| t.elapsed().as_millis() as u64)
1682            .unwrap_or(0);
1683        match res {
1684            Ok(output) => {
1685                let mut saved_keys: Vec<String> = Vec::new();
1686                if !step.saves.is_empty() {
1687                    let saved = extract_saves(&output, &step.saves)?;
1688                    for (k, v) in &saved {
1689                        eprintln!("    saved {k} = {v}");
1690                        saved_keys.push(k.clone());
1691                    }
1692                    vars.extend(saved);
1693                }
1694                step_outputs.insert(step.name.clone(), output);
1695                eprintln!("  completed '{}'", step.name);
1696                if let Some(w) = session {
1697                    let _ = w.step_completed(&step.name, 0, elapsed, saved_keys);
1698                }
1699                if step.next.is_some() && pending_next.is_none() {
1700                    *pending_next = step.next.clone();
1701                }
1702            }
1703            Err(e) => match step.on_failure.as_ref().unwrap_or(&FailurePolicy::Fail) {
1704                FailurePolicy::Continue => {
1705                    eprintln!("  step '{}' failed (continuing): {e}", step.name);
1706                }
1707                FailurePolicy::Fail | FailurePolicy::Retry => {
1708                    errors.push(format!("'{}': {e}", step.name));
1709                }
1710            },
1711        }
1712    }
1713
1714    if !errors.is_empty() {
1715        return Err(ZigError::Execution(format!(
1716            "parallel step(s) failed: {}",
1717            errors.join("; ")
1718        )));
1719    }
1720
1721    Ok(())
1722}
1723
1724/// Initialize the variable map from workflow variable definitions.
1725/// Variables with defaults are set to their default value; others are empty.
1726fn init_vars(workflow: &Workflow) -> HashMap<String, String> {
1727    let mut vars = HashMap::new();
1728    for (name, var) in &workflow.vars {
1729        let value = match &var.default {
1730            Some(toml::Value::String(s)) => s.clone(),
1731            Some(toml::Value::Integer(n)) => n.to_string(),
1732            Some(toml::Value::Float(f)) => f.to_string(),
1733            Some(toml::Value::Boolean(b)) => b.to_string(),
1734            Some(other) => other.to_string(),
1735            None => String::new(),
1736        };
1737        vars.insert(name.clone(), value);
1738    }
1739    vars
1740}
1741
1742/// Main execution loop for a validated workflow.
1743#[allow(clippy::too_many_arguments)]
1744async fn execute(
1745    workflow: &Workflow,
1746    workflow_path: &std::path::Path,
1747    user_prompt: Option<&str>,
1748    workflow_dir: &Path,
1749    disable_resources: bool,
1750    disable_memory: bool,
1751    disable_storage: bool,
1752    dry_run: bool,
1753    dry_run_format: DryRunFormat,
1754) -> Result<(), ZigError> {
1755    let mut vars = init_vars(workflow);
1756
1757    let resource_collector = ResourceCollector::from_env(
1758        &workflow.workflow.name,
1759        &workflow.workflow.resources,
1760        workflow_dir,
1761        disable_resources,
1762    );
1763
1764    let config = ZigConfig::load();
1765    let workflow_memory_mode = MemoryMode::from_str_opt(workflow.workflow.memory.as_deref());
1766    let memory_collector = MemoryCollector::from_env(
1767        &workflow.workflow.name,
1768        workflow_memory_mode,
1769        &config,
1770        disable_memory,
1771    );
1772
1773    // Build storage manager for this run. Paths resolve against <cwd>/.zig/;
1774    // absolute paths pass through. `ensure` is called on every declared item
1775    // so step agents can trust the paths exist before they run.
1776    // When `--no-storage` is passed, skip building entirely so storage dirs
1777    // are not created and the `<storage>` block is omitted from prompts.
1778    // When `--dry-run` is passed, build the manager without `ensure` so the
1779    // block still renders with correct paths but nothing touches disk.
1780    let storage_manager = if disable_storage || workflow.storage.is_empty() {
1781        StorageManager::empty()
1782    } else if dry_run {
1783        let backend = FilesystemBackend::from_cwd()?;
1784        StorageManager::build_dry(&workflow.storage, backend)
1785    } else {
1786        let backend = FilesystemBackend::from_cwd()?;
1787        StorageManager::build(&workflow.storage, backend)?
1788    };
1789
1790    // Load file-backed variable defaults before prompt binding.
1791    load_file_defaults(&mut vars, &workflow.vars, workflow_dir)?;
1792
1793    // Bind user prompt to the variable with `from = "prompt"`, if any.
1794    let prompt_var = workflow
1795        .vars
1796        .iter()
1797        .find(|(_, v)| v.from.as_deref() == Some("prompt"))
1798        .map(|(name, _)| name.clone());
1799
1800    if let Some(ref var_name) = prompt_var {
1801        if let Some(prompt) = user_prompt {
1802            vars.insert(var_name.clone(), prompt.to_string());
1803        }
1804    }
1805
1806    // Validate variable values against constraints before executing.
1807    if let Err(errors) = validate::validate_var_values(&vars, &workflow.vars) {
1808        let msgs: Vec<String> = errors.iter().map(|e| e.to_string()).collect();
1809        return Err(ZigError::Validation(msgs.join("; ")));
1810    }
1811
1812    // When prompt is bound to a variable, don't also prepend "User context:".
1813    let effective_user_prompt = if prompt_var.is_some() {
1814        None
1815    } else {
1816        user_prompt
1817    };
1818
1819    let mut step_outputs: HashMap<String, String> = HashMap::new();
1820
1821    let wf_provider = workflow.workflow.provider.as_deref();
1822    let wf_model = workflow.workflow.model.as_deref();
1823
1824    let tiers = topological_sort(&workflow.steps)?;
1825
1826    if dry_run {
1827        let ctx = DryRunContext {
1828            workflow,
1829            workflow_path,
1830            workflow_dir,
1831            vars: &vars,
1832            user_prompt: effective_user_prompt,
1833            roles: &workflow.roles,
1834            resources: &resource_collector,
1835            memory: &memory_collector,
1836            storage: &storage_manager,
1837            wf_provider,
1838            wf_model,
1839            disable_resources,
1840            disable_memory,
1841            disable_storage,
1842        };
1843        return crate::dry_run::print_plan(&ctx, &tiers, dry_run_format);
1844    }
1845
1846    eprintln!(
1847        "running workflow '{}' ({} steps in {} tiers)",
1848        workflow.workflow.name,
1849        workflow.steps.len(),
1850        tiers.len()
1851    );
1852
1853    // Open a zig session log for this run. Failure to open the log is
1854    // non-fatal — `zig run` should still execute even if `~/.zig` is
1855    // unwritable. The session writer is `Option<Arc<...>>` everywhere
1856    // downstream so the writer-less path stays intact for tests.
1857    let coordinator = match SessionWriter::create(
1858        &workflow.workflow.name,
1859        &workflow_path.to_string_lossy(),
1860        user_prompt,
1861        tiers.len(),
1862    ) {
1863        Ok(writer) => {
1864            eprintln!("zig session: {}", writer.session_id());
1865            Some(SessionCoordinator::start(writer))
1866        }
1867        Err(e) => {
1868            eprintln!("warning: failed to open zig session log: {e}");
1869            None
1870        }
1871    };
1872    let session_writer: Option<Arc<SessionWriter>> = coordinator.as_ref().map(|c| c.writer());
1873    let session_ref = session_writer.as_ref();
1874
1875    let mut iteration = 0;
1876    let mut pending_next: Option<String> = None;
1877
1878    loop {
1879        let tiers_to_run = if let Some(ref next_step) = pending_next {
1880            // Re-run from the target step onward
1881            let remaining: Vec<Vec<&Step>> = tiers
1882                .iter()
1883                .map(|tier| {
1884                    tier.iter()
1885                        .filter(|s| s.name == *next_step)
1886                        .copied()
1887                        .collect::<Vec<_>>()
1888                })
1889                .filter(|tier| !tier.is_empty())
1890                .collect();
1891            pending_next = None;
1892            remaining
1893        } else if iteration == 0 {
1894            tiers.clone()
1895        } else {
1896            break;
1897        };
1898
1899        for (tier_index, tier) in tiers_to_run.iter().enumerate() {
1900            let (non_race, race_groups) = partition_tier(tier);
1901
1902            if let Some(w) = session_ref {
1903                let names: Vec<String> = tier.iter().map(|s| s.name.clone()).collect();
1904                let _ = w.tier_started(tier_index, names);
1905            }
1906
1907            // Independent steps in the same tier run concurrently. A single
1908            // step takes the sequential path so its output streams without
1909            // a name prefix; multiple steps go through the parallel path.
1910            if non_race.len() <= 1 {
1911                for step in &non_race {
1912                    execute_sequential_step(
1913                        step,
1914                        &mut vars,
1915                        effective_user_prompt,
1916                        &mut step_outputs,
1917                        &workflow.workflow.name,
1918                        &mut pending_next,
1919                        tier_index,
1920                        session_ref,
1921                        &workflow.roles,
1922                        &resource_collector,
1923                        &memory_collector,
1924                        &storage_manager,
1925                        workflow_dir,
1926                        wf_provider,
1927                        wf_model,
1928                    )
1929                    .await?;
1930                }
1931            } else {
1932                execute_parallel_tier(
1933                    &non_race,
1934                    &mut vars,
1935                    effective_user_prompt,
1936                    &mut step_outputs,
1937                    &workflow.workflow.name,
1938                    &mut pending_next,
1939                    tier_index,
1940                    session_ref,
1941                    &workflow.roles,
1942                    &resource_collector,
1943                    &memory_collector,
1944                    &storage_manager,
1945                    workflow_dir,
1946                    wf_provider,
1947                    wf_model,
1948                )
1949                .await?;
1950            }
1951
1952            // Run race groups in parallel
1953            for (group_name, race_steps) in &race_groups {
1954                eprintln!("  starting race group '{group_name}'...");
1955
1956                // Build prompts for all racers (conditions evaluated here)
1957                let mut prompts = HashMap::new();
1958                let mut race_sps: HashMap<String, String> = HashMap::new();
1959                let mut race_storage_dirs: HashMap<String, Vec<std::path::PathBuf>> =
1960                    HashMap::new();
1961                let mut active_steps: Vec<&Step> = Vec::new();
1962                for step in race_steps {
1963                    if let Some(condition) = &step.condition {
1964                        if !evaluate_condition(condition, &vars)? {
1965                            eprintln!(
1966                                "  skipping '{}' (condition not met: {condition})",
1967                                step.name
1968                            );
1969                            continue;
1970                        }
1971                    }
1972                    let prompt =
1973                        render_step_prompt(step, &vars, effective_user_prompt, &step_outputs);
1974                    prompts.insert(step.name.clone(), prompt);
1975                    if let Some(sp) = resolve_role_system_prompt(
1976                        step,
1977                        &workflow.roles,
1978                        &resource_collector,
1979                        &memory_collector,
1980                        &storage_manager,
1981                        &vars,
1982                        workflow_dir,
1983                        &workflow.workflow.name,
1984                    )? {
1985                        race_sps.insert(step.name.clone(), sp);
1986                    }
1987                    race_storage_dirs.insert(
1988                        step.name.clone(),
1989                        storage_manager.add_dirs_for_step(step.storage.as_deref()),
1990                    );
1991                    active_steps.push(step);
1992                }
1993
1994                if active_steps.is_empty() {
1995                    continue;
1996                }
1997
1998                match execute_race_group(
1999                    &active_steps,
2000                    &prompts,
2001                    &race_sps,
2002                    &workflow.workflow.name,
2003                    tier_index,
2004                    session_ref,
2005                    wf_provider,
2006                    wf_model,
2007                    &race_storage_dirs,
2008                )
2009                .await
2010                {
2011                    Ok((winner_name, output)) => {
2012                        // Find the winning step to process saves/next
2013                        if let Some(winner) = active_steps.iter().find(|s| s.name == winner_name) {
2014                            if !winner.saves.is_empty() {
2015                                let saved = extract_saves(&output, &winner.saves)?;
2016                                for (k, v) in &saved {
2017                                    eprintln!("    saved {k} = {v}");
2018                                }
2019                                vars.extend(saved);
2020                            }
2021                            if winner.next.is_some() {
2022                                pending_next = winner.next.clone();
2023                            }
2024                        }
2025                        step_outputs.insert(winner_name.clone(), output);
2026                        eprintln!(
2027                            "  completed race group '{group_name}' (winner: '{winner_name}')"
2028                        );
2029                    }
2030                    Err(e) => return Err(e),
2031                }
2032            }
2033        }
2034
2035        iteration += 1;
2036        if pending_next.is_none() || iteration >= MAX_LOOP_ITERATIONS {
2037            if iteration >= MAX_LOOP_ITERATIONS {
2038                eprintln!("warning: reached maximum loop iterations ({MAX_LOOP_ITERATIONS})");
2039            }
2040            break;
2041        }
2042    }
2043
2044    eprintln!("workflow '{}' completed", workflow.workflow.name);
2045    if let Some(c) = coordinator {
2046        let _ = c.finish(SessionStatus::Success);
2047    }
2048    Ok(())
2049}
2050
2051/// Short label for the zag subcommand a step will invoke. Used in
2052/// `step_started` events so listeners can distinguish run/review/plan/etc.
2053fn zag_command_name(cmd: &Option<StepCommand>) -> &'static str {
2054    match cmd {
2055        None => "run",
2056        Some(StepCommand::Review) => "review",
2057        Some(StepCommand::Plan) => "plan",
2058        Some(StepCommand::Pipe) => "pipe",
2059        Some(StepCommand::Collect) => "collect",
2060        Some(StepCommand::Summary) => "summary",
2061    }
2062}
2063
2064/// Truncated single-line preview of a rendered prompt for the session log.
2065fn prompt_preview(prompt: &str) -> String {
2066    const MAX: usize = 200;
2067    let collapsed: String = prompt
2068        .chars()
2069        .map(|c| if c == '\n' { ' ' } else { c })
2070        .collect();
2071    if collapsed.chars().count() <= MAX {
2072        collapsed
2073    } else {
2074        let truncated: String = collapsed.chars().take(MAX).collect();
2075        format!("{truncated}…")
2076    }
2077}
2078
2079#[cfg(test)]
2080#[path = "run_tests.rs"]
2081mod tests;